MeshDD-Bot/meshbot/webserver.py
ppfeiffer a6988fbb1f fix(stats): eigene Telemetrie aus Pakettypen-Diagramm ausschließen
get_stats() filtert TELEMETRY_APP-Pakete des eigenen Nodes wenn my_node_id
übergeben wird – konsistent mit isSuppressed() im Paket-Log-Frontend.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 06:31:07 +01:00

285 lines
12 KiB
Python

import asyncio
import json
import logging
import os
from aiohttp import web
from meshbot import config
from meshbot.database import Database
from meshbot.auth import setup_session, auth_middleware, setup_auth_routes, require_user_api, require_admin_api
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Path of the "static" directory that sits beside the directory containing
# this file (i.e. one level above the package); the HTML pages and the
# "/static" route are served from here.
STATIC_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
class WebSocketManager:
def __init__(self):
self.clients: set[web.WebSocketResponse] = set()
self.auth_clients: set[web.WebSocketResponse] = set()
async def close_all(self):
for ws in list(self.clients):
try:
await ws.close()
except Exception:
pass
self.clients.clear()
self.auth_clients.clear()
async def broadcast(self, msg_type: str, data: dict | list):
message = json.dumps({"type": msg_type, "data": data})
closed = set()
for ws in self.clients:
try:
await ws.send_str(message)
except Exception:
closed.add(ws)
self.clients -= closed
self.auth_clients -= closed
async def broadcast_auth(self, msg_type: str, data: dict | list):
"""Broadcast only to authenticated WebSocket clients."""
message = json.dumps({"type": msg_type, "data": data})
closed = set()
for ws in self.auth_clients:
try:
await ws.send_str(message)
except Exception:
closed.add(ws)
self.auth_clients -= closed
self.clients -= closed
class WebServer:
    """aiohttp application serving the dashboard pages, REST API and WebSocket.

    ``bot``, ``scheduler`` and ``nina`` are optional collaborators. Endpoints
    that need a missing collaborator answer with HTTP 503 (or an empty list
    for the read-only list endpoints) instead of failing.
    """

    def __init__(self, db: Database, ws_manager: WebSocketManager, bot=None, scheduler=None, nina=None):
        """Build the aiohttp application and register session, auth and routes."""
        self.db = db
        self.ws_manager = ws_manager
        self.bot = bot
        self.scheduler = scheduler
        self.nina = nina
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()
        self.app = web.Application()
        setup_session(self.app)
        self.app.middlewares.append(auth_middleware)
        self._setup_routes()
        setup_auth_routes(self.app, db)

    def _setup_routes(self):
        """Register all API, page and static-file routes on ``self.app``."""
        router = self.app.router
        router.add_get("/ws", self._ws_handler)
        router.add_get("/api/nodes", self._api_nodes)
        router.add_get("/api/messages", self._api_messages)
        router.add_get("/api/packets", self._api_packets)
        router.add_get("/api/stats", self._api_stats)
        router.add_get("/api/scheduler/jobs", self._api_scheduler_get)
        router.add_post("/api/scheduler/jobs", self._api_scheduler_add)
        router.add_put("/api/scheduler/jobs/{name}", self._api_scheduler_update)
        router.add_delete("/api/scheduler/jobs/{name}", self._api_scheduler_delete)
        router.add_post("/api/send", self._api_send)
        router.add_get("/api/node/config", self._api_node_config)
        router.add_get("/api/nina/config", self._api_nina_get)
        router.add_put("/api/nina/config", self._api_nina_update)
        router.add_get("/api/nina/alerts", self._api_nina_alerts)
        router.add_get("/login", self._serve_login)
        router.add_get("/register", self._serve_login)
        router.add_get("/admin", self._serve_admin)
        router.add_get("/settings", self._serve_settings)
        router.add_get("/scheduler", self._serve_scheduler)
        router.add_get("/nina", self._serve_nina)
        router.add_get("/map", self._serve_map)
        router.add_get("/packets", self._serve_packets)
        router.add_get("/messages", self._serve_messages)
        router.add_get("/", self._serve_index)
        router.add_static("/static", STATIC_DIR)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_limit(request: web.Request, default: int) -> int | None:
        """Return the ``limit`` query parameter as a non-negative int.

        Returns ``default`` when the parameter is absent and ``None`` when it
        is not an integer or is negative, so the caller can answer with 400.
        """
        raw = request.query.get("limit")
        if raw is None:
            return default
        try:
            limit = int(raw)
        except ValueError:
            return None
        # Negative limits are rejected rather than forwarded: their meaning
        # depends on the database backend (SQLite, for example, treats a
        # negative LIMIT as "no limit").
        return limit if limit >= 0 else None

    @staticmethod
    async def _read_json_object(request: web.Request) -> dict | None:
        """Return the request body parsed as a JSON object, or ``None``.

        ``None`` is returned when the body is not valid JSON or when the
        top-level JSON value is not an object.
        """
        try:
            payload = await request.json()
        except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
            return None
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _bad_request(message: str) -> web.Response:
        """Build a JSON ``{"error": message}`` response with status 400."""
        return web.json_response({"error": message}, status=400)

    @staticmethod
    def _static_page(filename: str) -> web.FileResponse:
        """Return a file response for ``filename`` inside ``STATIC_DIR``."""
        return web.FileResponse(os.path.join(STATIC_DIR, filename))

    @staticmethod
    async def _ws_send(ws: web.WebSocketResponse, msg_type: str, data) -> None:
        """Send one ``{"type": ..., "data": ...}`` JSON frame to ``ws``."""
        await ws.send_str(json.dumps({"type": msg_type, "data": data}))

    async def _collect_stats(self) -> dict:
        """Fetch database stats and enrich them with version and bot state."""
        my_id = self.bot.get_my_node_id() if self.bot else None
        stats = await self.db.get_stats(my_node_id=my_id)
        stats["version"] = config.get("version", "0.0.0")
        if self.bot:
            stats["uptime"] = self.bot.get_uptime()
            stats["bot_connected"] = self.bot.is_connected()
        return stats

    async def _broadcast_jobs(self, jobs) -> None:
        """Push the current scheduler job list to all WebSocket clients."""
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)

    def _finish_background_task(self, task: asyncio.Task) -> None:
        """Done-callback: drop the task reference and log any failure."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Background task failed", exc_info=error)

    # ------------------------------------------------------------------
    # WebSocket
    # ------------------------------------------------------------------

    async def _send_initial_state(self, ws: web.WebSocketResponse, *, authenticated: bool) -> None:
        """Send the initial snapshot (nodes, stats, bot info, packets, messages)."""
        await self._ws_send(ws, "initial", await self.db.get_all_nodes())
        await self._ws_send(ws, "stats_update", await self._collect_stats())
        if self.bot:
            await self._ws_send(ws, "channels", self.bot.get_channels())
            my_id = self.bot.get_my_node_id()
            if my_id:
                await self._ws_send(ws, "my_node_id", my_id)
            await self._ws_send(ws, "bot_status", {
                "connected": self.bot.is_connected(),
                "uptime": self.bot.get_uptime(),
            })
        await self._ws_send(ws, "initial_packets", await self.db.get_recent_packets(200))
        # Stored messages are only sent to connections with a logged-in user.
        if authenticated:
            await self._ws_send(ws, "initial_messages", await self.db.get_recent_messages(50))

    async def _ws_handler(self, request: web.Request) -> web.WebSocketResponse:
        """Accept a WebSocket, send the initial snapshot and keep it registered.

        The socket is added to the manager's client sets for the lifetime of
        the connection and always removed again when the handler exits.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        user = request.get("user")
        self.ws_manager.clients.add(ws)
        if user:
            self.ws_manager.auth_clients.add(ws)
        logger.info("WebSocket client connected (%d total, %d auth)", len(self.ws_manager.clients), len(self.ws_manager.auth_clients))
        try:
            await self._send_initial_state(ws, authenticated=bool(user))
            # Incoming frames are ignored; iterating keeps the connection
            # open until the client closes it.
            async for _ in ws:
                pass
        except ConnectionResetError:
            # The client vanished mid-sync; nothing to report to anyone.
            logger.debug("WebSocket client disconnected during initial sync")
        finally:
            self.ws_manager.clients.discard(ws)
            self.ws_manager.auth_clients.discard(ws)
            logger.info("WebSocket client disconnected (%d remaining)", len(self.ws_manager.clients))
        return ws

    # ------------------------------------------------------------------
    # REST API: read-only data
    # ------------------------------------------------------------------

    async def _api_nodes(self, request: web.Request) -> web.Response:
        """Return all known nodes as JSON."""
        return web.json_response(await self.db.get_all_nodes())

    async def _api_messages(self, request: web.Request) -> web.Response:
        """Return the most recent messages (``?limit=``, default 50) as JSON."""
        # NOTE(review): the WebSocket only sends messages to logged-in users,
        # but this endpoint performs no explicit user check. Confirm whether
        # auth_middleware already protects it.
        limit = self._parse_limit(request, 50)
        if limit is None:
            return self._bad_request("Invalid limit")
        return web.json_response(await self.db.get_recent_messages(limit))

    async def _api_packets(self, request: web.Request) -> web.Response:
        """Return the most recent packets (``?limit=``, default 200) as JSON."""
        limit = self._parse_limit(request, 200)
        if limit is None:
            return self._bad_request("Invalid limit")
        return web.json_response(await self.db.get_recent_packets(limit))

    async def _api_stats(self, request: web.Request) -> web.Response:
        """Return database statistics plus version and bot state as JSON."""
        return web.json_response(await self._collect_stats())

    # ------------------------------------------------------------------
    # REST API: bot
    # ------------------------------------------------------------------

    async def _api_send(self, request: web.Request) -> web.Response:
        """Send a text message via the bot; requires a logged-in user.

        Expects a JSON object with ``text`` (non-empty string) and optional
        integer ``channel`` (default 0). Malformed input yields HTTP 400.
        """
        require_user_api(request)
        if not self.bot:
            return web.json_response({"error": "Bot not available"}, status=503)
        data = await self._read_json_object(request)
        if data is None:
            return self._bad_request("Invalid JSON body")
        raw_text = data.get("text", "")
        text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text:
            return self._bad_request("Text is required")
        try:
            channel = int(data.get("channel", 0))
        except (TypeError, ValueError):
            return self._bad_request("Invalid channel")
        await self.bot.send_message(text, channel)
        return web.json_response({"ok": True})

    async def _api_node_config(self, request: web.Request) -> web.Response:
        """Return the bot's node configuration; requires an admin."""
        require_admin_api(request)
        if not self.bot:
            return web.json_response({"error": "Bot not available"}, status=503)
        try:
            cfg = self.bot.get_node_config()
            return web.json_response(cfg)
        except Exception:
            logger.exception("Error getting node config")
            return web.json_response({"error": "Failed to get config"}, status=500)

    # ------------------------------------------------------------------
    # HTML pages
    # ------------------------------------------------------------------

    async def _serve_login(self, request: web.Request) -> web.FileResponse:
        """Serve the login page (also used for ``/register``)."""
        return self._static_page("login.html")

    async def _serve_admin(self, request: web.Request) -> web.FileResponse:
        """Serve the admin page."""
        return self._static_page("admin.html")

    async def _serve_settings(self, request: web.Request) -> web.FileResponse:
        """Serve the settings page."""
        return self._static_page("settings.html")

    async def _serve_index(self, request: web.Request) -> web.FileResponse:
        """Serve the dashboard index page."""
        return self._static_page("index.html")

    async def _serve_map(self, request: web.Request) -> web.FileResponse:
        """Serve the map page."""
        return self._static_page("map.html")

    async def _serve_packets(self, request: web.Request) -> web.FileResponse:
        """Serve the packet log page."""
        return self._static_page("packets.html")

    async def _serve_messages(self, request: web.Request) -> web.FileResponse:
        """Serve the messages page."""
        return self._static_page("messages.html")

    async def _serve_scheduler(self, request: web.Request) -> web.FileResponse:
        """Serve the scheduler page."""
        return self._static_page("scheduler.html")

    async def _serve_nina(self, request: web.Request) -> web.FileResponse:
        """Serve the NINA page."""
        return self._static_page("nina.html")

    # ------------------------------------------------------------------
    # REST API: NINA
    # ------------------------------------------------------------------

    async def _api_nina_get(self, request: web.Request) -> web.Response:
        """Return the NINA configuration; requires an admin."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response({"error": "NINA not available"}, status=503)
        return web.json_response(self.nina.get_config())

    async def _api_nina_update(self, request: web.Request) -> web.Response:
        """Apply NINA config updates and trigger a poll; requires an admin."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response({"error": "NINA not available"}, status=503)
        updates = await self._read_json_object(request)
        if updates is None:
            return self._bad_request("Invalid JSON body")
        cfg = self.nina.update_config(updates)
        # Poll in the background so the response is not delayed. Keep a
        # reference until the task completes (see ``_background_tasks``).
        task = asyncio.create_task(self.nina.trigger_poll())
        self._background_tasks.add(task)
        task.add_done_callback(self._finish_background_task)
        return web.json_response(cfg)

    async def _api_nina_alerts(self, request: web.Request) -> web.Response:
        """Return active NINA alerts (empty list if NINA is absent); admin only."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response([])
        return web.json_response(self.nina.get_active_alerts())

    # ------------------------------------------------------------------
    # REST API: scheduler
    # ------------------------------------------------------------------

    async def _api_scheduler_get(self, request: web.Request) -> web.Response:
        """Return all scheduler jobs, or an empty list without a scheduler."""
        if not self.scheduler:
            return web.json_response([], status=200)
        return web.json_response(self.scheduler.get_jobs())

    async def _api_scheduler_add(self, request: web.Request) -> web.Response:
        """Create a scheduler job from the JSON body; requires an admin."""
        require_admin_api(request)
        if not self.scheduler:
            return web.json_response({"error": "Scheduler not available"}, status=503)
        job = await self._read_json_object(request)
        if job is None:
            return self._bad_request("Invalid JSON body")
        jobs = self.scheduler.add_job(job)
        await self._broadcast_jobs(jobs)
        return web.json_response(jobs, status=201)

    async def _api_scheduler_update(self, request: web.Request) -> web.Response:
        """Update the job named in the URL; 404 if it does not exist."""
        require_admin_api(request)
        if not self.scheduler:
            return web.json_response({"error": "Scheduler not available"}, status=503)
        name = request.match_info["name"]
        updates = await self._read_json_object(request)
        if updates is None:
            return self._bad_request("Invalid JSON body")
        jobs = self.scheduler.update_job(name, updates)
        if jobs is None:
            return web.json_response({"error": "Job not found"}, status=404)
        await self._broadcast_jobs(jobs)
        return web.json_response(jobs)

    async def _api_scheduler_delete(self, request: web.Request) -> web.Response:
        """Delete the job named in the URL; 404 if it does not exist."""
        require_admin_api(request)
        if not self.scheduler:
            return web.json_response({"error": "Scheduler not available"}, status=503)
        name = request.match_info["name"]
        jobs = self.scheduler.delete_job(name)
        if jobs is None:
            return web.json_response({"error": "Job not found"}, status=404)
        await self._broadcast_jobs(jobs)
        return web.json_response(jobs)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self, host: str, port: int):
        """Start listening on ``host``:``port`` and return the ``AppRunner``.

        The caller owns the returned runner and is responsible for cleaning
        it up on shutdown. If binding the socket fails, the runner is cleaned
        up here before the error is re-raised.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, host, port)
            await site.start()
        except Exception:
            # Do not leak the already set-up runner when the port is busy
            # or the address is invalid.
            await runner.cleanup()
            raise
        logger.info("Webserver started at http://%s:%d", host, port)
        return runner