MeshDD-Bot/meshbot/bot.py
ppfeiffer 1d768c6921 feat: v0.5.7 - Anfragen pro Kanal mit Kanalnamen im Dashboard
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 16:24:26 +01:00

600 lines
24 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import time
import json
import urllib.request
from google.protobuf.json_format import MessageToDict
from meshtastic.tcp_interface import TCPInterface
from pubsub import pub
from meshbot import config
from meshbot.database import Database
logger = logging.getLogger(__name__)
class MeshBot:
def __init__(self, db: Database, loop: asyncio.AbstractEventLoop):
    """Create a bot that is not yet connected to a Meshtastic device.

    Args:
        db: Database handle kept on the instance for all persistence calls.
        loop: Event loop kept on the instance; the receive callbacks hand
            their coroutines over to it.
    """
    # Reference point used by _format_uptime().
    self.start_time = time.time()
    self.db, self.loop = db, loop
    # No device connection yet; connect() assigns the TCPInterface.
    self.interface: TCPInterface | None = None
    # Assigned from outside the class (set by main.py).
    self.ws_manager = None
def connect(self):
    """Register the pubsub listeners and open the TCP connection to the device.

    Host and port are read from the ``meshtastic.host`` / ``meshtastic.port``
    config keys (defaults: ``localhost`` / ``4403``).

    The listeners are subscribed *before* the interface object is created.
    NOTE(review): per the Meshtastic Python docs the interface publishes its
    events from a background thread and the examples subscribe first; with
    the previous order (subscribe after construction) the initial
    ``meshtastic.connection.established`` / node events could be missed.

    Any exception raised by ``TCPInterface`` propagates to the caller.
    """
    host = config.get("meshtastic.host", "localhost")
    port = config.get("meshtastic.port", 4403)
    logger.info("Connecting to Meshtastic at %s:%s", host, port)
    # The callbacks only need self.loop, so they are safe to register early.
    pub.subscribe(self._on_receive, "meshtastic.receive")
    pub.subscribe(self._on_connection, "meshtastic.connection.established")
    pub.subscribe(self._on_node_updated, "meshtastic.node.updated")
    self.interface = TCPInterface(hostname=host, portNumber=port)
    logger.info("Connected to Meshtastic")
def disconnect(self):
    """Close the device interface if one exists; do nothing otherwise."""
    iface = self.interface
    if not iface:
        return
    iface.close()
def get_channels(self) -> dict:
    """Return the active channels of the local node as ``{index: name}``.

    Channels whose ``role`` is falsy are skipped. A channel without a
    configured name is reported as ``"Primary"`` (index 0) or
    ``"Ch <index>"``.

    Returns:
        The mapping described above; an empty dict when there is no
        interface, no local node, or no channel list.
    """
    local_node = self.interface.localNode if self.interface else None
    if not local_node:
        return {}
    channels = {}
    # The channel list may be None instead of a list (presumably until the
    # device has delivered its channels - verify against the meshtastic Node
    # class); treat that as "no channels" rather than raising TypeError.
    for ch in local_node.channels or ():
        if not ch.role:
            continue
        fallback = "Primary" if ch.index == 0 else f"Ch {ch.index}"
        channels[ch.index] = ch.settings.name or fallback
    return channels
