import asyncio
import json
import logging
import time
import urllib.parse
import urllib.request
from collections import Counter

from google.protobuf.json_format import MessageToDict
from meshtastic.tcp_interface import TCPInterface
from pubsub import pub

from meshbot import config
from meshbot.database import Database

logger = logging.getLogger(__name__)


class MeshBot:
    """Bridge between a Meshtastic TCP interface, the database and websocket clients.

    Meshtastic events arrive through pubsub callbacks and are handed to the
    asyncio loop given at construction, where they are persisted via ``db`` and
    broadcast through ``ws_manager`` (when set). Text messages that start with
    the configured command prefix are answered as bot commands.
    """

    def __init__(self, db: Database, loop: asyncio.AbstractEventLoop):
        """Store collaborators; no connection is opened until ``connect()``."""
        self.db = db
        self.loop = loop
        self.interface: TCPInterface | None = None
        self.start_time = time.time()
        self.ws_manager = None  # set by main.py

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------

    def connect(self):
        """Subscribe to Meshtastic pubsub topics and open the TCP interface."""
        host = config.get("meshtastic.host", "localhost")
        port = config.get("meshtastic.port", 4403)
        logger.info("Connecting to Meshtastic at %s:%s", host, port)
        # Subscribe BEFORE creating the interface: events published while the
        # interface is being set up (e.g. "connection.established") would
        # otherwise be missed by listeners registered afterwards.
        pub.subscribe(self._on_receive, "meshtastic.receive")
        pub.subscribe(self._on_connection, "meshtastic.connection.established")
        pub.subscribe(self._on_node_updated, "meshtastic.node.updated")
        self.interface = TCPInterface(hostname=host, portNumber=port)
        logger.info("Connected to Meshtastic")

    def disconnect(self):
        """Close the interface if one was opened."""
        if self.interface:
            self.interface.close()

    # ------------------------------------------------------------------
    # Local node introspection
    # ------------------------------------------------------------------

    @staticmethod
    def _channel_name(ch) -> str:
        """Return the configured channel name, or a fallback derived from its index."""
        if ch.settings.name:
            return ch.settings.name
        return "Primary" if ch.index == 0 else f"Ch {ch.index}"

    @staticmethod
    def _as_dict(message) -> dict:
        """Convert a protobuf message (or pass through a dict) to a plain dict."""
        if hasattr(message, "DESCRIPTOR"):
            return MessageToDict(message)
        return message if isinstance(message, dict) else {}

    def get_channels(self) -> dict:
        """Return ``{index: name}`` for every local channel with a truthy role."""
        channels = {}
        if self.interface and self.interface.localNode:
            for ch in self.interface.localNode.channels:
                if ch.role:
                    channels[ch.index] = self._channel_name(ch)
        return channels

    def get_node_config(self) -> dict:
        """Collect a summary of the local node's configuration.

        Returns a dict with the keys ``device``, ``lora``, ``channels``,
        ``position``, ``power``, ``bluetooth`` and ``network``. Sections stay
        empty when no interface/local node is available; individual values
        default to ``""`` when the field is absent.
        """
        result = {
            "device": {},
            "lora": {},
            "channels": [],
            "position": {},
            "power": {},
            "bluetooth": {},
            "network": {},
        }
        if not self.interface or not self.interface.localNode:
            return result

        node = self.interface.localNode
        my_info = getattr(self.interface, "myInfo", None)
        metadata = getattr(self.interface, "metadata", None)
        device_info = result["device"]

        # Device info
        if my_info:
            info = self._as_dict(my_info)
            device_info["node_num"] = info.get("myNodeNum", "")
            device_info["max_channels"] = info.get("maxChannels", "")
        if metadata:
            meta = self._as_dict(metadata)
            device_info["firmware_version"] = meta.get("firmwareVersion", "")
            device_info["hw_model"] = meta.get("hwModel", "")

        # Local config sections
        local_config = getattr(node, "localConfig", None)
        if local_config:
            if hasattr(local_config, "DESCRIPTOR"):
                cfg = MessageToDict(local_config, preserving_proto_field_name=True)
            else:
                cfg = {}

            lora = cfg.get("lora", {})
            result["lora"] = {
                key: lora.get(key, "")
                for key in (
                    "region",
                    "modem_preset",
                    "hop_limit",
                    "tx_power",
                    "bandwidth",
                    "frequency_offset",
                    "tx_enabled",
                    "use_preset",
                )
            }

            device = cfg.get("device", {})
            # NOTE(review): "name" is filled from the "role" field, exactly as
            # before; this looks like a copy/paste slip - confirm which field
            # consumers expect before changing it.
            device_info["name"] = device.get("role", "")
            device_info["role"] = device.get("role", "")

            pos = cfg.get("position", {})
            result["position"] = {
                "gps_mode": pos.get("gps_mode", pos.get("gps_enabled", "")),
                "position_broadcast_secs": pos.get("position_broadcast_secs", ""),
                "fixed_position": pos.get("fixed_position", ""),
            }

            power = cfg.get("power", {})
            result["power"] = {
                key: power.get(key, "")
                for key in ("mesh_sds_timeout_secs", "ls_secs", "min_wake_secs", "is_power_saving")
            }

            bt = cfg.get("bluetooth", {})
            result["bluetooth"] = {
                "enabled": bt.get("enabled", ""),
                "mode": bt.get("mode", ""),
            }

            network = cfg.get("network", {})
            result["network"] = {
                "wifi_enabled": network.get("wifi_enabled", ""),
                "ntp_server": network.get("ntp_server", ""),
            }

        # Channels
        for ch in node.channels:
            if not ch.role:
                continue
            psk = ch.settings.psk
            if psk == b"\x01":
                psk_display = "Default"
            elif psk and psk != b"\x00":
                psk_display = "Custom"
            else:
                psk_display = "None"
            result["channels"].append({
                "index": ch.index,
                "name": self._channel_name(ch),
                "role": str(ch.role),
                "psk": psk_display,
            })

        # Node long name from interface nodes
        my_node_num = getattr(my_info, "my_node_num", None) if my_info else None
        if self.interface.nodes and my_node_num is not None:
            for n in self.interface.nodes.values():
                if n.get("num") == my_node_num:
                    user = n.get("user", {})
                    device_info["long_name"] = user.get("longName", "")
                    device_info["short_name"] = user.get("shortName", "")
                    device_info["hw_model"] = user.get("hwModel", device_info.get("hw_model", ""))
                    break
        return result

    def get_my_node_id(self) -> str | None:
        """Return the local node id as ``!xxxxxxxx`` or ``None`` when unknown."""
        if self.interface and hasattr(self.interface, "myInfo") and self.interface.myInfo:
            num = self.interface.myInfo.my_node_num
            return f"!{num:08x}"
        return None

    # ------------------------------------------------------------------
    # pubsub callbacks
    # ------------------------------------------------------------------

    def _schedule(self, coro) -> None:
        """Hand a coroutine to ``self.loop`` via ``call_soon_threadsafe``."""
        self.loop.call_soon_threadsafe(asyncio.ensure_future, coro)

    def _on_connection(self, interface, topic=pub.AUTO_TOPIC):
        """Schedule a node update for every node the interface already knows."""
        logger.info("Meshtastic connection established")
        if hasattr(interface, "nodes") and interface.nodes:
            for node in interface.nodes.values():
                self._schedule(self._handle_node_update(node))

    def _on_node_updated(self, node, topic=pub.AUTO_TOPIC):
        """Schedule persistence of a single updated node."""
        self._schedule(self._handle_node_update(node))

    def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC):
        """Schedule processing of a received packet."""
        self._schedule(self._handle_packet(packet))

    # ------------------------------------------------------------------
    # Event processing (runs on the event loop)
    # ------------------------------------------------------------------

    async def _handle_node_update(self, node):
        """Upsert a node dict into the database and broadcast the stored row."""
        try:
            logger.debug("Node update raw: %s", node)
            user = node.get("user", {})
            node_num = node.get("num")
            node_id = user.get("id")
            if not node_id and node_num:
                node_id = f"!{node_num:08x}"
            if not node_id:
                return
            node_id = str(node_id)

            position = node.get("position", {})
            metrics = node.get("deviceMetrics", {})
            data = {
                "node_num": node_num,
                "long_name": user.get("longName"),
                "short_name": user.get("shortName"),
                "hw_model": user.get("hwModel"),
                "lat": position.get("latitude"),
                "lon": position.get("longitude"),
                "alt": position.get("altitude"),
                "battery": metrics.get("batteryLevel"),
                "voltage": metrics.get("voltage"),
                "hop_count": node.get("hopsAway"),
                "via_mqtt": 1 if node.get("viaMqtt") else 0,
            }
            snr = node.get("snr")
            if snr is not None:
                data["snr"] = snr

            updated = await self.db.upsert_node(node_id, **data)
            if self.ws_manager:
                await self.ws_manager.broadcast("node_update", updated)
        except Exception:
            logger.exception("Error handling node update")

    @staticmethod
    def _packet_node_id(packet, id_key: str, num_key: str) -> str:
        """Return the node id stored under ``id_key`` or derive one from ``num_key``.

        ``packet.get(id_key, default)`` is not enough: when the key is present
        with a ``None`` value the default is ignored and ``str(None)`` would be
        stored as the id. Numeric fallbacks use the same ``!%08x`` form as
        ``_handle_node_update``. Returns ``""`` when neither field is usable.
        """
        node_id = packet.get(id_key)
        if node_id:
            return str(node_id)
        num = packet.get(num_key)
        if isinstance(num, int):
            return f"!{num:08x}"
        return "" if num is None else str(num)

    async def _update_node_and_broadcast(self, node_id: str, **fields):
        """Upsert ``fields`` for ``node_id`` and broadcast the refreshed node row."""
        await self.db.upsert_node(node_id, **fields)
        node = await self.db.get_node(node_id)
        if node and self.ws_manager:
            await self.ws_manager.broadcast("node_update", node)

    async def _handle_packet(self, packet):
        """Process one received packet: update the sender node and dispatch by port."""
        try:
            from_id = self._packet_node_id(packet, "fromId", "from")
            to_id = self._packet_node_id(packet, "toId", "to")
            decoded = packet.get("decoded", {})
            portnum = decoded.get("portnum", "")
            channel = packet.get("channel", 0)

            # Update node info from packet
            hop_start = packet.get("hopStart")
            link_data = {
                "snr": packet.get("snr"),
                "rssi": packet.get("rssi"),
                "hop_count": hop_start - packet.get("hopLimit", 0) if hop_start else None,
            }
            if from_id:
                present = {k: v for k, v in link_data.items() if v is not None}
                await self.db.upsert_node(from_id, **present)

            if portnum == "NODEINFO_APP":
                user = decoded.get("user", {})
                if user and from_id:
                    await self._update_node_and_broadcast(
                        from_id,
                        long_name=user.get("longName"),
                        short_name=user.get("shortName"),
                        hw_model=user.get("hwModel"),
                    )
            elif portnum == "POSITION_APP":
                pos = decoded.get("position", {})
                if pos and from_id:
                    await self._update_node_and_broadcast(
                        from_id,
                        lat=pos.get("latitude"),
                        lon=pos.get("longitude"),
                        alt=pos.get("altitude"),
                    )
            elif portnum == "TELEMETRY_APP":
                metrics = decoded.get("telemetry", {}).get("deviceMetrics", {})
                if metrics and from_id:
                    await self._update_node_and_broadcast(
                        from_id,
                        battery=metrics.get("batteryLevel"),
                        voltage=metrics.get("voltage"),
                    )
            elif portnum == "TEXT_MESSAGE_APP":
                text = decoded.get("text", "")
                my_id = self.get_my_node_id()
                is_own = my_id and from_id == my_id
                # Own messages are already stored by _send_text.
                if not is_own:
                    msg = await self.db.insert_message(from_id, to_id, channel, portnum, text)
                    if self.ws_manager:
                        await self.ws_manager.broadcast("new_message", msg)
                        stats = await self.db.get_stats()
                        await self.ws_manager.broadcast("stats_update", stats)

                # Process commands
                prefix = config.get("bot.command_prefix", "!")
                if text.startswith(prefix):
                    await self._handle_command(text.strip(), channel, from_id)
        except Exception:
            logger.exception("Error handling packet")

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    async def execute_command(self, text: str, channel: int, from_id: str | None = None):
        """Execute a bot command programmatically (used by scheduler and message handler).

        Unknown commands and empty/whitespace-only ``text`` are ignored. When a
        response is produced it is sent on ``channel``, the command is recorded
        and updated stats are broadcast.
        """
        words = text.split()
        if not words:
            return
        prefix = config.get("bot.command_prefix", "!")
        cmd = words[0].lower()
        response = None

        if cmd == f"{prefix}ping":
            hops_str = ""
            if from_id:
                node = await self.db.get_node(from_id)
                if node and node.get("hop_count") is not None:
                    hops_str = f" ueber {node['hop_count']} Hops"
            response = f"🏓 Pong{hops_str}!"
        elif cmd == f"{prefix}nodes":
            nodes = await self.db.get_all_nodes()
            response = f"📡 Bekannte Nodes: {len(nodes)}"
        elif cmd == f"{prefix}info":
            uptime = self._format_uptime()
            response = (
                f"ℹ️ {config.get('bot.name', 'MeshDD-Bot')} v{config.get('version', '0.0.0')}\n"
                f"Uptime: {uptime}"
            )
        elif cmd == f"{prefix}help":
            response = (
                f"📋 Kommandos:\n"
                f"{prefix}ping - Pong\n"
                f"{prefix}nodes - Anzahl Nodes\n"
                f"{prefix}info - Bot-Info\n"
                f"{prefix}stats - Statistiken\n"
                f"{prefix}uptime - Laufzeit\n"
                f"{prefix}weather [plz:XXXXX] - Wetter\n"
                f"{prefix}me - Meine Node-Infos\n"
                f"{prefix}mesh - Mesh-Netzwerk\n"
                f"{prefix}help - Diese Hilfe"
            )
        elif cmd == f"{prefix}me":
            response = await self._get_my_info(from_id)
        elif cmd == f"{prefix}weather":
            # First "plz:XXXXX" argument (case-insensitive key) wins.
            plz = next((arg[4:] for arg in words[1:] if arg.lower().startswith("plz:")), None)
            response = await self._get_weather(from_id, plz=plz)
        elif cmd == f"{prefix}stats":
            stats = await self.db.get_stats()
            response = (
                f"📊 Statistiken:\n"
                f"Nodes: {stats['total_nodes']}\n"
                f"Aktiv (24h): {stats['nodes_24h']}\n"
                f"Anfragen: {stats['total_commands']}"
            )
        elif cmd == f"{prefix}mesh":
            response = await self._get_mesh_info()
        elif cmd == f"{prefix}uptime":
            response = f"⏱️ Uptime: {self._format_uptime()}"

        if response:
            await self._send_text(response, channel)
            await self.db.insert_command(cmd, channel)
            if self.ws_manager:
                stats = await self.db.get_stats()
                await self.ws_manager.broadcast("stats_update", stats)

    async def send_message(self, text: str, channel: int):
        """Send a free-text message on the given channel."""
        await self._send_text(text, channel)

    async def _handle_command(self, text: str, channel: int, from_id: str):
        """Run a command received over the mesh."""
        await self.execute_command(text, channel, from_id)

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def _send_text(self, text: str, channel: int, max_len: int = 170):
        """Split ``text`` into parts of at most ``max_len`` bytes and send them.

        Each part is stored and broadcast to websocket clients before it is
        sent via the radio; failures of either step are logged and do not stop
        the remaining parts. Parts after the first are delayed by 3 seconds.
        """
        if not self.interface:
            return
        my_id = self.get_my_node_id() or "bot"
        # Reserve space for "[x/y] " prefix (max 7 bytes)
        parts = self._split_message(text, max_len - 7)
        total = len(parts)
        for i, part in enumerate(parts):
            if i > 0:
                await asyncio.sleep(3.0)
            msg = f"[{i+1}/{total}] {part}" if total > 1 else part
            # Store and broadcast first, then send via radio
            try:
                stored = await self.db.insert_message(my_id, "^all", channel, "TEXT_MESSAGE_APP", msg)
                if self.ws_manager:
                    await self.ws_manager.broadcast("new_message", stored)
            except Exception:
                logger.exception("Error storing sent message")
            try:
                self.interface.sendText(msg, channelIndex=channel)
            except Exception:
                logger.exception("Error sending text via radio")

    @staticmethod
    def _split_message(text: str, max_len: int) -> list[str]:
        """Split ``text`` into parts whose UTF-8 encoding fits in ``max_len`` bytes.

        Whole lines are packed together greedily; a single line that is too
        long on its own is cut at character boundaries. Text that already fits
        is returned unchanged as a one-element list.
        """

        def size(s: str) -> int:
            return len(s.encode("utf-8"))

        if size(text) <= max_len:
            return [text]

        parts: list[str] = []
        current = ""
        for line in text.split("\n"):
            candidate = f"{current}\n{line}" if current else line
            if size(candidate) <= max_len:
                current = candidate
                continue
            if current:
                parts.append(current)
            if size(line) <= max_len:
                current = line
                continue
            # The line alone is too long: hard-split it. ``current`` was just
            # flushed, so reset it - otherwise it would be emitted a second time.
            current = ""
            while line:
                chunk = line[:max(max_len, 1)]
                # Trim until the encoded chunk fits, but always keep at least
                # one character so the loop is guaranteed to make progress.
                while len(chunk) > 1 and size(chunk) > max_len:
                    chunk = chunk[:-1]
                parts.append(chunk)
                line = line[len(chunk):]
        if current:
            parts.append(current)
        return parts

    # ------------------------------------------------------------------
    # Response builders
    # ------------------------------------------------------------------

    def _format_uptime(self) -> str:
        """Return the time since ``start_time`` as e.g. ``1d 2h 3m 4s``."""
        elapsed = int(time.time() - self.start_time)
        days, remainder = divmod(elapsed, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
        parts = []
        if days:
            parts.append(f"{days}d")
        if hours:
            parts.append(f"{hours}h")
        if minutes:
            parts.append(f"{minutes}m")
        parts.append(f"{seconds}s")
        return " ".join(parts)

    async def _get_my_info(self, from_id: str | None) -> str:
        """Build the ``me`` response from the stored row of the requesting node."""
        if not from_id:
            return "❌ Node-ID unbekannt."
        node = await self.db.get_node(from_id)
        if not node:
            return f"❌ Keine Daten fuer {from_id}."

        name = node.get("long_name") or node.get("short_name") or from_id
        hw = node.get("hw_model") or "-"
        hops = node.get("hop_count") if node.get("hop_count") is not None else "-"
        snr = f"{node['snr']:.1f} dB" if node.get("snr") is not None else "-"
        rssi = f"{node['rssi']} dBm" if node.get("rssi") is not None else "-"
        bat = f"{node['battery']}%" if node.get("battery") is not None else "-"
        voltage = f"{node['voltage']:.2f}V" if node.get("voltage") is not None else "-"
        alt = f"{int(node['alt'])} m" if node.get("alt") is not None else "-"

        pos = ""
        if node.get("lat") and node.get("lon"):
            pos = f"{node['lat']:.4f}, {node['lon']:.4f}"

        last_seen = ""
        if node.get("last_seen"):
            elapsed = int(time.time() - node["last_seen"])
            if elapsed < 60:
                last_seen = f"{elapsed}s"
            elif elapsed < 3600:
                last_seen = f"{elapsed // 60}m"
            elif elapsed < 86400:
                last_seen = f"{elapsed // 3600}h"
            else:
                last_seen = f"{elapsed // 86400}d"

        lines = [
            f"📋 {name}",
            f"ID: {from_id}",
            f"HW: {hw}",
            f"Hops: {hops}",
            f"SNR: {snr} | RSSI: {rssi}",
            f"Bat: {bat} ({voltage})",
        ]
        if pos:
            lines.append(f"Pos: {pos} | {alt}")
        if last_seen:
            lines.append(f"Zuletzt: vor {last_seen}")
        return "\n".join(lines)

    async def _get_mesh_info(self) -> str:
        """Build the ``mesh`` response: node counts, hop and hardware distribution."""
        nodes = await self.db.get_all_nodes()
        total = len(nodes)
        now = time.time()
        online = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 900)
        active_24h = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 86400)
        with_pos = sum(1 for n in nodes if n.get("lat") and n.get("lon"))

        # Hop distribution
        hop_counts = Counter(
            n["hop_count"] for n in nodes if n.get("hop_count") is not None
        )
        hop_lines = []
        for k in sorted(hop_counts):
            label = "Direkt" if k == 0 else f"{k} Hop{'s' if k > 1 else ''}"
            hop_lines.append(f" {label}: {hop_counts[k]}")

        # Hardware distribution (top 3)
        hw_counts = Counter(n["hw_model"] for n in nodes if n.get("hw_model"))
        hw_lines = [f" {hw}: {cnt}" for hw, cnt in hw_counts.most_common(3)]

        parts = [
            f"🕸️ Mesh-Netzwerk:\n"
            f"Nodes: {total} ({online} online)\n"
            f"Aktiv 24h: {active_24h}\n"
            f"Mit Position: {with_pos}",
        ]
        if hop_lines:
            parts.append("📊 Hop-Verteilung:\n" + "\n".join(hop_lines))
        if hw_lines:
            parts.append("🔧 Top Hardware:\n" + "\n".join(hw_lines))
        return "\n\n".join(parts)

    async def _get_weather(self, from_id: str | None, plz: str | None = None) -> str:
        """Build the ``weather`` response.

        The location is taken from ``plz`` when given, otherwise from the
        stored position of ``from_id``, otherwise a fixed default is used.
        """
        lat, lon = None, None
        if plz:
            coords = await self._geocode_plz(plz)
            if not coords:
                return f"❌ PLZ {plz} nicht gefunden."
            lat, lon = coords
        if lat is None:
            node = await self.db.get_node(from_id) if from_id else None
            if node and node.get("lat") and node.get("lon"):
                lat, lon = node["lat"], node["lon"]
            else:
                lat, lon = 51.0504, 13.7373  # Dresden Zentrum

        query = urllib.parse.urlencode(
            {
                "latitude": lat,
                "longitude": lon,
                "current": (
                    "temperature_2m,relative_humidity_2m,wind_speed_10m,"
                    "weather_code,pressure_msl,dew_point_2m"
                ),
            },
            safe=",",  # keep the variable list readable; commas are legal in a query
        )
        url = f"https://api.open-meteo.com/v1/forecast?{query}"
        try:
            loop = asyncio.get_running_loop()
            # urllib is blocking, so run it in the default executor.
            data = await loop.run_in_executor(None, self._fetch_url, url)
            current = data.get("current", {})
            temp = current.get("temperature_2m", "?")
            humidity = current.get("relative_humidity_2m", "?")
            wind = current.get("wind_speed_10m", "?")
            code = current.get("weather_code", 0)
            pressure = current.get("pressure_msl", "?")
            dewpoint = current.get("dew_point_2m", "?")
            weather_icon = self._weather_code_to_icon(code)
            location = await self._reverse_geocode(lat, lon)
            loc_str = f" ({location})" if location else ""
            return (
                f"{weather_icon} Wetter{loc_str}:\n"
                f"Temp: {temp}°C\n"
                f"Feuchte: {humidity}%\n"
                f"Taupunkt: {dewpoint}°C\n"
                f"Luftdruck: {pressure} hPa\n"
                f"Wind: {wind} km/h"
            )
        except Exception:
            logger.exception("Error fetching weather")
            return "❌ Wetterdaten konnten nicht abgerufen werden."

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------

    async def _geocode_plz(self, plz: str) -> tuple[float, float] | None:
        """Resolve a German postal code to ``(lat, lon)`` via Nominatim.

        Returns ``None`` when nothing is found or the request fails.
        """
        # ``plz`` comes from a radio message, so it must be escaped rather than
        # pasted into the query string.
        query = urllib.parse.urlencode({
            "postalcode": plz,
            "country": "DE",
            "format": "json",
            "limit": 1,
            "accept-language": "de",
        })
        url = f"https://nominatim.openstreetmap.org/search?{query}"
        try:
            loop = asyncio.get_running_loop()
            req = urllib.request.Request(url, headers={"User-Agent": "MeshDD-Bot/1.0"})
            data = await loop.run_in_executor(None, self._fetch_request, req)
            if data:
                return float(data[0]["lat"]), float(data[0]["lon"])
        except Exception:
            logger.debug("Geocode PLZ failed for %s", plz, exc_info=True)
        return None

    async def _reverse_geocode(self, lat: float, lon: float) -> str:
        """Return a place name for the coordinates via Nominatim, or ``""`` on failure."""
        query = urllib.parse.urlencode({
            "lat": lat,
            "lon": lon,
            "format": "json",
            "zoom": 10,
            "accept-language": "de",
        })
        url = f"https://nominatim.openstreetmap.org/reverse?{query}"
        try:
            loop = asyncio.get_running_loop()
            req = urllib.request.Request(url, headers={"User-Agent": "MeshDD-Bot/1.0"})
            data = await loop.run_in_executor(None, self._fetch_request, req)
            addr = data.get("address", {})
            return addr.get("city") or addr.get("town") or addr.get("village") or addr.get("municipality") or ""
        except Exception:
            logger.debug("Reverse geocode failed for %s,%s", lat, lon, exc_info=True)
            return ""

    @staticmethod
    def _fetch_request(req: urllib.request.Request) -> dict | list:
        """Blocking: perform ``req`` (10 s timeout) and return the decoded JSON body."""
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read().decode())

    @staticmethod
    def _fetch_url(url: str) -> dict:
        """Blocking: GET ``url`` (10 s timeout) and return the decoded JSON body."""
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read().decode())

    @staticmethod
    def _weather_code_to_icon(code: int) -> str:
        """Map a weather code from the forecast response to an emoji."""
        if code == 0:
            return "☀️"
        if code in (1, 2, 3):
            return "⛅"
        if code in (45, 48):
            return "🌫️"
        if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82):
            return "🌧️"
        if code in (71, 73, 75, 77, 85, 86):
            return "🌨️"
        if code in (95, 96, 99):
            return "⛈️"
        return "🌀️"