MeshDD-Bot/meshbot/bot.py
ppfeiffer 6bfb1595e4 feat: v0.3.3 - Send messages from dashboard, scheduler message type, modern UI
Add message sending from web dashboard with channel selector, new POST /api/send
endpoint, scheduler support for free-text messages (type field), and modernized
dashboard layout with glassmorphism navbar, gradient stat cards, chat bubbles.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 17:40:00 +01:00

378 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import time
import json
import urllib.request
from meshtastic.tcp_interface import TCPInterface
from pubsub import pub
from meshbot import config
from meshbot.database import Database
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
class MeshBot:
    """Glue between a Meshtastic TCP interface, the bot database and the dashboard.

    The bot subscribes to Meshtastic pubsub topics, mirrors node and message
    data into ``self.db``, pushes updates to ``self.ws_manager`` (when one has
    been attached) and answers chat commands that start with the configured
    command prefix.

    Pubsub callbacks only hand work over to ``self.loop``; all database and
    websocket work happens in coroutines running on that loop.
    """

    def __init__(self, db: Database, loop: asyncio.AbstractEventLoop):
        """Store collaborators; no connection is opened until :meth:`connect`.

        Args:
            db: Database wrapper used for nodes, messages, commands and stats.
            loop: Event loop on which all coroutines of the bot are scheduled.
        """
        self.db = db
        self.loop = loop
        self.interface: TCPInterface | None = None
        self.start_time = time.time()  # reference point for the uptime commands
        self.ws_manager = None  # set by main.py

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------

    def connect(self):
        """Subscribe to the Meshtastic topics and open the TCP interface.

        Host and port come from ``meshtastic.host`` / ``meshtastic.port`` in
        the config (defaults ``localhost`` / ``4403``).
        """
        host = config.get("meshtastic.host", "localhost")
        port = config.get("meshtastic.port", 4403)
        logger.info("Connecting to Meshtastic at %s:%s", host, port)
        # Subscribe *before* constructing the interface so that events
        # published while the connection is being set up (in particular
        # "meshtastic.connection.established") are not missed.
        pub.subscribe(self._on_receive, "meshtastic.receive")
        pub.subscribe(self._on_connection, "meshtastic.connection.established")
        pub.subscribe(self._on_node_updated, "meshtastic.node.updated")
        self.interface = TCPInterface(hostname=host, portNumber=port)
        logger.info("Connected to Meshtastic")

    def disconnect(self):
        """Close the interface (if any) and forget it.

        Clearing ``self.interface`` makes later send attempts a no-op instead
        of an error on a closed connection.
        """
        interface, self.interface = self.interface, None
        if interface:
            interface.close()

    def get_channels(self) -> dict:
        """Return ``{channel_index: display_name}`` for channels with a truthy role.

        Unnamed channels are shown as ``"Primary"`` (index 0) or ``"Ch <index>"``.
        An empty dict is returned while no interface / local node is available.
        """
        channels = {}
        local_node = self.interface.localNode if self.interface else None
        if not local_node:
            return channels
        # `channels` is guarded against None, assumed to occur before the
        # channel list has been loaded from the device.
        for ch in local_node.channels or []:
            if not ch.role:
                continue
            fallback = "Primary" if ch.index == 0 else f"Ch {ch.index}"
            channels[ch.index] = ch.settings.name or fallback
        return channels

    # ------------------------------------------------------------------
    # Pubsub callbacks -> event loop
    # ------------------------------------------------------------------

    def _schedule(self, coro):
        """Submit *coro* to ``self.loop`` from whatever thread the caller is on."""
        asyncio.run_coroutine_threadsafe(coro, self.loop)

    def _on_connection(self, interface, topic=pub.AUTO_TOPIC):
        """Pubsub callback: import every node the interface already knows."""
        logger.info("Meshtastic connection established")
        nodes = getattr(interface, "nodes", None)
        if not nodes:
            return
        # Snapshot the values: the mapping belongs to the interface and may be
        # modified while we iterate (assumption - it is owned by another thread).
        for node in list(nodes.values()):
            self._schedule(self._handle_node_update(node))

    def _on_node_updated(self, node, topic=pub.AUTO_TOPIC):
        """Pubsub callback: a single node record changed."""
        self._schedule(self._handle_node_update(node))

    def _on_receive(self, packet, interface, topic=pub.AUTO_TOPIC):
        """Pubsub callback: a packet was received from the mesh."""
        self._schedule(self._handle_packet(packet))

    # ------------------------------------------------------------------
    # Node / packet processing
    # ------------------------------------------------------------------

    async def _handle_node_update(self, node):
        """Upsert one node record into the database and broadcast the result.

        The node id is taken from ``node["user"]["id"]``; if that is missing it
        is derived from the numeric id as ``!%08x``. Records without any usable
        id are ignored. All errors are logged and swallowed.
        """
        try:
            logger.debug("Node update raw: %s", node)
            user = node.get("user", {})
            node_num = node.get("num")
            node_id = user.get("id")
            if not node_id and node_num:
                node_id = f"!{node_num:08x}"
            if not node_id:
                return
            node_id = str(node_id)

            position = node.get("position", {})
            metrics = node.get("deviceMetrics", {})
            data = {
                "node_num": node_num,
                "long_name": user.get("longName"),
                "short_name": user.get("shortName"),
                "hw_model": user.get("hwModel"),
                "lat": position.get("latitude"),
                "lon": position.get("longitude"),
                "alt": position.get("altitude"),
                "battery": metrics.get("batteryLevel"),
                "voltage": metrics.get("voltage"),
                "hop_count": node.get("hopsAway"),
                "via_mqtt": 1 if node.get("viaMqtt") else 0,
            }
            # SNR is only passed along when the record actually carries one.
            snr = node.get("snr")
            if snr is not None:
                data["snr"] = snr

            updated = await self.db.upsert_node(node_id, **data)
            if self.ws_manager:
                await self.ws_manager.broadcast("node_update", updated)
        except Exception:
            logger.exception("Error handling node update")

    @staticmethod
    def _packet_node_id(packet, id_key: str, num_key: str) -> str:
        """Return the textual node id stored under *id_key* in *packet*.

        When the id is missing or ``None``, fall back to the numeric id under
        *num_key*, formatted as ``!%08x`` (the same format used by
        ``_handle_node_update``). Returns ``""`` when neither is available.
        """
        node_id = packet.get(id_key)
        if node_id:
            return str(node_id)
        num = packet.get(num_key)
        if num is None:
            return ""
        if isinstance(num, int):
            return f"!{num:08x}"
        return str(num)

    async def _broadcast_node(self, node_id: str):
        """Re-read *node_id* from the database and broadcast it as ``node_update``."""
        node = await self.db.get_node(node_id)
        if node and self.ws_manager:
            await self.ws_manager.broadcast("node_update", node)

    async def _handle_packet(self, packet):
        """Process one received packet.

        Always refreshes the sender's radio metrics (snr / rssi / hop count),
        then dispatches on ``decoded.portnum``: node info, position and
        telemetry update the node record; text messages are stored, broadcast
        and, when they start with the command prefix, executed as commands.
        All errors are logged and swallowed.
        """
        try:
            from_id = self._packet_node_id(packet, "fromId", "from")
            to_id = self._packet_node_id(packet, "toId", "to")
            decoded = packet.get("decoded", {})
            portnum = decoded.get("portnum", "")
            channel = packet.get("channel", 0)

            # Update node info from packet. Hops travelled = hopStart - hopLimit;
            # unknown when the packet carries no (or a zero) hopStart.
            hop_start = packet.get("hopStart")
            node_data = {
                "snr": packet.get("snr"),
                "rssi": packet.get("rssi"),
                "hop_count": hop_start - packet.get("hopLimit", 0) if hop_start else None,
            }
            if from_id:
                known = {k: v for k, v in node_data.items() if v is not None}
                await self.db.upsert_node(from_id, **known)

            if portnum == "NODEINFO_APP":
                user = decoded.get("user", {})
                if user and from_id:
                    await self.db.upsert_node(
                        from_id,
                        long_name=user.get("longName"),
                        short_name=user.get("shortName"),
                        hw_model=user.get("hwModel"),
                    )
                    await self._broadcast_node(from_id)

            elif portnum == "POSITION_APP":
                pos = decoded.get("position", {})
                if pos and from_id:
                    await self.db.upsert_node(
                        from_id,
                        lat=pos.get("latitude"),
                        lon=pos.get("longitude"),
                        alt=pos.get("altitude"),
                    )
                    await self._broadcast_node(from_id)

            elif portnum == "TELEMETRY_APP":
                metrics = decoded.get("telemetry", {}).get("deviceMetrics", {})
                if metrics and from_id:
                    await self.db.upsert_node(
                        from_id,
                        battery=metrics.get("batteryLevel"),
                        voltage=metrics.get("voltage"),
                    )
                    await self._broadcast_node(from_id)

            elif portnum == "TEXT_MESSAGE_APP":
                text = decoded.get("text", "")
                msg = await self.db.insert_message(from_id, to_id, channel, portnum, text)
                if self.ws_manager:
                    await self.ws_manager.broadcast("new_message", msg)
                    stats = await self.db.get_stats()
                    await self.ws_manager.broadcast("stats_update", stats)
                # Process commands
                prefix = config.get("bot.command_prefix", "!")
                if text.startswith(prefix):
                    await self._handle_command(text.strip(), channel, from_id)
        except Exception:
            logger.exception("Error handling packet")

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    async def execute_command(self, text: str, channel: int, from_id: str | None = None):
        """Execute a bot command programmatically (used by scheduler and message handler).

        Only the first whitespace-separated token of *text* is interpreted
        (case-insensitively). Unknown commands and empty input are ignored.
        On a recognised command the reply is sent on *channel*, the command is
        recorded in the database and fresh stats are broadcast.

        Args:
            text: Raw command text including the prefix, e.g. ``"!ping"``.
            channel: Channel index to reply on.
            from_id: Node id of the requester, if any (used by ``weather``).
        """
        tokens = text.split()
        if not tokens:
            return  # empty / whitespace-only input: nothing to execute
        prefix = config.get("bot.command_prefix", "!")
        cmd = tokens[0].lower()
        if not cmd.startswith(prefix):
            return
        name = cmd[len(prefix):]

        response = None
        if name == "ping":
            response = "🏓 Pong!"
        elif name == "nodes":
            nodes = await self.db.get_all_nodes()
            response = f"📡 Bekannte Nodes: {len(nodes)}"
        elif name == "info":
            uptime = self._format_uptime()
            # The scraped source started with a bare space here; "ℹ️" restores
            # what is assumed to be the lost leading glyph.
            response = (
                f"ℹ️ {config.get('bot.name', 'MeshDD-Bot')} "
                f"v{config.get('version', '0.0.0')}\nUptime: {uptime}"
            )
        elif name == "help":
            response = (
                f"📋 Kommandos:\n"
                f"{prefix}ping - Pong\n"
                f"{prefix}nodes - Anzahl Nodes\n"
                f"{prefix}info - Bot-Info\n"
                f"{prefix}stats - Statistiken\n"
                f"{prefix}uptime - Laufzeit\n"
                f"{prefix}weather - Wetter\n"
                f"{prefix}mesh - Mesh-Netzwerk\n"
                f"{prefix}help - Diese Hilfe"
            )
        elif name == "weather":
            response = await self._get_weather(from_id)
        elif name == "stats":
            stats = await self.db.get_stats()
            response = (
                f"📊 Statistiken:\n"
                f"Nodes: {stats['total_nodes']}\n"
                f"Aktiv (24h): {stats['nodes_24h']}\n"
                f"Anfragen: {stats['total_commands']}"
            )
        elif name == "mesh":
            response = await self._get_mesh_info()
        elif name == "uptime":
            response = f"⏱️ Uptime: {self._format_uptime()}"

        if response:
            await self._send_text(response, channel)
            await self.db.insert_command(cmd)
            if self.ws_manager:
                stats = await self.db.get_stats()
                await self.ws_manager.broadcast("stats_update", stats)

    async def send_message(self, text: str, channel: int):
        """Send a free-text message on the given channel."""
        await self._send_text(text, channel)

    async def _handle_command(self, text: str, channel: int, from_id: str):
        """Run a command received over the mesh (thin wrapper around ``execute_command``)."""
        await self.execute_command(text, channel, from_id)

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def _send_text(self, text: str, channel: int, max_len: int = 170):
        """Send *text* on *channel*, split into parts of at most *max_len* bytes.

        Multi-part messages are prefixed with ``[i/total] `` and sent three
        seconds apart. Send errors are logged; remaining parts are still tried.
        Does nothing when no interface is connected.
        """
        if not self.interface:
            return
        # Reserve space for "[x/y] " prefix (max 7 bytes)
        parts = self._split_message(text, max_len - 7)
        total = len(parts)
        for i, part in enumerate(parts):
            if i > 0:
                await asyncio.sleep(3.0)
            interface = self.interface
            if interface is None:
                # disconnect() ran while we were sleeping between parts.
                logger.warning("Interface closed, dropping %d unsent part(s)", total - i)
                return
            msg = f"[{i+1}/{total}] {part}" if total > 1 else part
            try:
                interface.sendText(msg, channelIndex=channel)
            except Exception:
                logger.exception("Error sending text")

    @staticmethod
    def _split_message(text: str, max_len: int) -> list[str]:
        """Split *text* into pieces of at most *max_len* UTF-8 bytes.

        Whole lines are packed together greedily (re-joined with ``\\n``). A
        single line that is too long on its own is cut into chunks on
        character boundaries, never inside a multi-byte character.
        """
        if len(text.encode("utf-8")) <= max_len:
            return [text]

        parts = []
        current = ""
        for line in text.split("\n"):
            candidate = f"{current}\n{line}" if current else line
            if len(candidate.encode("utf-8")) <= max_len:
                current = candidate
                continue

            # The line does not fit behind what we have: flush the buffer.
            if current:
                parts.append(current)
                # Reset, otherwise the flushed text would be emitted a second
                # time together with the next line.
                current = ""

            if len(line.encode("utf-8")) <= max_len:
                current = line
                continue

            # Over-long single line: hard-split it.
            while line:
                chunk = line[:max_len]
                while len(chunk.encode("utf-8")) > max_len:
                    chunk = chunk[:-1]
                if not chunk:
                    # max_len is smaller than one character; emit that
                    # character anyway so the loop always makes progress.
                    chunk = line[0]
                parts.append(chunk)
                line = line[len(chunk):]

        if current:
            parts.append(current)
        return parts

    # ------------------------------------------------------------------
    # Command helpers
    # ------------------------------------------------------------------

    def _format_uptime(self) -> str:
        """Return the time since ``self.start_time`` as e.g. ``"1d 2h 3m 4s"``.

        Zero-valued days / hours / minutes are omitted; seconds are always shown.
        """
        elapsed = int(time.time() - self.start_time)
        days, remainder = divmod(elapsed, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
        parts = []
        if days:
            parts.append(f"{days}d")
        if hours:
            parts.append(f"{hours}h")
        if minutes:
            parts.append(f"{minutes}m")
        parts.append(f"{seconds}s")
        return " ".join(parts)

    async def _get_mesh_info(self) -> str:
        """Build the multi-section mesh summary used by the ``mesh`` command.

        Sections: totals (all / seen within 15 min / seen within 24 h / with
        position), hop-count distribution and the three most common hardware
        models. Empty sections are left out.
        """
        nodes = await self.db.get_all_nodes()
        total = len(nodes)
        now = time.time()
        online = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 900)
        active_24h = sum(1 for n in nodes if n.get("last_seen") and now - n["last_seen"] < 86400)
        with_pos = sum(1 for n in nodes if n.get("lat") and n.get("lon"))

        # Hop distribution
        hop_counts = {}
        for n in nodes:
            h = n.get("hop_count")
            if h is not None:
                hop_counts[h] = hop_counts.get(h, 0) + 1
        hop_lines = []
        for k in sorted(hop_counts):
            label = "Direkt" if k == 0 else f"{k} Hop{'s' if k > 1 else ''}"
            hop_lines.append(f" {label}: {hop_counts[k]}")

        # Hardware distribution (top 3)
        hw_counts = {}
        for n in nodes:
            hw = n.get("hw_model")
            if hw:
                hw_counts[hw] = hw_counts.get(hw, 0) + 1
        top_hw = sorted(hw_counts.items(), key=lambda x: -x[1])[:3]
        hw_lines = [f" {hw}: {cnt}" for hw, cnt in top_hw]

        parts = [
            f"🕸️ Mesh-Netzwerk:\n"
            f"Nodes: {total} ({online} online)\n"
            f"Aktiv 24h: {active_24h}\n"
            f"Mit Position: {with_pos}",
        ]
        if hop_lines:
            parts.append("📊 Hop-Verteilung:\n" + "\n".join(hop_lines))
        if hw_lines:
            parts.append("🔧 Top Hardware:\n" + "\n".join(hw_lines))
        return "\n\n".join(parts)

    async def _get_weather(self, from_id: str) -> str:
        """Return current weather at the requester's position (Open-Meteo).

        Falls back to the centre of Dresden when *from_id* is empty or the
        node has no stored position; the reply is then marked ``(Dresden)``.
        On any failure an error message is returned instead of raising.
        """
        # Scheduler-triggered commands carry no requester; skip the lookup then.
        node = await self.db.get_node(from_id) if from_id else None
        fallback = False
        if node and node.get("lat") and node.get("lon"):
            lat, lon = node["lat"], node["lon"]
        else:
            lat, lon = 51.0504, 13.7373  # Dresden Zentrum
            fallback = True
        url = (
            f"https://api.open-meteo.com/v1/forecast?"
            f"latitude={lat}&longitude={lon}"
            f"&current=temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code"
        )
        try:
            # The HTTP request is blocking, so run it in the default executor.
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(None, self._fetch_url, url)
            current = data.get("current", {})
            temp = current.get("temperature_2m", "?")
            humidity = current.get("relative_humidity_2m", "?")
            wind = current.get("wind_speed_10m", "?")
            code = current.get("weather_code", 0)
            weather_icon = self._weather_code_to_icon(code)
            location = " (Dresden)" if fallback else ""
            return (
                f"{weather_icon} Wetter{location}:\n"
                f"Temp: {temp}°C\n"
                f"Feuchte: {humidity}%\n"
                f"Wind: {wind} km/h"
            )
        except Exception:
            logger.exception("Error fetching weather")
            return "❌ Wetterdaten konnten nicht abgerufen werden."

    @staticmethod
    def _fetch_url(url: str) -> dict:
        """Blocking GET of *url* (10 s timeout); return the decoded JSON body."""
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read().decode())

    @staticmethod
    def _weather_code_to_icon(code: int) -> str:
        """Map a ``weather_code`` value from the API to an emoji.

        Unlisted codes fall back to ``"🌤️"``.
        """
        if code == 0:
            return "☀️"
        if code in (1, 2, 3):
            # The scraped source returned "" here (glyph presumably lost);
            # a cloud icon is used so the reply never starts with a bare space.
            return "⛅"
        if code in (45, 48):
            return "🌫️"
        if code in (51, 53, 55, 56, 57, 61, 63, 65, 66, 67, 80, 81, 82):
            return "🌧️"
        if code in (71, 73, 75, 77, 85, 86):
            return "🌨️"
        if code in (95, 96, 99):
            return "⛈️"
        return "🌤️"