def get_node_config(self) -> dict:
    """Collect a snapshot of the local node's configuration.

    Returns:
        A dict with the sections ``device``, ``lora``, ``channels``,
        ``position``, ``power``, ``bluetooth`` and ``network``. Without an
        interface or local node every section is empty. Settings that are
        missing from the device config are reported as ``""``.
    """
    result = {"device": {}, "lora": {}, "channels": [], "position": {}, "power": {}, "bluetooth": {}, "network": {}}
    if not self.interface or not self.interface.localNode:
        return result
    node = self.interface.localNode
    my_info = getattr(self.interface, "myInfo", None)
    metadata = getattr(self.interface, "metadata", None)
    device = result["device"]

    def as_dict(message) -> dict:
        # Objects exposing DESCRIPTOR are converted with MessageToDict, plain
        # dicts pass through, anything else yields an empty dict.
        if hasattr(message, "DESCRIPTOR"):
            return MessageToDict(message)
        return message if isinstance(message, dict) else {}

    def pick(section: dict, *keys: str) -> dict:
        # Copy the listed keys, substituting "" for missing ones.
        return {key: section.get(key, "") for key in keys}

    # Device info
    if my_info:
        info = as_dict(my_info)
        device["node_num"] = info.get("myNodeNum", "")
        device["max_channels"] = info.get("maxChannels", "")
    if metadata:
        meta = as_dict(metadata)
        device["firmware_version"] = meta.get("firmwareVersion", "")
        device["hw_model"] = meta.get("hwModel", "")

    # Local config sections
    local_config = getattr(node, "localConfig", None)
    if local_config:
        cfg = (
            MessageToDict(local_config, preserving_proto_field_name=True)
            if hasattr(local_config, "DESCRIPTOR")
            else {}
        )
        result["lora"] = pick(
            cfg.get("lora", {}),
            "region", "modem_preset", "hop_limit", "tx_power",
            "bandwidth", "frequency_offset", "tx_enabled", "use_preset",
        )
        role = cfg.get("device", {}).get("role", "")
        # NOTE(review): "name" has always been filled with the device role;
        # kept for compatibility with existing consumers - confirm intent.
        device["name"] = role
        device["role"] = role
        pos = cfg.get("position", {})
        result["position"] = {
            # Falls back to the gps_enabled key when gps_mode is absent.
            "gps_mode": pos.get("gps_mode", pos.get("gps_enabled", "")),
            **pick(pos, "position_broadcast_secs", "fixed_position"),
        }
        result["power"] = pick(
            cfg.get("power", {}),
            "mesh_sds_timeout_secs", "ls_secs", "min_wake_secs", "is_power_saving",
        )
        result["bluetooth"] = pick(cfg.get("bluetooth", {}), "enabled", "mode")
        result["network"] = pick(cfg.get("network", {}), "wifi_enabled", "ntp_server")

    # Channels. The list may be None; treat that as "no channels" instead of
    # raising TypeError.
    for ch in node.channels or ():
        if not ch.role:
            continue
        name = ch.settings.name or ("Primary" if ch.index == 0 else f"Ch {ch.index}")
        psk = ch.settings.psk
        if psk == b"\x01":
            psk_display = "Default"
        elif psk and psk != b"\x00":
            psk_display = "Custom"
        else:
            psk_display = "None"
        result["channels"].append({
            "index": ch.index,
            "name": name,
            "role": str(ch.role),
            "psk": psk_display,
        })

    # Node long name from interface nodes: find the entry whose number is ours.
    if self.interface.nodes and my_info and hasattr(my_info, "my_node_num"):
        own_num = my_info.my_node_num
        for entry in self.interface.nodes.values():
            if entry.get("num") == own_num:
                user = entry.get("user", {})
                device["long_name"] = user.get("longName", "")
                device["short_name"] = user.get("shortName", "")
                # Prefer the model from the node entry, keep the metadata value otherwise.
                device["hw_model"] = user.get("hwModel", device.get("hw_model", ""))
                break
    return result
def _on_connection(self, interface, topic=pub.AUTO_TOPIC):
    """Listener for the ``meshtastic.connection.established`` topic.

    Schedules one _handle_node_update run on the bot's event loop for every
    node the given interface already lists.
    """
    logger.info("Meshtastic connection established")
    known_nodes = getattr(interface, "nodes", None)
    if not known_nodes:
        return
    for entry in known_nodes.values():
        # The coroutine object is created here and started on self.loop
        # through the loop's thread-safe scheduling call.
        pending = self._handle_node_update(entry)
        self.loop.call_soon_threadsafe(asyncio.ensure_future, pending)
def _on_node_updated(self, node, topic=pub.AUTO_TOPIC):
    """Listener for ``meshtastic.node.updated``: queue the node for _handle_node_update on the bot's loop."""
    pending = self._handle_node_update(node)
    self.loop.call_soon_threadsafe(asyncio.ensure_future, pending)
def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC):
    """Listener for ``meshtastic.receive``: queue the packet for _handle_packet on the bot's loop.

    The ``interface`` argument is accepted but not used.
    """
    pending = self._handle_packet(packet)
    self.loop.call_soon_threadsafe(asyncio.ensure_future, pending)
