MeshDD-Bot/meshbot/bot.py
ppfeiffer a34162d428 feat: v0.3.8 - Ping response shows hop count
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 17:25:01 +01:00

529 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import time
import json
import urllib.request
from google.protobuf.json_format import MessageToDict
from meshtastic.tcp_interface import TCPInterface
from pubsub import pub
from meshbot import config
from meshbot.database import Database
logger = logging.getLogger(__name__)
class MeshBot:
def __init__(self, db: Database, loop: asyncio.AbstractEventLoop):
    """Store the collaborators and initialise the runtime state.

    Args:
        db: Database handle used by the handlers to persist nodes,
            messages and command counters.
        loop: Event loop onto which the pubsub callbacks schedule their
            coroutines.
    """
    # Captured once so _format_uptime() can report the elapsed runtime.
    self.start_time = time.time()
    self.loop = loop
    self.db = db
    # Stays None until connect() has created the TCP interface.
    self.interface: TCPInterface | None = None
    self.ws_manager = None  # set by main.py
def connect(self):
    """Register the pubsub listeners and open the TCP connection.

    Host and port are read from the ``meshtastic.host`` and
    ``meshtastic.port`` config keys (defaults ``localhost`` / ``4403``).
    """
    host = config.get("meshtastic.host", "localhost")
    port = config.get("meshtastic.port", 4403)
    # Subscribe BEFORE constructing the interface: TCPInterface connects
    # from within its constructor (see the Meshtastic Python API docs), so
    # listeners registered afterwards can miss the
    # "connection.established" event and the first node/packet events.
    pub.subscribe(self._on_receive, "meshtastic.receive")
    pub.subscribe(self._on_connection, "meshtastic.connection.established")
    pub.subscribe(self._on_node_updated, "meshtastic.node.updated")
    logger.info("Connecting to Meshtastic at %s:%s", host, port)
    self.interface = TCPInterface(hostname=host, portNumber=port)
    logger.info("Connected to Meshtastic")
def disconnect(self):
    """Close the Meshtastic interface if one has been opened."""
    iface = self.interface
    if not iface:
        # Never connected (or nothing to close): nothing to do.
        return
    iface.close()
def get_channels(self) -> dict:
    """Map channel index to display name for every channel with a truthy role.

    Returns:
        ``{index: name}``. A channel without a configured name is shown as
        ``"Primary"`` (index 0) or ``"Ch <index>"``. Empty when there is no
        interface or no local node.
    """
    if not (self.interface and self.interface.localNode):
        return {}

    def display_name(ch) -> str:
        fallback = "Primary" if ch.index == 0 else f"Ch {ch.index}"
        return ch.settings.name or fallback

    return {
        ch.index: display_name(ch)
        for ch in self.interface.localNode.channels
        if ch.role
    }
