import asyncio import logging import time import json import urllib.request from meshtastic.tcp_interface import TCPInterface from pubsub import pub from meshbot import config from meshbot.database import Database logger = logging.getLogger(__name__) class MeshBot: def __init__(self, db: Database, loop: asyncio.AbstractEventLoop): self.db = db self.loop = loop self.interface: TCPInterface | None = None self.start_time = time.time() self.ws_manager = None # set by main.py def connect(self): host = config.get("meshtastic.host", "localhost") port = config.get("meshtastic.port", 4403) logger.info("Connecting to Meshtastic at %s:%s", host, port) self.interface = TCPInterface(hostname=host, portNumber=port) pub.subscribe(self._on_receive, "meshtastic.receive") pub.subscribe(self._on_connection, "meshtastic.connection.established") pub.subscribe(self._on_node_updated, "meshtastic.node.updated") logger.info("Connected to Meshtastic") def disconnect(self): if self.interface: self.interface.close() def get_channels(self) -> dict: channels = {} if self.interface and self.interface.localNode: for ch in self.interface.localNode.channels: if ch.role: name = ch.settings.name if ch.settings.name else ("Primary" if ch.index == 0 else f"Ch {ch.index}") channels[ch.index] = name return channels def _on_connection(self, interface, topic=pub.AUTO_TOPIC): logger.info("Meshtastic connection established") if hasattr(interface, 'nodes') and interface.nodes: for node in interface.nodes.values(): self.loop.call_soon_threadsafe(asyncio.ensure_future, self._handle_node_update(node)) def _on_node_updated(self, node, topic=pub.AUTO_TOPIC): self.loop.call_soon_threadsafe(asyncio.ensure_future, self._handle_node_update(node)) def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC): self.loop.call_soon_threadsafe(asyncio.ensure_future, self._handle_packet(packet)) async def _handle_node_update(self, node): try: logger.debug("Node update raw: %s", node) node_id = node.get("user", {}).get("id") node_num = node.get("num") if not node_id and node_num: node_id = f"!{node_num:08x}" if not node_id: return node_id = str(node_id) user = node.get("user", {}) position = node.get("position", {}) metrics = node.get("deviceMetrics", {}) snr = node.get("snr") data = { "node_num": node_num, "long_name": user.get("longName"), "short_name": user.get("shortName"), "hw_model": user.get("hwModel"), "lat": position.get("latitude"), "lon": position.get("longitude"), "alt": position.get("altitude"), "battery": metrics.get("batteryLevel"), "voltage": metrics.get("voltage"), "hop_count": node.get("hopsAway"), "via_mqtt": 1 if node.get("viaMqtt") else 0, } if snr is not None: data["snr"] = snr updated = await self.db.upsert_node(node_id, **data) if self.ws_manager: await self.ws_manager.broadcast("node_update", updated) except Exception: logger.exception("Error handling node update") async def _handle_packet(self, packet): try: from_id = packet.get("fromId", str(packet.get("from", ""))) to_id = packet.get("toId", str(packet.get("to", ""))) portnum = packet.get("decoded", {}).get("portnum", "") channel = packet.get("channel", 0) # Update node info from packet node_data = {"snr": packet.get("snr"), "rssi": packet.get("rssi"), "hop_count": packet.get("hopStart", 0) - packet.get("hopLimit", 0) if packet.get("hopStart") else None} if from_id: await self.db.upsert_node(str(from_id), **{k: v for k, v in node_data.items() if v is not None}) # Handle nodeinfo if portnum == "NODEINFO_APP": user = packet.get("decoded", {}).get("user", {}) if user and from_id: await self.db.upsert_node(str(from_id), long_name=user.get("longName"), short_name=user.get("shortName"), hw_model=user.get("hwModel")) node = await self.db.get_node(str(from_id)) if node and self.ws_manager: await self.ws_manager.broadcast("node_update", node) # Handle position updates if portnum == "POSITION_APP": pos = packet.get("decoded", {}).get("position", {}) if pos and from_id: await self.db.upsert_node(str(from_id), lat=pos.get("latitude"), lon=pos.get("longitude"), alt=pos.get("altitude")) node = await self.db.get_node(str(from_id)) if node and self.ws_manager: await self.ws_manager.broadcast("node_update", node) # Handle telemetry if portnum == "TELEMETRY_APP": telemetry = packet.get("decoded", {}).get("telemetry", {}) metrics = telemetry.get("deviceMetrics", {}) if metrics and from_id: await self.db.upsert_node(str(from_id), battery=metrics.get("batteryLevel"), voltage=metrics.get("voltage")) node = await self.db.get_node(str(from_id)) if node and self.ws_manager: await self.ws_manager.broadcast("node_update", node) # Handle text messages if portnum == "TEXT_MESSAGE_APP": text = packet.get("decoded", {}).get("text", "") msg = await self.db.insert_message(str(from_id), str(to_id), channel, portnum, text) if self.ws_manager: await self.ws_manager.broadcast("new_message", msg) stats = await self.db.get_stats() await self.ws_manager.broadcast("stats_update", stats) # Process commands prefix = config.get("bot.command_prefix", "!") if text.startswith(prefix): await self._handle_command(text.strip(), channel, str(from_id)) except Exception: logger.exception("Error handling packet") async def _handle_command(self, text: str, channel: int, from_id: str): prefix = config.get("bot.command_prefix", "!") cmd = text.split()[0].lower() response = None if cmd == f"{prefix}ping": response = "🏓 Pong!" elif cmd == f"{prefix}nodes": nodes = await self.db.get_all_nodes() response = f"📡 Bekannte Nodes: {len(nodes)}" elif cmd == f"{prefix}info": uptime = self._format_uptime() response = f"â„šī¸ {config.get('bot.name', 'MeshDD-Bot')} v{config.get('version', '0.0.0')}\nUptime: {uptime}" elif cmd == f"{prefix}help": response = ( f"📋 Kommandos:\n" f"{prefix}ping - Pong\n" f"{prefix}nodes - Anzahl Nodes\n" f"{prefix}info - Bot-Info\n" f"{prefix}stats - Statistiken\n" f"{prefix}uptime - Laufzeit\n" f"{prefix}weather - Wetter\n" f"{prefix}help - Diese Hilfe" ) elif cmd == f"{prefix}weather": response = await self._get_weather(from_id) elif cmd == f"{prefix}stats": stats = await self.db.get_stats() response = ( f"📊 Statistiken:\n" f"Nodes: {stats['total_nodes']}\n" f"Mit Position: {stats['nodes_with_position']}\n" f"Nachrichten: {stats['total_messages']}\n" f"Textnachrichten: {stats['text_messages']}" ) elif cmd == f"{prefix}uptime": response = f"âąī¸ Uptime: {self._format_uptime()}" if response: self._send_text(response, channel) def _send_text(self, text: str, channel: int): if self.interface: try: self.interface.sendText(text, channelIndex=channel) except Exception: logger.exception("Error sending text") def _format_uptime(self) -> str: elapsed = int(time.time() - self.start_time) days, remainder = divmod(elapsed, 86400) hours, remainder = divmod(remainder, 3600) minutes, seconds = divmod(remainder, 60) parts = [] if days: parts.append(f"{days}d") if hours: parts.append(f"{hours}h") if minutes: parts.append(f"{minutes}m") parts.append(f"{seconds}s") return " ".join(parts) async def _get_weather(self, from_id: str) -> str: node = await self.db.get_node(from_id) fallback = False if node and node.get("lat") and node.get("lon"): lat, lon = node["lat"], node["lon"] else: lat, lon = 51.0504, 13.7373 # Dresden Zentrum fallback = True url = ( f"https://api.open-meteo.com/v1/forecast?" f"latitude={lat}&longitude={lon}" f"¤t=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code" ) try: loop = asyncio.get_event_loop() data = await loop.run_in_executor(None, self._fetch_url, url) current = data.get("current", {}) temp = current.get("temperature_2m", "?") humidity = current.get("relative_humidity_2m", "?") wind = current.get("wind_speed_10m", "?") code = current.get("weather_code", 0) weather_icon = self._weather_code_to_icon(code) location = " (Dresden)" if fallback else "" return ( f"{weather_icon} Wetter{location}:\n" f"Temp: {temp}°C\n" f"Feuchte: {humidity}%\n" f"Wind: {wind} km/h" ) except Exception: logger.exception("Error fetching weather") return "❌ Wetterdaten konnten nicht abgerufen werden." @staticmethod def _fetch_url(url: str) -> dict: with urllib.request.urlopen(url, timeout=10) as resp: return json.loads(resp.read().decode()) @staticmethod def _weather_code_to_icon(code: int) -> str: if code == 0: return "â˜€ī¸" if code in (1, 2, 3): return "⛅" if code in (45, 48): return "đŸŒĢī¸" if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82): return "đŸŒ§ī¸" if code in (71, 73, 75, 77, 85, 86): return "đŸŒ¨ī¸" if code in (95, 96, 99): return "â›ˆī¸" return "đŸŒ¤ī¸"