async def _handle_node_update(self, node):
    """Persist one node record and push the stored row to websocket clients.

    ``node`` is read as a dict with the optional entries ``user``, ``num``,
    ``position``, ``deviceMetrics``, ``snr``, ``hopsAway`` and ``viaMqtt``.
    Records that provide neither a user id nor a node number are ignored.
    Any exception is logged and swallowed.
    """
    try:
        logger.debug("Node update raw: %s", node)
        user = node.get("user", {})
        node_num = node.get("num")
        node_id = user.get("id")
        if not node_id:
            if not node_num:
                return
            # No user id available: derive "!<8 hex digits>" from the node number.
            node_id = f"!{node_num:08x}"
        position = node.get("position", {})
        metrics = node.get("deviceMetrics", {})
        fields = dict(
            node_num=node_num,
            long_name=user.get("longName"),
            short_name=user.get("shortName"),
            hw_model=user.get("hwModel"),
            lat=position.get("latitude"),
            lon=position.get("longitude"),
            alt=position.get("altitude"),
            battery=metrics.get("batteryLevel"),
            voltage=metrics.get("voltage"),
            hop_count=node.get("hopsAway"),
            via_mqtt=int(bool(node.get("viaMqtt"))),
        )
        # snr is only passed on when the record actually carries one.
        snr = node.get("snr")
        if snr is not None:
            fields["snr"] = snr
        row = await self.db.upsert_node(str(node_id), **fields)
        if self.ws_manager:
            await self.ws_manager.broadcast("node_update", row)
    except Exception:
        logger.exception("Error handling node update")
async def _handle_packet(self, packet):
    """Process one received packet dict.

    Steps, in order:
      1. Store link data (``snr``, ``rssi``, hop count) for the sender.
      2. For ``NODEINFO_APP`` / ``POSITION_APP`` / ``TELEMETRY_APP`` packets,
         store the decoded fields and broadcast the refreshed node row.
      3. For ``TEXT_MESSAGE_APP`` packets, store and broadcast the message
         (unless it comes from the bot's own node) and run it as a command
         when it starts with the configured prefix.

    Sender/recipient ids come from ``fromId`` / ``toId``. When such an entry
    is missing *or None*, the id is derived from the numeric ``from`` / ``to``
    field as ``!<8 hex digits>`` - the same format _handle_node_update uses.
    Previously a present-but-None ``fromId`` made the packet's sender data
    get dropped and text messages get stored under the literal id "None".

    Any exception is logged and swallowed.
    """
    try:
        def resolve_id(id_key: str, num_key: str) -> str:
            # Prefer the textual id; fall back to the numeric field.
            node_id = packet.get(id_key)
            if node_id:
                return str(node_id)
            num = packet.get(num_key)
            if isinstance(num, int):
                return f"!{num:08x}"
            return "" if num is None else str(num)

        from_id = resolve_id("fromId", "from")
        to_id = resolve_id("toId", "to")
        decoded = packet.get("decoded", {})
        portnum = decoded.get("portnum", "")
        channel = packet.get("channel", 0)

        # Update node info from packet
        hop_start = packet.get("hopStart")
        link = {
            "snr": packet.get("snr"),
            "rssi": packet.get("rssi"),
            # Hops used so far; only computable when hopStart is present and non-zero.
            "hop_count": hop_start - packet.get("hopLimit", 0) if hop_start else None,
        }
        if from_id:
            await self.db.upsert_node(from_id, **{k: v for k, v in link.items() if v is not None})

        # Port-specific node fields (node info / position / telemetry)
        node_fields = None
        if portnum == "NODEINFO_APP":
            user = decoded.get("user", {})
            if user:
                node_fields = {
                    "long_name": user.get("longName"),
                    "short_name": user.get("shortName"),
                    "hw_model": user.get("hwModel"),
                }
        elif portnum == "POSITION_APP":
            pos = decoded.get("position", {})
            if pos:
                node_fields = {
                    "lat": pos.get("latitude"),
                    "lon": pos.get("longitude"),
                    "alt": pos.get("altitude"),
                }
        elif portnum == "TELEMETRY_APP":
            metrics = decoded.get("telemetry", {}).get("deviceMetrics", {})
            if metrics:
                node_fields = {
                    "battery": metrics.get("batteryLevel"),
                    "voltage": metrics.get("voltage"),
                }
        if node_fields and from_id:
            await self.db.upsert_node(from_id, **node_fields)
            node = await self.db.get_node(from_id)
            if node and self.ws_manager:
                await self.ws_manager.broadcast("node_update", node)

        # Handle text messages
        if portnum == "TEXT_MESSAGE_APP":
            text = decoded.get("text", "")
            my_id = self.get_my_node_id()
            is_own = my_id and from_id == my_id
            # Messages from our own node are not stored here; _send_text
            # stores outgoing messages itself.
            if not is_own:
                msg = await self.db.insert_message(from_id, to_id, channel, portnum, text)
                if self.ws_manager:
                    await self.ws_manager.broadcast("new_message", msg)
                    stats = await self.db.get_stats()
                    await self.ws_manager.broadcast("stats_update", stats)
            # Process commands
            prefix = config.get("bot.command_prefix", "!")
            if text.startswith(prefix):
                await self._handle_command(text.strip(), channel, from_id)
    except Exception:
        logger.exception("Error handling packet")