def get_node_config(self) -> dict:
    """Collect a plain-dict snapshot of the local node's configuration.

    Returns:
        A dict with the keys ``device``, ``lora``, ``channels``,
        ``position``, ``power``, ``bluetooth`` and ``network``. All sections
        are empty when there is no interface or no local node. Settings
        that are not present are reported as ``""``.
    """
    result = {"device": {}, "lora": {}, "channels": [], "position": {},
              "power": {}, "bluetooth": {}, "network": {}}
    if not self.interface or not self.interface.localNode:
        return result
    node = self.interface.localNode

    def to_dict(message) -> dict:
        """Convert a protobuf message to a dict; pass plain dicts through."""
        if hasattr(message, "DESCRIPTOR"):
            return MessageToDict(message)
        return message if isinstance(message, dict) else {}

    def pick(section: dict, *keys: str) -> dict:
        """Copy ``keys`` out of ``section``, using "" for missing ones."""
        return {key: section.get(key, "") for key in keys}

    # Device info
    my_info = getattr(self.interface, "myInfo", None)
    info = to_dict(my_info) if my_info else {}
    if my_info:
        result["device"]["node_num"] = info.get("myNodeNum", "")
        result["device"]["max_channels"] = info.get("maxChannels", "")
    metadata = getattr(self.interface, "metadata", None)
    if metadata:
        meta = to_dict(metadata)
        result["device"]["firmware_version"] = meta.get("firmwareVersion", "")
        result["device"]["hw_model"] = meta.get("hwModel", "")

    # Local config sections
    local_config = getattr(node, "localConfig", None)
    if local_config:
        # Field names are kept in snake_case here, unlike the camelCase
        # keys produced for myInfo/metadata above.
        cfg = (MessageToDict(local_config, preserving_proto_field_name=True)
               if hasattr(local_config, "DESCRIPTOR") else {})
        result["lora"] = pick(
            cfg.get("lora", {}),
            "region", "modem_preset", "hop_limit", "tx_power", "bandwidth",
            "frequency_offset", "tx_enabled", "use_preset",
        )
        device = cfg.get("device", {})
        # NOTE(review): "name" is filled from the role field, exactly like
        # "role" below. This looks like a copy/paste slip, but the intended
        # source field is not visible here - confirm before changing.
        result["device"]["name"] = device.get("role", "")
        result["device"]["role"] = device.get("role", "")
        pos = cfg.get("position", {})
        result["position"] = {
            # Falls back to "gps_enabled" when "gps_mode" is absent.
            "gps_mode": pos.get("gps_mode", pos.get("gps_enabled", "")),
            **pick(pos, "position_broadcast_secs", "fixed_position"),
        }
        result["power"] = pick(
            cfg.get("power", {}),
            "mesh_sds_timeout_secs", "ls_secs", "min_wake_secs", "is_power_saving",
        )
        result["bluetooth"] = pick(cfg.get("bluetooth", {}), "enabled", "mode")
        result["network"] = pick(cfg.get("network", {}), "wifi_enabled", "ntp_server")

    # Channels (entries with a falsy role are skipped)
    for ch in node.channels:
        if not ch.role:
            continue
        name = ch.settings.name or ("Primary" if ch.index == 0 else f"Ch {ch.index}")
        psk = ch.settings.psk
        if psk == b'\x01':
            psk_display = "Default"
        elif psk and psk != b'\x00':
            psk_display = "Custom"
        else:
            psk_display = "None"
        result["channels"].append({
            "index": ch.index,
            "name": name,
            "role": str(ch.role),
            "psk": psk_display,
        })

    # Node long name from interface nodes
    if self.interface.nodes:
        # Accept both shapes handled above: a protobuf-style object exposing
        # ``my_node_num`` or a dict carrying ``myNodeNum``.
        own_num = getattr(my_info, "my_node_num", None)
        if own_num is None:
            own_num = info.get("myNodeNum")
        if own_num is not None:
            # Iterate over a snapshot so a concurrent update of the node
            # table cannot break the loop.
            for entry in list(self.interface.nodes.values()):
                if entry.get("num") == own_num:
                    user = entry.get("user", {})
                    result["device"]["long_name"] = user.get("longName", "")
                    result["device"]["short_name"] = user.get("shortName", "")
                    result["device"]["hw_model"] = user.get(
                        "hwModel", result["device"].get("hw_model", ""))
                    break
    return result
def _on_connection(self, interface, topic=pub.AUTO_TOPIC):
    """Pubsub listener for ``meshtastic.connection.established``.

    Schedules one ``_handle_node_update`` coroutine on ``self.loop`` for
    every node the interface already knows.

    Args:
        interface: The interface object passed along with the event.
        topic: Topic object supplied by pubsub (unused).
    """
    logger.info("Meshtastic connection established")
    known_nodes = getattr(interface, "nodes", None)
    if not known_nodes:
        return
    # Iterate over a snapshot. NOTE(review): this listener is invoked
    # outside the event loop (hence call_soon_threadsafe), and the interface
    # presumably keeps updating its node table in the meantime; iterating
    # the live dict could then fail with "dictionary changed size during
    # iteration".
    for node in list(known_nodes.values()):
        self.loop.call_soon_threadsafe(asyncio.ensure_future, self._handle_node_update(node))
def _on_node_updated(self, node, topic=pub.AUTO_TOPIC):
    """Pubsub listener for ``meshtastic.node.updated``.

    Hands the node over to ``_handle_node_update`` on ``self.loop``.
    """
    pending = self._handle_node_update(node)
    # Thread-safe hand-over: the loop turns the coroutine into a task.
    self.loop.call_soon_threadsafe(asyncio.ensure_future, pending)
def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC):
    """Pubsub listener for ``meshtastic.receive``.

    Hands the packet over to ``_handle_packet`` on ``self.loop``; the
    ``interface`` and ``topic`` arguments are not used.
    """
    pending = self._handle_packet(packet)
    # Thread-safe hand-over: the loop turns the coroutine into a task.
    self.loop.call_soon_threadsafe(asyncio.ensure_future, pending)
