import asyncio
import json
import logging
import time
import urllib.request
from collections import Counter

from meshtastic.tcp_interface import TCPInterface
from pubsub import pub

from meshbot import config
from meshbot.database import Database

logger = logging.getLogger(__name__)


class MeshBot:
    """Meshtastic bot: stores mesh traffic in the database and answers commands.

    Meshtastic events arrive through pubsub callbacks, which hand the work to
    ``loop`` via ``call_soon_threadsafe``; the actual handling happens in the
    ``_handle_*`` coroutines. If ``ws_manager`` is set, node, message and
    statistics changes are broadcast through it.
    """

    def __init__(self, db: Database, loop: asyncio.AbstractEventLoop):
        """Store the collaborators; no connection is opened here.

        Args:
            db: Database used for nodes, messages and command statistics.
            loop: Event loop on which all coroutines of the bot are run.
        """
        self.db = db
        self.loop = loop
        self.interface: TCPInterface | None = None
        self.start_time = time.time()
        self.ws_manager = None  # set by main.py

    def connect(self):
        """Subscribe to the Meshtastic pubsub topics and open the TCP interface."""
        host = config.get("meshtastic.host", "localhost")
        port = config.get("meshtastic.port", 4403)
        logger.info("Connecting to Meshtastic at %s:%s", host, port)
        # Subscribe before constructing the interface, as the Meshtastic
        # examples do: the constructor already connects, so events published
        # while it runs (connection established, initial node list) would
        # otherwise be missed.
        pub.subscribe(self._on_receive, "meshtastic.receive")
        pub.subscribe(self._on_connection, "meshtastic.connection.established")
        pub.subscribe(self._on_node_updated, "meshtastic.node.updated")
        self.interface = TCPInterface(hostname=host, portNumber=port)
        logger.info("Connected to Meshtastic")

    def disconnect(self):
        """Close the Meshtastic interface if one is open."""
        if self.interface:
            self.interface.close()

    def get_channels(self) -> dict:
        """Return ``{channel index: display name}`` for channels with a truthy role.

        Unnamed channels are shown as "Primary" (index 0) or "Ch <index>".
        Returns an empty dict while no interface or local node is available.
        """
        channels = {}
        if self.interface and self.interface.localNode:
            # ``channels`` may not be populated yet; treat that as "no channels".
            for ch in self.interface.localNode.channels or []:
                if ch.role:
                    fallback = "Primary" if ch.index == 0 else f"Ch {ch.index}"
                    channels[ch.index] = ch.settings.name or fallback
        return channels

    def _schedule(self, coro):
        """Run ``coro`` as a task on the bot's loop; safe to call from any thread."""
        self.loop.call_soon_threadsafe(self.loop.create_task, coro)

    def _on_connection(self, interface, topic=pub.AUTO_TOPIC):
        """Pubsub callback: import every node the interface already knows."""
        logger.info("Meshtastic connection established")
        nodes = getattr(interface, "nodes", None)
        if nodes:
            for node in nodes.values():
                self._schedule(self._handle_node_update(node))

    def _on_node_updated(self, node, topic=pub.AUTO_TOPIC):
        """Pubsub callback: a node entry changed."""
        self._schedule(self._handle_node_update(node))

    def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC):
        """Pubsub callback: a packet was received."""
        self._schedule(self._handle_packet(packet))

    @staticmethod
    def _resolve_node_id(node_id, node_num) -> str:
        """Return a usable node id string, or ``""`` if none can be derived.

        Prefers the textual id; otherwise formats a non-zero integer node
        number as ``!xxxxxxxx`` so both code paths produce the same key.
        """
        if node_id:
            return str(node_id)
        if isinstance(node_num, int) and node_num:
            return f"!{node_num:08x}"
        return str(node_num) if node_num else ""

    async def _update_and_broadcast(self, node_id: str, **fields):
        """Upsert ``fields`` for ``node_id`` and broadcast the stored node row."""
        await self.db.upsert_node(node_id, **fields)
        node = await self.db.get_node(node_id)
        if node and self.ws_manager:
            await self.ws_manager.broadcast("node_update", node)

    async def _handle_node_update(self, node):
        """Persist a node dict from the interface and broadcast the result.

        Nodes without a user id and without a node number are ignored.
        Errors are logged, never raised, because this runs as a detached task.
        """
        try:
            logger.debug("Node update raw: %s", node)
            user = node.get("user", {})
            node_num = node.get("num")
            node_id = self._resolve_node_id(user.get("id"), node_num)
            if not node_id:
                return

            position = node.get("position", {})
            metrics = node.get("deviceMetrics", {})
            data = {
                "node_num": node_num,
                "long_name": user.get("longName"),
                "short_name": user.get("shortName"),
                "hw_model": user.get("hwModel"),
                "lat": position.get("latitude"),
                "lon": position.get("longitude"),
                "alt": position.get("altitude"),
                "battery": metrics.get("batteryLevel"),
                "voltage": metrics.get("voltage"),
                "hop_count": node.get("hopsAway"),
                "via_mqtt": 1 if node.get("viaMqtt") else 0,
            }
            # Only pass snr when the node dict carries one.
            snr = node.get("snr")
            if snr is not None:
                data["snr"] = snr

            updated = await self.db.upsert_node(node_id, **data)
            if self.ws_manager:
                await self.ws_manager.broadcast("node_update", updated)
        except Exception:
            logger.exception("Error handling node update")

    async def _handle_packet(self, packet):
        """Process one received packet.

        Updates the sender's link metrics, then handles the payload by port:
        node info, position and telemetry update the node row; text messages
        are stored, broadcast and checked for bot commands. Errors are logged,
        never raised, because this runs as a detached task.
        """
        try:
            # "fromId"/"toId" can be present but None, so fall back on falsy
            # values rather than only on a missing key.
            from_id = self._resolve_node_id(packet.get("fromId"), packet.get("from"))
            to_id = self._resolve_node_id(packet.get("toId"), packet.get("to"))
            decoded = packet.get("decoded") or {}
            portnum = decoded.get("portnum", "")
            channel = packet.get("channel", 0)

            # Update node info from packet
            if from_id:
                hop_start = packet.get("hopStart")
                link = {
                    "snr": packet.get("snr"),
                    "rssi": packet.get("rssi"),
                    "hop_count": hop_start - packet.get("hopLimit", 0) if hop_start else None,
                }
                known = {key: value for key, value in link.items() if value is not None}
                await self.db.upsert_node(from_id, **known)

            if portnum == "NODEINFO_APP":
                user = decoded.get("user", {})
                if user and from_id:
                    await self._update_and_broadcast(
                        from_id,
                        long_name=user.get("longName"),
                        short_name=user.get("shortName"),
                        hw_model=user.get("hwModel"),
                    )
            elif portnum == "POSITION_APP":
                pos = decoded.get("position", {})
                if pos and from_id:
                    await self._update_and_broadcast(
                        from_id,
                        lat=pos.get("latitude"),
                        lon=pos.get("longitude"),
                        alt=pos.get("altitude"),
                    )
            elif portnum == "TELEMETRY_APP":
                metrics = decoded.get("telemetry", {}).get("deviceMetrics", {})
                if metrics and from_id:
                    await self._update_and_broadcast(
                        from_id,
                        battery=metrics.get("batteryLevel"),
                        voltage=metrics.get("voltage"),
                    )
            elif portnum == "TEXT_MESSAGE_APP":
                text = decoded.get("text", "")
                msg = await self.db.insert_message(from_id, to_id, channel, portnum, text)
                if self.ws_manager:
                    await self.ws_manager.broadcast("new_message", msg)
                    stats = await self.db.get_stats()
                    await self.ws_manager.broadcast("stats_update", stats)

                # Process commands
                prefix = config.get("bot.command_prefix", "!")
                if text.startswith(prefix):
                    await self._handle_command(text.strip(), channel, from_id)
        except Exception:
            logger.exception("Error handling packet")

    async def _handle_command(self, text: str, channel: int, from_id: str):
        """Answer a bot command on ``channel`` and record it in the statistics.

        Args:
            text: Message text; its first whitespace-separated word is the command.
            channel: Channel index the reply is sent on.
            from_id: Sender node id, used to pick the weather location.

        Unknown commands are ignored without a reply.
        """
        words = text.split()
        if not words:
            return
        prefix = config.get("bot.command_prefix", "!")
        cmd = words[0].lower()
        response = None

        if cmd == f"{prefix}ping":
            response = "🏓 Pong!"
        elif cmd == f"{prefix}nodes":
            nodes = await self.db.get_all_nodes()
            response = f"📡 Bekannte Nodes: {len(nodes)}"
        elif cmd == f"{prefix}info":
            uptime = self._format_uptime()
            response = f"ℹ️ {config.get('bot.name', 'MeshDD-Bot')} v{config.get('version', '0.0.0')}\nUptime: {uptime}"
        elif cmd == f"{prefix}help":
            response = (
                f"📋 Kommandos:\n"
                f"{prefix}ping - Pong\n"
                f"{prefix}nodes - Anzahl Nodes\n"
                f"{prefix}info - Bot-Info\n"
                f"{prefix}stats - Statistiken\n"
                f"{prefix}uptime - Laufzeit\n"
                f"{prefix}weather - Wetter\n"
                f"{prefix}mesh - Mesh-Netzwerk\n"
                f"{prefix}help - Diese Hilfe"
            )
        elif cmd == f"{prefix}weather":
            response = await self._get_weather(from_id)
        elif cmd == f"{prefix}stats":
            stats = await self.db.get_stats()
            response = (
                f"📊 Statistiken:\n"
                f"Nodes: {stats['total_nodes']}\n"
                f"Aktiv (24h): {stats['nodes_24h']}\n"
                f"Anfragen: {stats['total_commands']}"
            )
        elif cmd == f"{prefix}mesh":
            response = await self._get_mesh_info()
        elif cmd == f"{prefix}uptime":
            response = f"⏱️ Uptime: {self._format_uptime()}"

        if not response:
            return
        await self._send_text(response, channel)
        await self.db.insert_command(cmd)
        if self.ws_manager:
            stats = await self.db.get_stats()
            await self.ws_manager.broadcast("stats_update", stats)

    async def _send_text(self, text: str, channel: int, max_len: int = 170):
        """Send ``text`` on ``channel``, split into parts of at most ``max_len`` UTF-8 bytes.

        Waits 1.5 s between parts. A failing part is logged and the remaining
        parts are still attempted. Does nothing when no interface is open.
        """
        if not self.interface:
            return
        for index, part in enumerate(self._split_message(text, max_len)):
            if index > 0:
                await asyncio.sleep(1.5)
            try:
                self.interface.sendText(part, channelIndex=channel)
            except Exception:
                logger.exception("Error sending text")

    @staticmethod
    def _split_message(text: str, max_len: int) -> list[str]:
        """Split ``text`` into parts of at most ``max_len`` UTF-8 bytes each.

        Splits at newlines where possible; a single line longer than the limit
        is cut at character boundaries. Text that already fits is returned
        unchanged as a one-element list.
        """
        if len(text.encode("utf-8")) <= max_len:
            return [text]

        parts = []
        current = ""
        for line in text.split("\n"):
            candidate = f"{current}\n{line}" if current else line
            if len(candidate.encode("utf-8")) <= max_len:
                current = candidate
                continue

            if current:
                parts.append(current)
            if len(line.encode("utf-8")) <= max_len:
                current = line
                continue

            # The line alone is too long: hard-split it. ``current`` was
            # flushed above and must be cleared, otherwise it would be
            # prepended to the next line and sent a second time.
            current = ""
            while line:
                chunk = line[:max(max_len, 1)]
                # Trim characters until the byte length fits, but always keep
                # at least one character so the loop makes progress.
                while len(chunk) > 1 and len(chunk.encode("utf-8")) > max_len:
                    chunk = chunk[:-1]
                parts.append(chunk)
                line = line[len(chunk):]

        if current:
            parts.append(current)
        return parts

    def _format_uptime(self) -> str:
        """Return the time since ``start_time`` as e.g. ``"1d 2h 3m 4s"``.

        Zero-valued days, hours and minutes are omitted; seconds are always shown.
        """
        elapsed = int(time.time() - self.start_time)
        days, remainder = divmod(elapsed, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
        parts = []
        if days:
            parts.append(f"{days}d")
        if hours:
            parts.append(f"{hours}h")
        if minutes:
            parts.append(f"{minutes}m")
        parts.append(f"{seconds}s")
        return " ".join(parts)

    async def _get_mesh_info(self) -> str:
        """Build the reply for the mesh command.

        Contains the node totals (online = seen within 900 s, active = seen
        within 24 h, nodes with a position), the hop-count distribution and
        the three most common hardware models.
        """
        nodes = await self.db.get_all_nodes()
        total = len(nodes)
        now = time.time()
        online = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 900)
        active_24h = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 86400)
        with_pos = sum(1 for n in nodes if n.get("lat") and n.get("lon"))

        # Hop distribution
        hop_counts = Counter(n["hop_count"] for n in nodes if n.get("hop_count") is not None)
        hop_lines = []
        for hops in sorted(hop_counts):
            label = "Direkt" if hops == 0 else f"{hops} Hop{'s' if hops > 1 else ''}"
            hop_lines.append(f" {label}: {hop_counts[hops]}")

        # Hardware distribution (top 3)
        hw_counts = Counter(n["hw_model"] for n in nodes if n.get("hw_model"))
        hw_lines = [f" {hw}: {cnt}" for hw, cnt in hw_counts.most_common(3)]

        parts = [
            f"🕸️ Mesh-Netzwerk:\n"
            f"Nodes: {total} ({online} online)\n"
            f"Aktiv 24h: {active_24h}\n"
            f"Mit Position: {with_pos}",
        ]
        if hop_lines:
            parts.append("📊 Hop-Verteilung:\n" + "\n".join(hop_lines))
        if hw_lines:
            parts.append("🔧 Top Hardware:\n" + "\n".join(hw_lines))
        return "\n\n".join(parts)

    async def _get_weather(self, from_id: str) -> str:
        """Return the current weather at the sender's stored position.

        Falls back to the centre of Dresden (marked in the reply) when the
        node has no stored latitude/longitude. Returns an error text instead
        of raising if the request fails.
        """
        node = await self.db.get_node(from_id)
        fallback = False
        if node and node.get("lat") and node.get("lon"):
            lat, lon = node["lat"], node["lon"]
        else:
            lat, lon = 51.0504, 13.7373  # Dresden Zentrum
            fallback = True

        url = (
            f"https://api.open-meteo.com/v1/forecast?"
            f"latitude={lat}&longitude={lon}"
            f"&current=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code"
        )
        try:
            # urllib blocks, so run the request in the default executor.
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(None, self._fetch_url, url)
            current = data.get("current", {})
            temp = current.get("temperature_2m", "?")
            humidity = current.get("relative_humidity_2m", "?")
            wind = current.get("wind_speed_10m", "?")
            code = current.get("weather_code", 0)
            weather_icon = self._weather_code_to_icon(code)
            location = " (Dresden)" if fallback else ""
            return (
                f"{weather_icon} Wetter{location}:\n"
                f"Temp: {temp}°C\n"
                f"Feuchte: {humidity}%\n"
                f"Wind: {wind} km/h"
            )
        except Exception:
            logger.exception("Error fetching weather")
            return "❌ Wetterdaten konnten nicht abgerufen werden."

    @staticmethod
    def _fetch_url(url: str) -> dict:
        """Fetch ``url`` (10 s timeout) and return the decoded JSON body."""
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read().decode())

    @staticmethod
    def _weather_code_to_icon(code: int) -> str:
        """Map a ``weather_code`` value from the weather API to an emoji.

        Codes not listed below get the generic "sun behind cloud" icon.
        """
        if code == 0:
            return "☀️"
        if code in (1, 2, 3):
            return "⛅"
        if code in (45, 48):
            return "🌫️"
        if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82):
            return "🌧️"
        if code in (71, 73, 75, 77, 85, 86):
            return "🌨️"
        if code in (95, 96, 99):
            return "⛈️"
        return "🌤️"