async def execute_command(self, text: str, channel: int, from_id: str | None = None):
"""Execute a bot command programmatically (used by scheduler and message handler)."""
prefix = config.get("bot.command_prefix", "!")
cmd = text.split()[0].lower()
response = None
if cmd == f"{prefix}ping":
hops_str = ""
if from_id:
node = await self.db.get_node(from_id)
if node and node.get("hop_count") is not None:
hops_str = f" ueber {node['hop_count']} Hops"
response = f"🏓 Pong{hops_str}!"
elif cmd == f"{prefix}nodes":
nodes = await self.db.get_all_nodes()
response = f"📡 Bekannte Nodes: {len(nodes)}"
elif cmd == f"{prefix}info":
uptime = self._format_uptime()
response = f" {config.get('bot.name', 'MeshDD-Bot')} v{config.get('version', '0.0.0')}\nUptime: {uptime}"
elif cmd == f"{prefix}help":
response = (
f"📋 Kommandos:\n"
f"{prefix}ping - Pong\n"
f"{prefix}nodes - Anzahl Nodes\n"
f"{prefix}info - Bot-Info\n"
f"{prefix}stats - Statistiken\n"
f"{prefix}uptime - Laufzeit\n"
f"{prefix}weather [plz:XXXXX] - Wetter\n"
f"{prefix}me - Meine Node-Infos\n"
f"{prefix}mesh - Mesh-Netzwerk\n"
f"{prefix}help - Diese Hilfe"
)
elif cmd == f"{prefix}me":
response = await self._get_my_info(from_id)
elif cmd == f"{prefix}weather":
args = text.split()[1:]
plz = None
for arg in args:
if arg.lower().startswith("plz:"):
plz = arg[4:]
break
response = await self._get_weather(from_id, plz=plz)
elif cmd == f"{prefix}stats":
stats = await self.db.get_stats()
response = (
f"📊 Statistiken:\n"
f"Nodes: {stats['total_nodes']}\n"
f"Aktiv (24h): {stats['nodes_24h']}\n"
f"Anfragen: {stats['total_commands']}"
)
elif cmd == f"{prefix}mesh":
response = await self._get_mesh_info()
elif cmd == f"{prefix}uptime":
response = f"⏱️ Uptime: {self._format_uptime()}"
if response:
await self._send_text(response, channel)
await self.db.insert_command(cmd, channel)
if self.ws_manager:
stats = await self.db.get_stats()
await self.ws_manager.broadcast("stats_update", stats)
async def send_message(self, text: str, channel: int):
    """Public entry point for free-text messages: hands ``text`` and ``channel`` to _send_text."""
    deliver = self._send_text
    await deliver(text, channel)
async def _handle_command(self, text: str, channel: int, from_id: str):
    """Run a received command by forwarding all three arguments unchanged to execute_command."""
    run = self.execute_command
    await run(text, channel, from_id)