async def _handle_node_update(self, node):
    """Persist a node-table entry and broadcast the stored row.

    Args:
        node: Dict describing the node; the keys read are ``user``,
            ``num``, ``position``, ``deviceMetrics``, ``snr``, ``hopsAway``
            and ``viaMqtt``.

    Any exception is logged and swallowed so a bad entry cannot break the
    caller.
    """
    try:
        logger.debug("Node update raw: %s", node)
        user = node.get("user", {})
        node_num = node.get("num")
        node_id = user.get("id")
        if not node_id and node_num:
            # No user id yet: derive the "!xxxxxxxx" form from the number.
            node_id = f"!{node_num:08x}"
        if not node_id:
            return

        position = node.get("position", {})
        metrics = node.get("deviceMetrics", {})
        fields = {
            "node_num": node_num,
            "long_name": user.get("longName"),
            "short_name": user.get("shortName"),
            "hw_model": user.get("hwModel"),
            "lat": position.get("latitude"),
            "lon": position.get("longitude"),
            "alt": position.get("altitude"),
            "battery": metrics.get("batteryLevel"),
            "voltage": metrics.get("voltage"),
            "hop_count": node.get("hopsAway"),
            "via_mqtt": 1 if node.get("viaMqtt") else 0,
        }
        # SNR is only passed on when the entry actually carries one.
        snr = node.get("snr")
        if snr is not None:
            fields["snr"] = snr

        stored = await self.db.upsert_node(str(node_id), **fields)
        if self.ws_manager:
            await self.ws_manager.broadcast("node_update", stored)
    except Exception:
        logger.exception("Error handling node update")
async def _handle_packet(self, packet):
try:
from_id = packet.get("fromId", str(packet.get("from", "")))
to_id = packet.get("toId", str(packet.get("to", "")))
portnum = packet.get("decoded", {}).get("portnum", "")
channel = packet.get("channel", 0)
# Update node info from packet
node_data = {"snr": packet.get("snr"), "rssi": packet.get("rssi"),
"hop_count": packet.get("hopStart", 0) - packet.get("hopLimit", 0) if packet.get("hopStart") else None}
if from_id:
await self.db.upsert_node(str(from_id), **{k: v for k, v in node_data.items() if v is not None})
# Handle nodeinfo
if portnum == "NODEINFO_APP":
user = packet.get("decoded", {}).get("user", {})
if user and from_id:
await self.db.upsert_node(str(from_id),
long_name=user.get("longName"),
short_name=user.get("shortName"),
hw_model=user.get("hwModel"))
node = await self.db.get_node(str(from_id))
if node and self.ws_manager:
await self.ws_manager.broadcast("node_update", node)
# Handle position updates
if portnum == "POSITION_APP":
pos = packet.get("decoded", {}).get("position", {})
if pos and from_id:
await self.db.upsert_node(str(from_id),
lat=pos.get("latitude"),
lon=pos.get("longitude"),
alt=pos.get("altitude"))
node = await self.db.get_node(str(from_id))
if node and self.ws_manager:
await self.ws_manager.broadcast("node_update", node)
# Handle telemetry
if portnum == "TELEMETRY_APP":
telemetry = packet.get("decoded", {}).get("telemetry", {})
metrics = telemetry.get("deviceMetrics", {})
if metrics and from_id:
await self.db.upsert_node(str(from_id),
battery=metrics.get("batteryLevel"),
voltage=metrics.get("voltage"))
node = await self.db.get_node(str(from_id))
if node and self.ws_manager:
await self.ws_manager.broadcast("node_update", node)
# Handle text messages
if portnum == "TEXT_MESSAGE_APP":
text = packet.get("decoded", {}).get("text", "")
msg = await self.db.insert_message(str(from_id), str(to_id), channel, portnum, text)
if self.ws_manager:
await self.ws_manager.broadcast("new_message", msg)
stats = await self.db.get_stats()
await self.ws_manager.broadcast("stats_update", stats)
# Process commands
prefix = config.get("bot.command_prefix", "!")
if text.startswith(prefix):
await self._handle_command(text.strip(), channel, str(from_id))
except Exception:
logger.exception("Error handling packet")
async def execute_command(self, text: str, channel: int, from_id: str | None = None):
"""Execute a bot command programmatically (used by scheduler and message handler)."""
prefix = config.get("bot.command_prefix", "!")
cmd = text.split()[0].lower()
response = None
if cmd == f"{prefix}ping":
hops_str = ""
if from_id:
node = await self.db.get_node(from_id)
if node and node.get("hop_count") is not None:
hops_str = f" ueber {node['hop_count']} Hops"
response = f"🏓 Pong{hops_str}!"
elif cmd == f"{prefix}nodes":
nodes = await self.db.get_all_nodes()
response = f"📡 Bekannte Nodes: {len(nodes)}"
elif cmd == f"{prefix}info":
uptime = self._format_uptime()
response = f" {config.get('bot.name', 'MeshDD-Bot')} v{config.get('version', '0.0.0')}\nUptime: {uptime}"
elif cmd == f"{prefix}help":
response = (
f"📋 Kommandos:\n"
f"{prefix}ping - Pong\n"
f"{prefix}nodes - Anzahl Nodes\n"
f"{prefix}info - Bot-Info\n"
f"{prefix}stats - Statistiken\n"
f"{prefix}uptime - Laufzeit\n"
f"{prefix}weather [plz:XXXXX] - Wetter\n"
f"{prefix}mesh - Mesh-Netzwerk\n"
f"{prefix}help - Diese Hilfe"
)
elif cmd == f"{prefix}weather":
args = text.split()[1:]
plz = None
for arg in args:
if arg.lower().startswith("plz:"):
plz = arg[4:]
break
response = await self._get_weather(from_id, plz=plz)
elif cmd == f"{prefix}stats":
stats = await self.db.get_stats()
response = (
f"📊 Statistiken:\n"
f"Nodes: {stats['total_nodes']}\n"
f"Aktiv (24h): {stats['nodes_24h']}\n"
f"Anfragen: {stats['total_commands']}"
)
elif cmd == f"{prefix}mesh":
response = await self._get_mesh_info()
elif cmd == f"{prefix}uptime":
response = f"⏱️ Uptime: {self._format_uptime()}"
if response:
await self._send_text(response, channel)
await self.db.insert_command(cmd)
if self.ws_manager:
stats = await self.db.get_stats()
await self.ws_manager.broadcast("stats_update", stats)
async def send_message(self, text: str, channel: int):
    """Send free text on ``channel``; thin public wrapper around ``_send_text``."""
    await self._send_text(text=text, channel=channel)
