- config.yaml: neuer Parameter web.online_threshold (Default: 900 s)
- /api/stats und WS stats_update liefern online_threshold
- dashboard.js: isOnline() nutzt onlineThreshold aus Stats-API
- bot.py (?mesh) und database.py (nodes_online) nutzen Config-Wert

Closes #12.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
291 lines
12 KiB
Python
291 lines
12 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
|
|
from aiohttp import web
|
|
|
|
from meshbot import config
|
|
from meshbot.database import Database
|
|
from meshbot.auth import setup_session, auth_middleware, setup_auth_routes, require_user_api, require_admin_api
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory holding the bundled web assets: a "static" folder that sits one
# level above the directory containing this file.
_MODULE_DIR = os.path.dirname(__file__)
STATIC_DIR = os.path.join(os.path.dirname(_MODULE_DIR), "static")
|
|
|
|
|
|
class WebSocketManager:
|
|
def __init__(self):
|
|
self.clients: set[web.WebSocketResponse] = set()
|
|
self.auth_clients: set[web.WebSocketResponse] = set()
|
|
|
|
async def close_all(self):
|
|
for ws in list(self.clients):
|
|
try:
|
|
await ws.close()
|
|
except Exception:
|
|
pass
|
|
self.clients.clear()
|
|
self.auth_clients.clear()
|
|
|
|
async def broadcast(self, msg_type: str, data: dict | list):
|
|
message = json.dumps({"type": msg_type, "data": data})
|
|
closed = set()
|
|
for ws in self.clients:
|
|
try:
|
|
await ws.send_str(message)
|
|
except Exception:
|
|
closed.add(ws)
|
|
self.clients -= closed
|
|
self.auth_clients -= closed
|
|
|
|
async def broadcast_auth(self, msg_type: str, data: dict | list):
|
|
"""Broadcast only to authenticated WebSocket clients."""
|
|
message = json.dumps({"type": msg_type, "data": data})
|
|
closed = set()
|
|
for ws in self.auth_clients:
|
|
try:
|
|
await ws.send_str(message)
|
|
except Exception:
|
|
closed.add(ws)
|
|
self.auth_clients -= closed
|
|
self.clients -= closed
|
|
|
|
|
|
class WebServer:
    """aiohttp application serving the dashboard pages, JSON API and WebSocket feed.

    The server reads from the database and, when they are supplied, talks to
    the bot, the scheduler and the NINA component. Each of those three is
    optional; handlers that need a missing one answer with an error or an
    empty result instead of failing.
    """

    # Sentinel returned by ``_read_json`` when the body is not valid JSON.
    _INVALID_JSON = object()

    def __init__(self, db: Database, ws_manager: WebSocketManager, bot=None, scheduler=None, nina=None):
        """Build the aiohttp application and register sessions, middleware and routes.

        Args:
            db: Database used for nodes, messages, packets and stats.
            ws_manager: Manager that tracks WebSocket clients and broadcasts.
            bot: Optional bot object (node id, uptime, channels, sending).
            scheduler: Optional scheduler exposing job CRUD methods.
            nina: Optional NINA component exposing config and alerts.
        """
        self.db = db
        self.ws_manager = ws_manager
        self.bot = bot
        self.scheduler = scheduler
        self.nina = nina
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it has finished.
        self._background_tasks: set[asyncio.Task] = set()
        self.app = web.Application()
        setup_session(self.app)
        self.app.middlewares.append(auth_middleware)
        self._setup_routes()
        setup_auth_routes(self.app, db)

    def _setup_routes(self):
        """Register the WebSocket endpoint, API routes, HTML pages and static files."""
        router = self.app.router

        # WebSocket feed and JSON API.
        router.add_get("/ws", self._ws_handler)
        router.add_get("/api/nodes", self._api_nodes)
        router.add_get("/api/messages", self._api_messages)
        router.add_get("/api/packets", self._api_packets)
        router.add_get("/api/stats", self._api_stats)
        router.add_get("/api/scheduler/jobs", self._api_scheduler_get)
        router.add_post("/api/scheduler/jobs", self._api_scheduler_add)
        router.add_put("/api/scheduler/jobs/{name}", self._api_scheduler_update)
        router.add_delete("/api/scheduler/jobs/{name}", self._api_scheduler_delete)
        router.add_post("/api/send", self._api_send)
        router.add_get("/api/node/config", self._api_node_config)
        router.add_get("/api/nina/config", self._api_nina_get)
        router.add_put("/api/nina/config", self._api_nina_update)
        router.add_get("/api/nina/alerts", self._api_nina_alerts)
        router.add_get("/api/links", self._api_links)

        # HTML pages; /register is served by the same page as /login.
        router.add_get("/login", self._serve_login)
        router.add_get("/register", self._serve_login)
        router.add_get("/admin", self._serve_admin)
        router.add_get("/settings", self._serve_settings)
        router.add_get("/scheduler", self._serve_scheduler)
        router.add_get("/nina", self._serve_nina)
        router.add_get("/map", self._serve_map)
        router.add_get("/packets", self._serve_packets)
        router.add_get("/messages", self._serve_messages)
        router.add_get("/", self._serve_index)
        router.add_static("/static", STATIC_DIR)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    async def _collect_stats(self) -> dict:
        """Return the stats payload shared by ``/api/stats`` and the WebSocket feed.

        Database stats are extended with the configured version and online
        threshold and, when a bot is present, its uptime and connection state.
        """
        my_id = self.bot.get_my_node_id() if self.bot else None
        stats = await self.db.get_stats(my_node_id=my_id)
        stats["version"] = config.get("version", "0.00.00")
        stats["online_threshold"] = config.get("web.online_threshold", 900)
        if self.bot:
            stats["uptime"] = self.bot.get_uptime()
            stats["bot_connected"] = self.bot.is_connected()
        return stats

    @staticmethod
    def _parse_limit(request: web.Request, default: int) -> int | None:
        """Read the ``limit`` query parameter.

        Returns ``default`` when the parameter is absent, and ``None`` when it
        is present but not a non-negative integer.
        """
        raw = request.query.get("limit")
        if raw is None:
            return default
        try:
            limit = int(raw)
        except ValueError:
            return None
        return limit if limit >= 0 else None

    async def _read_json(self, request: web.Request):
        """Return the decoded JSON body, or ``_INVALID_JSON`` if it cannot be decoded."""
        try:
            return await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; either means the client sent an unusable body.
            return self._INVALID_JSON

    @staticmethod
    def _error(message: str, status: int) -> web.Response:
        """Build the ``{"error": message}`` JSON response used by the API."""
        return web.json_response({"error": message}, status=status)

    @staticmethod
    def _page(filename: str) -> web.FileResponse:
        """Return a file response for ``filename`` inside ``STATIC_DIR``."""
        return web.FileResponse(os.path.join(STATIC_DIR, filename))

    # ------------------------------------------------------------------
    # WebSocket
    # ------------------------------------------------------------------

    async def _ws_handler(self, request: web.Request) -> web.WebSocketResponse:
        """Accept a WebSocket client, push the initial state and hold the connection.

        The socket is registered with the manager (and as authenticated when
        the request carries a ``user``), receives the initial snapshots in a
        fixed order, and is unregistered again when the connection ends.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        user = request.get("user")
        self.ws_manager.clients.add(ws)
        if user:
            self.ws_manager.auth_clients.add(ws)
        logger.info("WebSocket client connected (%d total, %d auth)", len(self.ws_manager.clients), len(self.ws_manager.auth_clients))

        async def send(msg_type, payload):
            await ws.send_str(json.dumps({"type": msg_type, "data": payload}))

        try:
            # Send initial data
            await send("initial", await self.db.get_all_nodes())
            await send("stats_update", await self._collect_stats())

            if self.bot:
                await send("channels", self.bot.get_channels())
                my_id = self.bot.get_my_node_id()
                if my_id:
                    await send("my_node_id", my_id)
                await send("bot_status", {
                    "connected": self.bot.is_connected(),
                    "uptime": self.bot.get_uptime(),
                })

            await send("initial_packets", await self.db.get_recent_packets(200))
            await send("initial_messages", await self.db.get_recent_messages(50))

            async for _ in ws:
                pass  # We only send, not receive
        finally:
            self.ws_manager.clients.discard(ws)
            self.ws_manager.auth_clients.discard(ws)
            logger.info("WebSocket client disconnected (%d remaining)", len(self.ws_manager.clients))

        return ws

    # ------------------------------------------------------------------
    # Read-only API
    # ------------------------------------------------------------------

    async def _api_nodes(self, request: web.Request) -> web.Response:
        """Return all nodes from the database as JSON."""
        nodes = await self.db.get_all_nodes()
        return web.json_response(nodes)

    async def _api_messages(self, request: web.Request) -> web.Response:
        """Return recent messages; ``?limit=`` defaults to 50, invalid values give 400."""
        limit = self._parse_limit(request, 50)
        if limit is None:
            return self._error("limit must be a non-negative integer", 400)
        messages = await self.db.get_recent_messages(limit)
        return web.json_response(messages)

    async def _api_packets(self, request: web.Request) -> web.Response:
        """Return recent packets; ``?limit=`` defaults to 200, invalid values give 400."""
        limit = self._parse_limit(request, 200)
        if limit is None:
            return self._error("limit must be a non-negative integer", 400)
        packets = await self.db.get_recent_packets(limit)
        return web.json_response(packets)

    async def _api_stats(self, request: web.Request) -> web.Response:
        """Return the stats payload as JSON."""
        return web.json_response(await self._collect_stats())

    async def _api_links(self, request: web.Request) -> web.Response:
        """Return the configured ``links`` list, or an empty list when unset."""
        links = config.get("links", []) or []
        return web.json_response(links)

    # ------------------------------------------------------------------
    # Bot API
    # ------------------------------------------------------------------

    async def _api_send(self, request: web.Request) -> web.Response:
        """Send a text message through the bot.

        Expects a JSON object with ``text`` (required, non-empty string) and
        an optional integer ``channel`` (default 0). Responds 503 without a
        bot and 400 for a malformed body, missing text or invalid channel.
        """
        # NOTE(review): presumably raises an HTTP error for unauthenticated
        # requests; the helper is defined in meshbot.auth.
        require_user_api(request)
        if not self.bot:
            return self._error("Bot not available", 503)

        data = await self._read_json(request)
        if not isinstance(data, dict):
            return self._error("Invalid JSON body", 400)

        text = data.get("text", "")
        if not isinstance(text, str) or not text.strip():
            return self._error("Text is required", 400)

        try:
            channel = int(data.get("channel", 0))
        except (TypeError, ValueError):
            return self._error("Invalid channel", 400)

        await self.bot.send_message(text.strip(), channel)
        return web.json_response({"ok": True})

    async def _api_node_config(self, request: web.Request) -> web.Response:
        """Return the node configuration reported by the bot (admin API)."""
        require_admin_api(request)
        if not self.bot:
            return self._error("Bot not available", 503)
        try:
            cfg = self.bot.get_node_config()
            return web.json_response(cfg)
        except Exception:
            logger.exception("Error getting node config")
            return self._error("Failed to get config", 500)

    # ------------------------------------------------------------------
    # HTML pages
    # ------------------------------------------------------------------

    async def _serve_login(self, request: web.Request) -> web.FileResponse:
        """Serve ``login.html`` (used for both /login and /register)."""
        return self._page("login.html")

    async def _serve_admin(self, request: web.Request) -> web.FileResponse:
        """Serve ``admin.html``."""
        return self._page("admin.html")

    async def _serve_settings(self, request: web.Request) -> web.FileResponse:
        """Serve ``settings.html``."""
        return self._page("settings.html")

    async def _serve_index(self, request: web.Request) -> web.FileResponse:
        """Serve ``index.html``."""
        return self._page("index.html")

    async def _serve_map(self, request: web.Request) -> web.FileResponse:
        """Serve ``map.html``."""
        return self._page("map.html")

    async def _serve_packets(self, request: web.Request) -> web.FileResponse:
        """Serve ``packets.html``."""
        return self._page("packets.html")

    async def _serve_messages(self, request: web.Request) -> web.FileResponse:
        """Serve ``messages.html``."""
        return self._page("messages.html")

    async def _serve_scheduler(self, request: web.Request) -> web.FileResponse:
        """Serve ``scheduler.html``."""
        return self._page("scheduler.html")

    async def _serve_nina(self, request: web.Request) -> web.FileResponse:
        """Serve ``nina.html``."""
        return self._page("nina.html")

    # ------------------------------------------------------------------
    # NINA API
    # ------------------------------------------------------------------

    async def _api_nina_get(self, request: web.Request) -> web.Response:
        """Return the NINA configuration (admin API); 503 without NINA."""
        require_admin_api(request)
        if not self.nina:
            return self._error("NINA not available", 503)
        return web.json_response(self.nina.get_config())

    async def _api_nina_update(self, request: web.Request) -> web.Response:
        """Apply NINA configuration updates and trigger a poll in the background.

        Responds 503 without NINA and 400 when the body is not valid JSON.
        """
        require_admin_api(request)
        if not self.nina:
            return self._error("NINA not available", 503)
        updates = await self._read_json(request)
        if updates is self._INVALID_JSON:
            return self._error("Invalid JSON body", 400)
        cfg = self.nina.update_config(updates)

        # Keep a reference until the poll finishes so the task cannot be
        # garbage-collected mid-execution.
        task = asyncio.create_task(self.nina.trigger_poll())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return web.json_response(cfg)

    async def _api_nina_alerts(self, request: web.Request) -> web.Response:
        """Return active NINA alerts (admin API); an empty list without NINA."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response([])
        return web.json_response(self.nina.get_active_alerts())

    # ------------------------------------------------------------------
    # Scheduler API
    # ------------------------------------------------------------------

    async def _api_scheduler_get(self, request: web.Request) -> web.Response:
        """Return the scheduler's jobs; an empty list without a scheduler."""
        if not self.scheduler:
            return web.json_response([], status=200)
        return web.json_response(self.scheduler.get_jobs())

    async def _api_scheduler_add(self, request: web.Request) -> web.Response:
        """Add a job (admin API), broadcast the new job list and answer 201.

        Responds 503 without a scheduler and 400 when the body is not valid JSON.
        """
        require_admin_api(request)
        if not self.scheduler:
            return self._error("Scheduler not available", 503)
        job = await self._read_json(request)
        if job is self._INVALID_JSON:
            return self._error("Invalid JSON body", 400)
        jobs = self.scheduler.add_job(job)
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)
        return web.json_response(jobs, status=201)

    async def _api_scheduler_update(self, request: web.Request) -> web.Response:
        """Update the job named in the URL (admin API) and broadcast the job list.

        Responds 503 without a scheduler, 400 when the body is not valid JSON
        and 404 when the scheduler reports no such job (returns ``None``).
        """
        require_admin_api(request)
        if not self.scheduler:
            return self._error("Scheduler not available", 503)
        name = request.match_info["name"]
        updates = await self._read_json(request)
        if updates is self._INVALID_JSON:
            return self._error("Invalid JSON body", 400)
        jobs = self.scheduler.update_job(name, updates)
        if jobs is None:
            return self._error("Job not found", 404)
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)
        return web.json_response(jobs)

    async def _api_scheduler_delete(self, request: web.Request) -> web.Response:
        """Delete the job named in the URL (admin API) and broadcast the job list.

        Responds 503 without a scheduler and 404 when the scheduler reports no
        such job (returns ``None``).
        """
        require_admin_api(request)
        if not self.scheduler:
            return self._error("Scheduler not available", 503)
        name = request.match_info["name"]
        jobs = self.scheduler.delete_job(name)
        if jobs is None:
            return self._error("Job not found", 404)
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)
        return web.json_response(jobs)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self, host: str, port: int):
        """Start serving on ``host``:``port`` and return the ``AppRunner``.

        The runner is returned to the caller, which can use it to shut the
        server down later.
        """
        runner = web.AppRunner(self.app)
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        logger.info("Webserver started at http://%s:%d", host, port)
        return runner
|