def get_my_node_id(self) -> str | None:
if self.interface and hasattr(self.interface, 'myInfo') and self.interface.myInfo:
num = self.interface.myInfo.my_node_num
return f"!{num:08x}"
return None
async def _send_text(self, text: str, channel: int, max_len: int = 170):
    """Send ``text`` on ``channel``, split into numbered parts when it is too long.

    Does nothing without an interface. The text is split with
    _split_message; when more than one part results, each part is prefixed
    with ``[i/total] `` and consecutive parts are spaced 3 seconds apart.
    Every part is first stored and broadcast to websocket clients, then sent
    over the radio; failures of either step are logged and do not stop the
    remaining parts.
    """
    if not self.interface:
        return
    sender_id = self.get_my_node_id() or "bot"
    # Reserve space for "[x/y] " prefix (max 7 bytes)
    chunks = self._split_message(text, max_len - 7)
    count = len(chunks)
    numbered = count > 1
    for pos, chunk in enumerate(chunks, start=1):
        if pos > 1:
            await asyncio.sleep(3.0)
        outgoing = f"[{pos}/{count}] {chunk}" if numbered else chunk
        # Store and broadcast first, then send via radio
        try:
            record = await self.db.insert_message(sender_id, "^all", channel, "TEXT_MESSAGE_APP", outgoing)
            if self.ws_manager:
                await self.ws_manager.broadcast("new_message", record)
        except Exception:
            logger.exception("Error storing sent message")
        try:
            self.interface.sendText(outgoing, channelIndex=channel)
        except Exception:
            logger.exception("Error sending text via radio")
@staticmethod
def _split_message(text: str, max_len: int) -> list[str]:
    """Split ``text`` into parts of at most ``max_len`` UTF-8 bytes each.

    Text that already fits is returned as a single part. Otherwise whole
    lines are packed greedily into parts (joined by ``\\n``); a single line
    that is longer than the limit is hard-wrapped between characters.

    Two defects of the previous implementation are fixed:
      * after hard-wrapping a long line the accumulator still held the part
        that had already been emitted, so that part was repeated in front of
        the following lines;
      * with a limit smaller than one character's encoded size the wrap loop
        never made progress. A single character wider than the limit is now
        emitted on its own (a code point cannot be split).

    Args:
        text: Message to split.
        max_len: Byte budget per part; values below 1 are treated as 1.

    Returns:
        The parts in order.
    """
    def size(value: str) -> int:
        return len(value.encode("utf-8"))

    limit = max(max_len, 1)
    if size(text) <= limit:
        return [text]
    parts = []
    current = ""
    for line in text.split("\n"):
        candidate = f"{current}\n{line}" if current else line
        if size(candidate) <= limit:
            current = candidate
            continue
        # The line does not fit behind the accumulated text: flush it.
        if current:
            parts.append(current)
        # Hard-wrap the line until the remainder fits into one part.
        while line and size(line) > limit:
            chunk = line[:limit]
            # Characters may encode to several bytes; trim until the chunk
            # fits, but always keep one character so the loop advances.
            while len(chunk) > 1 and size(chunk) > limit:
                chunk = chunk[:-1]
            parts.append(chunk)
            line = line[len(chunk):]
        # Start the next part with what is left (possibly nothing), never
        # with the text that was flushed above.
        current = line
    if current:
        parts.append(current)
    return parts
def _format_uptime(self) -> str:
    """Format the time since ``self.start_time`` as e.g. ``"1d 2h 3m 4s"``.

    Day, hour and minute components are left out when they are zero; the
    seconds component is always present.
    """
    seconds_left = int(time.time() - self.start_time)
    pieces = []
    for unit_seconds, suffix in ((86400, "d"), (3600, "h"), (60, "m")):
        amount, seconds_left = divmod(seconds_left, unit_seconds)
        if amount:
            pieces.append(f"{amount}{suffix}")
    pieces.append(f"{seconds_left}s")
    return " ".join(pieces)
