import asyncio import logging import time import json import urllib.request from meshtastic.tcp_interface import TCPInterface from pubsub import pub from meshbot import config from meshbot.database import Database logger = logging.getLogger(__name__) class MeshBot: def __init__(self, db: Database, loop: asyncio.AbstractEventLoop): self.db = db self.loop = loop self.interface: TCPInterface | None = None self.start_time = time.time() self.ws_manager = None # set by main.py def connect(self): host = config.get("meshtastic.host", "localhost") port = config.get("meshtastic.port", 4403) logger.info("Connecting to Meshtastic at %s:%s", host, port) self.interface = TCPInterface(hostname=host, portNumber=port) pub.subscribe(self._on_receive, "meshtastic.receive") pub.subscribe(self._on_connection, "meshtastic.connection.established") pub.subscribe(self._on_node_updated, "meshtastic.node.updated") logger.info("Connected to Meshtastic") def disconnect(self): if self.interface: self.interface.close() def _on_connection(self, interface, topic=pub.AUTO_TOPIC): logger.info("Meshtastic connection established") def _on_node_updated(self, node, topic=pub.AUTO_TOPIC): self.loop.call_soon_threadsafe(asyncio.ensure_future, self._handle_node_update(node)) def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC): self.loop.call_soon_threadsafe(asyncio.ensure_future, self._handle_packet(packet)) async def _handle_node_update(self, node): try: node_id = node.get("user", {}).get("id") or node.get("num") if not node_id: return node_id = str(node_id) user = node.get("user", {}) position = node.get("position", {}) metrics = node.get("deviceMetrics", {}) snr = node.get("snr") data = { "node_num": node.get("num"), "long_name": user.get("longName"), "short_name": user.get("shortName"), "hw_model": user.get("hwModel"), "lat": position.get("latitude"), "lon": position.get("longitude"), "alt": position.get("altitude"), "battery": metrics.get("batteryLevel"), "voltage": metrics.get("voltage"), "hop_count": node.get("hopsAway"), "via_mqtt": 1 if node.get("viaMqtt") else 0, } if snr is not None: data["snr"] = snr updated = await self.db.upsert_node(node_id, **data) if self.ws_manager: await self.ws_manager.broadcast("node_update", updated) except Exception: logger.exception("Error handling node update") async def _handle_packet(self, packet): try: from_id = packet.get("fromId", str(packet.get("from", ""))) to_id = packet.get("toId", str(packet.get("to", ""))) portnum = packet.get("decoded", {}).get("portnum", "") channel = packet.get("channel", 0) # Update node info from packet node_data = {"snr": packet.get("snr"), "rssi": packet.get("rssi"), "hop_count": packet.get("hopStart", 0) - packet.get("hopLimit", 0) if packet.get("hopStart") else None} if from_id: await self.db.upsert_node(str(from_id), **{k: v for k, v in node_data.items() if v is not None}) # Handle position updates if portnum == "POSITION_APP": pos = packet.get("decoded", {}).get("position", {}) if pos and from_id: await self.db.upsert_node(str(from_id), lat=pos.get("latitude"), lon=pos.get("longitude"), alt=pos.get("altitude")) node = await self.db.get_node(str(from_id)) if node and self.ws_manager: await self.ws_manager.broadcast("node_update", node) # Handle telemetry if portnum == "TELEMETRY_APP": telemetry = packet.get("decoded", {}).get("telemetry", {}) metrics = telemetry.get("deviceMetrics", {}) if metrics and from_id: await self.db.upsert_node(str(from_id), battery=metrics.get("batteryLevel"), voltage=metrics.get("voltage")) node = await self.db.get_node(str(from_id)) if node and self.ws_manager: await self.ws_manager.broadcast("node_update", node) # Handle text messages if portnum == "TEXT_MESSAGE_APP": text = packet.get("decoded", {}).get("text", "") msg = await self.db.insert_message(str(from_id), str(to_id), channel, portnum, text) if self.ws_manager: await self.ws_manager.broadcast("new_message", msg) stats = await self.db.get_stats() await self.ws_manager.broadcast("stats_update", stats) # Process commands if text.startswith("!"): await self._handle_command(text.strip(), channel, str(from_id)) except Exception: logger.exception("Error handling packet") async def _handle_command(self, text: str, channel: int, from_id: str): cmd = text.split()[0].lower() response = None if cmd == "!ping": response = "πŸ“ Pong!" elif cmd == "!nodes": nodes = await self.db.get_all_nodes() response = f"πŸ“‘ Bekannte Nodes: {len(nodes)}" elif cmd == "!info": uptime = self._format_uptime() response = f"ℹ️ {config.get('bot.name', 'MeshDD-Bot')} v{config.get('version', '0.0.0')}\nUptime: {uptime}" elif cmd == "!help": response = ( "πŸ“‹ Kommandos:\n" "!ping - Pong\n" "!nodes - Anzahl Nodes\n" "!info - Bot-Info\n" "!stats - Statistiken\n" "!uptime - Laufzeit\n" "!weather - Wetter\n" "!help - Diese Hilfe" ) elif cmd == "!weather": response = await self._get_weather(from_id) elif cmd == "!stats": stats = await self.db.get_stats() response = ( f"πŸ“Š Statistiken:\n" f"Nodes: {stats['total_nodes']}\n" f"Mit Position: {stats['nodes_with_position']}\n" f"Nachrichten: {stats['total_messages']}\n" f"Textnachrichten: {stats['text_messages']}" ) elif cmd == "!uptime": response = f"⏱️ Uptime: {self._format_uptime()}" if response: self._send_text(response, channel) def _send_text(self, text: str, channel: int): if self.interface: try: self.interface.sendText(text, channelIndex=channel) except Exception: logger.exception("Error sending text") def _format_uptime(self) -> str: elapsed = int(time.time() - self.start_time) days, remainder = divmod(elapsed, 86400) hours, remainder = divmod(remainder, 3600) minutes, seconds = divmod(remainder, 60) parts = [] if days: parts.append(f"{days}d") if hours: parts.append(f"{hours}h") if minutes: parts.append(f"{minutes}m") parts.append(f"{seconds}s") return " ".join(parts) async def _get_weather(self, from_id: str) -> str: node = await self.db.get_node(from_id) if not node or not node.get("lat") or not node.get("lon"): return "❌ Keine Position fΓΌr diesen Node bekannt." lat, lon = node["lat"], node["lon"] url = ( f"https://api.open-meteo.com/v1/forecast?" f"latitude={lat}&longitude={lon}" f"¤t=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code" ) try: loop = asyncio.get_event_loop() data = await loop.run_in_executor(None, self._fetch_url, url) current = data.get("current", {}) temp = current.get("temperature_2m", "?") humidity = current.get("relative_humidity_2m", "?") wind = current.get("wind_speed_10m", "?") code = current.get("weather_code", 0) weather_icon = self._weather_code_to_icon(code) return ( f"{weather_icon} Wetter:\n" f"Temp: {temp}Β°C\n" f"Feuchte: {humidity}%\n" f"Wind: {wind} km/h" ) except Exception: logger.exception("Error fetching weather") return "❌ Wetterdaten konnten nicht abgerufen werden." @staticmethod def _fetch_url(url: str) -> dict: with urllib.request.urlopen(url, timeout=10) as resp: return json.loads(resp.read().decode()) @staticmethod def _weather_code_to_icon(code: int) -> str: if code == 0: return "β˜€οΈ" if code in (1, 2, 3): return "β›…" if code in (45, 48): return "🌫️" if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82): return "🌧️" if code in (71, 73, 75, 77, 85, 86): return "🌨️" if code in (95, 96, 99): return "β›ˆοΈ" return "🌀️"