async def _handle_command(self, text: str, channel: int, from_id: str):
await self.execute_command(text, channel, from_id)
async def _send_text(self, text: str, channel: int, max_len: int = 170):
if not self.interface:
return
# Reserve space for "[x/y] " prefix (max 7 bytes)
parts = self._split_message(text, max_len - 7)
total = len(parts)
for i, part in enumerate(parts):
if i > 0:
await asyncio.sleep(3.0)
msg = f"[{i+1}/{total}] {part}" if total > 1 else part
try:
self.interface.sendText(msg, channelIndex=channel)
except Exception:
logger.exception("Error sending text")
@staticmethod
def _split_message(text: str, max_len: int) -> list[str]:
if len(text.encode('utf-8')) <= max_len:
return [text]
lines = text.split('\n')
parts = []
current = ""
for line in lines:
candidate = f"{current}\n{line}" if current else line
if len(candidate.encode('utf-8')) > max_len:
if current:
parts.append(current)
if len(line.encode('utf-8')) > max_len:
while line:
chunk = line[:max_len]
while len(chunk.encode('utf-8')) > max_len:
chunk = chunk[:-1]
parts.append(chunk)
line = line[len(chunk):]
else:
current = line
else:
current = candidate
if current:
parts.append(current)
return parts
def _format_uptime(self) -> str:
elapsed = int(time.time() - self.start_time)
days, remainder = divmod(elapsed, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
parts.append(f"{seconds}s")
return " ".join(parts)
async def _get_mesh_info(self) -> str:
nodes = await self.db.get_all_nodes()
total = len(nodes)
now = time.time()
online = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 900)
active_24h = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 86400)
with_pos = sum(1 for n in nodes if n.get("lat") and n.get("lon"))
# Hop distribution
hop_counts = {}
for n in nodes:
h = n.get("hop_count")
if h is not None:
hop_counts[h] = hop_counts.get(h, 0) + 1
hop_lines = []
for k in sorted(hop_counts.keys()):
label = "Direkt" if k == 0 else f"{k} Hop{'s' if k > 1 else ''}"
hop_lines.append(f" {label}: {hop_counts[k]}")
# Hardware distribution (top 3)
hw_counts = {}
for n in nodes:
hw = n.get("hw_model")
if hw:
hw_counts[hw] = hw_counts.get(hw, 0) + 1
top_hw = sorted(hw_counts.items(), key=lambda x: -x[1])[:3]
hw_lines = [f" {hw}: {cnt}" for hw, cnt in top_hw]
parts = [
f"🕸️ Mesh-Netzwerk:\n"
f"Nodes: {total} ({online} online)\n"
f"Aktiv 24h: {active_24h}\n"
f"Mit Position: {with_pos}",
]
if hop_lines:
parts.append("📊 Hop-Verteilung:\n" + "\n".join(hop_lines))
if hw_lines:
parts.append("🔧 Top Hardware:\n" + "\n".join(hw_lines))
return "\n\n".join(parts)
async def _get_weather(self, from_id: str, plz: str | None = None) -> str:
lat, lon = None, None
if plz:
coords = await self._geocode_plz(plz)
if coords:
lat, lon = coords
else:
return f"❌ PLZ {plz} nicht gefunden."
if lat is None:
node = await self.db.get_node(from_id)
if node and node.get("lat") and node.get("lon"):
lat, lon = node["lat"], node["lon"]
else:
lat, lon = 51.0504, 13.7373 # Dresden Zentrum
url = (
f"https://api.open-meteo.com/v1/forecast?"
f"latitude={lat}&longitude={lon}"
f"&current=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code"
)
try:
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, self._fetch_url, url)
current = data.get("current", {})
temp = current.get("temperature_2m", "?")
humidity = current.get("relative_humidity_2m", "?")
wind = current.get("wind_speed_10m", "?")
code = current.get("weather_code", 0)
weather_icon = self._weather_code_to_icon(code)
location = await self._reverse_geocode(lat, lon)
loc_str = f" ({location})" if location else ""
return (
f"{weather_icon} Wetter{loc_str}:\n"
f"Temp: {temp}°C\n"
f"Feuchte: {humidity}%\n"
f"Wind: {wind} km/h"
)
except Exception:
logger.exception("Error fetching weather")
return "❌ Wetterdaten konnten nicht abgerufen werden."
async def _geocode_plz(self, plz: str) -> tuple[float, float] | None:
url = (
f"https://nominatim.openstreetmap.org/search?"
f"postalcode={plz}&country=DE&format=json&limit=1&accept-language=de"
)
try:
loop = asyncio.get_event_loop()
req = urllib.request.Request(url, headers={"User-Agent": "MeshDD-Bot/1.0"})
data = await loop.run_in_executor(None, self._fetch_request, req)
if data and len(data) > 0:
return float(data[0]["lat"]), float(data[0]["lon"])
except Exception:
logger.debug("Geocode PLZ failed for %s", plz)
return None
async def _reverse_geocode(self, lat: float, lon: float) -> str:
url = (
f"https://nominatim.openstreetmap.org/reverse?"
f"lat={lat}&lon={lon}&format=json&zoom=10&accept-language=de"
)
try:
loop = asyncio.get_event_loop()
req = urllib.request.Request(url, headers={"User-Agent": "MeshDD-Bot/1.0"})
data = await loop.run_in_executor(None, self._fetch_request, req)
addr = data.get("address", {})
return addr.get("city") or addr.get("town") or addr.get("village") or addr.get("municipality") or ""
except Exception:
logger.debug("Reverse geocode failed for %s,%s", lat, lon)
return ""
@staticmethod
def _fetch_request(req: urllib.request.Request) -> dict:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode())
@staticmethod
def _fetch_url(url: str) -> dict:
with urllib.request.urlopen(url, timeout=10) as resp:
return json.loads(resp.read().decode())
@staticmethod
def _weather_code_to_icon(code: int) -> str:
if code == 0:
return "☀️"
if code in (1, 2, 3):
return ""
if code in (45, 48):
return "🌫️"
if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82):
return "🌧️"
if code in (71, 73, 75, 77, 85, 86):
return "🌨️"
if code in (95, 96, 99):
return "⛈️"
return "🌤️"