async def _get_my_info(self, from_id: str) -> str:
    """Build the multi-line reply describing the stored data of node ``from_id``.

    Returns an error line when ``from_id`` is empty or the database has no
    row for it. Values that are missing in the row are shown as ``-``; the
    position and "last seen" lines are only added when that data exists.
    """
    if not from_id:
        return "❌ Node-ID unbekannt."
    node = await self.db.get_node(from_id)
    if not node:
        return f"❌ Keine Daten fuer {from_id}."

    def shown(key: str, template: str) -> str:
        # Render a column through its template, or "-" when it is None.
        value = node.get(key)
        return "-" if value is None else template.format(value)

    title = node.get("long_name") or node.get("short_name") or from_id
    raw_alt = node.get("alt")
    altitude = "-" if raw_alt is None else f"{int(raw_alt)} m"

    lines = [
        f"📋 {title}",
        f"ID: {from_id}",
        f"HW: {node.get('hw_model') or '-'}",
        f"Hops: {shown('hop_count', '{}')}",
        f"SNR: {shown('snr', '{:.1f} dB')} | RSSI: {shown('rssi', '{} dBm')}",
        f"Bat: {shown('battery', '{}%')} ({shown('voltage', '{:.2f}V')})",
    ]
    if node.get("lat") and node.get("lon"):
        lines.append(f"Pos: {node['lat']:.4f}, {node['lon']:.4f} | {altitude}")

    seen_at = node.get("last_seen")
    if seen_at:
        age = int(time.time() - seen_at)
        # Pick the coarsest unit whose threshold the age has not reached yet.
        for threshold, divisor, unit in ((60, 1, "s"), (3600, 60, "m"), (86400, 3600, "h")):
            if age < threshold:
                lines.append(f"Zuletzt: vor {age // divisor}{unit}")
                break
        else:
            lines.append(f"Zuletzt: vor {age // 86400}d")
    return "\n".join(lines)
async def _get_mesh_info(self) -> str:
    """Build the mesh overview reply from all stored nodes.

    The first section lists the node total, nodes seen in the last 900
    seconds ("online"), nodes seen in the last 24 hours and nodes with a
    position. A hop-count distribution and the three most common hardware
    models follow as separate sections when such data exists.
    """
    nodes = await self.db.get_all_nodes()
    now = time.time()

    def seen_within(entry, window: int) -> bool:
        stamp = entry.get("last_seen")
        return bool(stamp) and now - stamp < window

    online = len([n for n in nodes if seen_within(n, 900)])
    active_24h = len([n for n in nodes if seen_within(n, 86400)])
    with_pos = len([n for n in nodes if n.get("lat") and n.get("lon")])

    # Tally hop counts and hardware models in a single pass.
    hop_counts = {}
    hw_counts = {}
    for entry in nodes:
        hops = entry.get("hop_count")
        if hops is not None:
            hop_counts[hops] = hop_counts.get(hops, 0) + 1
        model = entry.get("hw_model")
        if model:
            hw_counts[model] = hw_counts.get(model, 0) + 1

    hop_lines = []
    for hops in sorted(hop_counts):
        if hops == 0:
            label = "Direkt"
        else:
            label = f"{hops} Hop{'s' if hops > 1 else ''}"
        hop_lines.append(f" {label}: {hop_counts[hops]}")

    # Hardware distribution (top 3); ties keep their first-seen order.
    ranked = sorted(hw_counts.items(), key=lambda item: item[1], reverse=True)
    hw_lines = [f" {model}: {count}" for model, count in ranked[:3]]

    sections = [
        "\n".join([
            "🕸️ Mesh-Netzwerk:",
            f"Nodes: {len(nodes)} ({online} online)",
            f"Aktiv 24h: {active_24h}",
            f"Mit Position: {with_pos}",
        ])
    ]
    if hop_lines:
        sections.append("📊 Hop-Verteilung:\n" + "\n".join(hop_lines))
    if hw_lines:
        sections.append("🔧 Top Hardware:\n" + "\n".join(hw_lines))
    return "\n\n".join(sections)
async def _get_weather(self, from_id: str, plz: str | None = None) -> str:
lat, lon = None, None
if plz:
coords = await self._geocode_plz(plz)
if coords:
lat, lon = coords
else:
return f"❌ PLZ {plz} nicht gefunden."
if lat is None:
node = await self.db.get_node(from_id)
if node and node.get("lat") and node.get("lon"):
lat, lon = node["lat"], node["lon"]
else:
lat, lon = 51.0504, 13.7373 # Dresden Zentrum
url = (
f"https://api.open-meteo.com/v1/forecast?"
f"latitude={lat}&longitude={lon}"
f"&current=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code,pressure_msl,dew_point_2m"
)
try:
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, self._fetch_url, url)
current = data.get("current", {})
temp = current.get("temperature_2m", "?")
humidity = current.get("relative_humidity_2m", "?")
wind = current.get("wind_speed_10m", "?")
code = current.get("weather_code", 0)
pressure = current.get("pressure_msl", "?")
dewpoint = current.get("dew_point_2m", "?")
weather_icon = self._weather_code_to_icon(code)
location = await self._reverse_geocode(lat, lon)
loc_str = f" ({location})" if location else ""
return (
f"{weather_icon} Wetter{loc_str}:\n"
f"Temp: {temp}°C\n"
f"Feuchte: {humidity}%\n"
f"Taupunkt: {dewpoint}°C\n"
f"Luftdruck: {pressure} hPa\n"
f"Wind: {wind} km/h"
)
except Exception:
logger.exception("Error fetching weather")
return "❌ Wetterdaten konnten nicht abgerufen werden."
async def _geocode_plz(self, plz: str) -> tuple[float, float] | None:
url = (
f"https://nominatim.openstreetmap.org/search?"
f"postalcode={plz}&country=DE&format=json&limit=1&accept-language=de"
)
try:
loop = asyncio.get_event_loop()
req = urllib.request.Request(url, headers={"User-Agent": "MeshDD-Bot/1.0"})
data = await loop.run_in_executor(None, self._fetch_request, req)
if data and len(data) > 0:
return float(data[0]["lat"]), float(data[0]["lon"])
except Exception:
logger.debug("Geocode PLZ failed for %s", plz)
return None
async def _reverse_geocode(self, lat: float, lon: float) -> str:
    """Look up a place name for the coordinates via the Nominatim reverse API.

    Returns:
        The first non-empty value of the response's ``address`` fields
        ``city``, ``town``, ``village``, ``municipality``; ``""`` when none
        is present or the request failed (failures are logged at debug
        level).
    """
    # Local import: keeps this method independent of the file's import block.
    from urllib.parse import urlencode

    query = urlencode({
        "lat": lat,
        "lon": lon,
        "format": "json",
        "zoom": 10,
        "accept-language": "de",
    })
    url = f"https://nominatim.openstreetmap.org/reverse?{query}"
    try:
        # Inside a coroutine the running loop is the one to use.
        loop = asyncio.get_running_loop()
        req = urllib.request.Request(url, headers={"User-Agent": "MeshDD-Bot/1.0"})
        data = await loop.run_in_executor(None, self._fetch_request, req)
        addr = data.get("address", {})
        return addr.get("city") or addr.get("town") or addr.get("village") or addr.get("municipality") or ""
    except Exception:
        logger.debug("Reverse geocode failed for %s,%s", lat, lon)
        return ""
@staticmethod
def _fetch_request(req: urllib.request.Request) -> dict:
    """Blocking helper: open ``req`` with a 10 second timeout and return the JSON-decoded body."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        raw = resp.read()
    return json.loads(raw.decode())
@staticmethod
def _fetch_url(url: str) -> dict:
    """Blocking helper: open ``url`` with a 10 second timeout and return the JSON-decoded body."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        raw = resp.read()
    return json.loads(raw.decode())
@staticmethod
def _weather_code_to_icon(code: int) -> str:
    """Map a weather code to the icon shown at the start of the weather reply.

    ``code`` is the ``weather_code`` value of the Open-Meteo response
    (presumably WMO weather interpretation codes - see the Open-Meteo docs).
    Codes that are not listed map to a generic icon.

    Codes 1-3 previously returned an empty string, which left the reply
    starting with a bare space instead of an icon; they now get an icon like
    every other group.
    """
    if code == 0:
        return "☀️"
    if code in (1, 2, 3):
        return "⛅"
    if code in (45, 48):
        return "🌫️"
    if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82):
        return "🌧️"
    if code in (71, 73, 75, 77, 85, 86):
        return "🌨️"
    if code in (95, 96, 99):
        return "⛈️"
    return "🌤️"