"""HTTP and WebSocket front end: JSON API, static pages and live broadcasts."""

import asyncio
import json
import logging
import os

from aiohttp import web

from meshbot import config
from meshbot.database import Database
from meshbot.auth import (
    setup_session,
    auth_middleware,
    setup_auth_routes,
    require_user_api,
    require_admin_api,
)

logger = logging.getLogger(__name__)

STATIC_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")


def _bad_request(message: str) -> web.HTTPBadRequest:
    """Build a 400 response carrying the same ``{"error": ...}`` JSON shape
    that the handlers in this module use for their other error responses."""
    return web.HTTPBadRequest(
        text=json.dumps({"error": message}),
        content_type="application/json",
    )


class WebSocketManager:
    """Tracks connected WebSocket clients and fans messages out to them.

    ``clients`` holds every connected socket; ``auth_clients`` is the subset
    that ``WebServer._ws_handler`` registered for a request carrying a user.
    """

    def __init__(self):
        self.clients: set[web.WebSocketResponse] = set()
        self.auth_clients: set[web.WebSocketResponse] = set()

    async def close_all(self):
        """Close every tracked socket (best effort) and forget all clients."""
        # Iterate a copy: closing a socket lets its handler run and discard
        # itself from ``self.clients`` while we are still looping.
        for ws in list(self.clients):
            try:
                await ws.close()
            except Exception:
                # Shutdown must continue even if one socket fails to close.
                logger.debug("Error while closing WebSocket client", exc_info=True)
        self.clients.clear()
        self.auth_clients.clear()

    async def _send_to(self, targets: set[web.WebSocketResponse], message: str) -> None:
        """Send ``message`` to each socket in ``targets`` and drop the ones that fail."""
        failed = set()
        # Snapshot the set: each ``await`` yields to the event loop, during
        # which clients may connect or disconnect and mutate ``targets``.
        # Iterating the live set would raise "Set changed size during iteration".
        for ws in tuple(targets):
            try:
                await ws.send_str(message)
            except Exception:
                # Any send failure is treated as a dead client.
                failed.add(ws)
        if failed:
            self.clients.difference_update(failed)
            self.auth_clients.difference_update(failed)

    async def broadcast(self, msg_type: str, data: dict | list):
        """Broadcast ``{"type": msg_type, "data": data}`` to all clients."""
        message = json.dumps({"type": msg_type, "data": data})
        await self._send_to(self.clients, message)

    async def broadcast_auth(self, msg_type: str, data: dict | list):
        """Broadcast only to authenticated WebSocket clients."""
        message = json.dumps({"type": msg_type, "data": data})
        await self._send_to(self.auth_clients, message)


class WebServer:
    """aiohttp application exposing the JSON API, the WebSocket feed and the
    static HTML pages.

    ``bot``, ``scheduler`` and ``nina`` are optional collaborators; handlers
    that need one respond with 503 (or an empty list) when it is missing.
    """

    def __init__(self, db: Database, ws_manager: WebSocketManager, bot=None, scheduler=None, nina=None):
        self.db = db
        self.ws_manager = ws_manager
        self.bot = bot
        self.scheduler = scheduler
        self.nina = nina
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()
        self.app = web.Application()
        setup_session(self.app)
        self.app.middlewares.append(auth_middleware)
        self._setup_routes()
        setup_auth_routes(self.app, db)

    def _setup_routes(self):
        """Register the WebSocket, API, page and static-file routes."""
        router = self.app.router
        router.add_get("/ws", self._ws_handler)

        # JSON API
        router.add_get("/api/nodes", self._api_nodes)
        router.add_get("/api/messages", self._api_messages)
        router.add_get("/api/packets", self._api_packets)
        router.add_get("/api/stats", self._api_stats)
        router.add_get("/api/scheduler/jobs", self._api_scheduler_get)
        router.add_post("/api/scheduler/jobs", self._api_scheduler_add)
        router.add_put("/api/scheduler/jobs/{name}", self._api_scheduler_update)
        router.add_delete("/api/scheduler/jobs/{name}", self._api_scheduler_delete)
        router.add_post("/api/send", self._api_send)
        router.add_get("/api/node/config", self._api_node_config)
        router.add_get("/api/nina/config", self._api_nina_get)
        router.add_put("/api/nina/config", self._api_nina_update)
        router.add_get("/api/nina/alerts", self._api_nina_alerts)

        # HTML pages ("/register" intentionally serves the login page too)
        router.add_get("/login", self._serve_login)
        router.add_get("/register", self._serve_login)
        router.add_get("/admin", self._serve_admin)
        router.add_get("/settings", self._serve_settings)
        router.add_get("/scheduler", self._serve_scheduler)
        router.add_get("/nina", self._serve_nina)
        router.add_get("/map", self._serve_map)
        router.add_get("/packets", self._serve_packets)
        router.add_get("/", self._serve_index)
        router.add_static("/static", STATIC_DIR)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_limit(request: web.Request, default: int) -> int:
        """Return the ``limit`` query parameter as a non-negative int.

        Falls back to ``default`` when the parameter is absent.

        Raises:
            web.HTTPBadRequest: if the value is not an integer or is negative.
        """
        raw = request.query.get("limit")
        if raw is None:
            return default
        try:
            limit = int(raw)
        except ValueError:
            raise _bad_request("limit must be an integer") from None
        if limit < 0:
            # A negative limit has backend-specific meaning (SQLite, for one,
            # treats it as "no limit"), so refuse it rather than pass it on.
            raise _bad_request("limit must not be negative")
        return limit

    @staticmethod
    async def _read_json(request: web.Request):
        """Decode the request body as JSON.

        Raises:
            web.HTTPBadRequest: if the body is not valid JSON.
        """
        try:
            return await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            raise _bad_request("Invalid JSON body") from None

    async def _collect_stats(self) -> dict:
        """Return database stats enriched with version and, if a bot is
        attached, its uptime and connection state."""
        stats = await self.db.get_stats()
        stats["version"] = config.get("version", "0.0.0")
        if self.bot:
            stats["uptime"] = self.bot.get_uptime()
            stats["bot_connected"] = self.bot.is_connected()
        return stats

    def _spawn_background(self, coro) -> asyncio.Task:
        """Schedule ``coro`` as a task and hold a reference until it completes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task: asyncio.Task) -> None:
        """Release a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Background task failed", exc_info=exc)

    @staticmethod
    def _static_page(filename: str) -> web.FileResponse:
        """Return a file response for ``filename`` inside ``STATIC_DIR``."""
        return web.FileResponse(os.path.join(STATIC_DIR, filename))

    # ------------------------------------------------------------------
    # WebSocket
    # ------------------------------------------------------------------

    async def _ws_handler(self, request: web.Request) -> web.WebSocketResponse:
        """Accept a WebSocket, push the initial state, then hold the
        connection open until the client disconnects.

        Recent messages are only sent when the request carries a ``user``
        (presumably set by ``auth_middleware`` — confirm in meshbot.auth).
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        user = request.get("user")
        self.ws_manager.clients.add(ws)
        if user:
            self.ws_manager.auth_clients.add(ws)
        logger.info("WebSocket client connected (%d total, %d auth)",
                    len(self.ws_manager.clients), len(self.ws_manager.auth_clients))

        async def send(msg_type: str, data) -> None:
            await ws.send_str(json.dumps({"type": msg_type, "data": data}))

        try:
            # Send initial data
            await send("initial", await self.db.get_all_nodes())
            await send("stats_update", await self._collect_stats())

            if self.bot:
                await send("channels", self.bot.get_channels())
                my_id = self.bot.get_my_node_id()
                if my_id:
                    await send("my_node_id", my_id)
                await send("bot_status", {
                    "connected": self.bot.is_connected(),
                    "uptime": self.bot.get_uptime(),
                })

            await send("initial_packets", await self.db.get_recent_packets(200))

            if user:
                await send("initial_messages", await self.db.get_recent_messages(50))

            # We only send, not receive; draining the socket keeps the
            # handler alive until the peer closes the connection.
            async for _ in ws:
                pass
        finally:
            self.ws_manager.clients.discard(ws)
            self.ws_manager.auth_clients.discard(ws)
            logger.info("WebSocket client disconnected (%d remaining)", len(self.ws_manager.clients))

        return ws

    # ------------------------------------------------------------------
    # JSON API: read-only data
    # ------------------------------------------------------------------

    async def _api_nodes(self, request: web.Request) -> web.Response:
        """Return all known nodes as JSON."""
        nodes = await self.db.get_all_nodes()
        return web.json_response(nodes)

    async def _api_messages(self, request: web.Request) -> web.Response:
        """Return the most recent messages (``?limit=``, default 50).

        NOTE(review): no explicit auth check here, while the WebSocket only
        sends messages to authenticated clients — confirm that
        ``auth_middleware`` protects this route.
        """
        limit = self._parse_limit(request, 50)
        messages = await self.db.get_recent_messages(limit)
        return web.json_response(messages)

    async def _api_packets(self, request: web.Request) -> web.Response:
        """Return the most recent packets (``?limit=``, default 200)."""
        limit = self._parse_limit(request, 200)
        packets = await self.db.get_recent_packets(limit)
        return web.json_response(packets)

    async def _api_stats(self, request: web.Request) -> web.Response:
        """Return database stats plus version and bot status."""
        return web.json_response(await self._collect_stats())

    # ------------------------------------------------------------------
    # JSON API: bot
    # ------------------------------------------------------------------

    async def _api_send(self, request: web.Request) -> web.Response:
        """Send a text message through the bot.

        Expects a JSON object with ``text`` (non-empty string) and an
        optional integer ``channel`` (default 0). Responds 400 on malformed
        input and 503 when no bot is attached.
        """
        # Return value is unused; presumably raises an HTTP error when the
        # caller is not a logged-in user — see meshbot.auth.
        require_user_api(request)
        if not self.bot:
            return web.json_response({"error": "Bot not available"}, status=503)

        data = await self._read_json(request)
        if not isinstance(data, dict):
            return web.json_response({"error": "JSON object expected"}, status=400)

        text = data.get("text", "")
        text = text.strip() if isinstance(text, str) else ""
        if not text:
            return web.json_response({"error": "Text is required"}, status=400)

        try:
            channel = int(data.get("channel", 0))
        except (TypeError, ValueError):
            return web.json_response({"error": "Invalid channel"}, status=400)

        await self.bot.send_message(text, channel)
        return web.json_response({"ok": True})

    async def _api_node_config(self, request: web.Request) -> web.Response:
        """Return the node configuration reported by the bot (admin only)."""
        require_admin_api(request)
        if not self.bot:
            return web.json_response({"error": "Bot not available"}, status=503)
        try:
            cfg = self.bot.get_node_config()
            return web.json_response(cfg)
        except Exception:
            logger.exception("Error getting node config")
            return web.json_response({"error": "Failed to get config"}, status=500)

    # ------------------------------------------------------------------
    # HTML pages
    # ------------------------------------------------------------------

    async def _serve_login(self, request: web.Request) -> web.FileResponse:
        """Serve the login page (also used for ``/register``)."""
        return self._static_page("login.html")

    async def _serve_admin(self, request: web.Request) -> web.FileResponse:
        """Serve the admin page."""
        return self._static_page("admin.html")

    async def _serve_settings(self, request: web.Request) -> web.FileResponse:
        """Serve the settings page."""
        return self._static_page("settings.html")

    async def _serve_index(self, request: web.Request) -> web.FileResponse:
        """Serve the index page."""
        return self._static_page("index.html")

    async def _serve_map(self, request: web.Request) -> web.FileResponse:
        """Serve the map page."""
        return self._static_page("map.html")

    async def _serve_packets(self, request: web.Request) -> web.FileResponse:
        """Serve the packets page."""
        return self._static_page("packets.html")

    async def _serve_scheduler(self, request: web.Request) -> web.FileResponse:
        """Serve the scheduler page."""
        return self._static_page("scheduler.html")

    async def _serve_nina(self, request: web.Request) -> web.FileResponse:
        """Serve the NINA page."""
        return self._static_page("nina.html")

    # ------------------------------------------------------------------
    # JSON API: NINA
    # ------------------------------------------------------------------

    async def _api_nina_get(self, request: web.Request) -> web.Response:
        """Return the current NINA configuration (admin only)."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response({"error": "NINA not available"}, status=503)
        return web.json_response(self.nina.get_config())

    async def _api_nina_update(self, request: web.Request) -> web.Response:
        """Apply NINA configuration updates and trigger a poll in the
        background (admin only). Responds with the updated configuration."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response({"error": "NINA not available"}, status=503)
        updates = await self._read_json(request)
        cfg = self.nina.update_config(updates)
        # Do not make the client wait for the poll, but keep the task
        # referenced so it cannot be garbage-collected mid-flight.
        self._spawn_background(self.nina.trigger_poll())
        return web.json_response(cfg)

    async def _api_nina_alerts(self, request: web.Request) -> web.Response:
        """Return active NINA alerts, or an empty list when NINA is not
        attached (admin only)."""
        require_admin_api(request)
        if not self.nina:
            return web.json_response([])
        return web.json_response(self.nina.get_active_alerts())

    # ------------------------------------------------------------------
    # JSON API: scheduler
    # ------------------------------------------------------------------

    async def _api_scheduler_get(self, request: web.Request) -> web.Response:
        """Return the scheduler's jobs, or an empty list without a scheduler."""
        if not self.scheduler:
            return web.json_response([], status=200)
        return web.json_response(self.scheduler.get_jobs())

    async def _api_scheduler_add(self, request: web.Request) -> web.Response:
        """Add a job from the JSON body, broadcast the new job list and
        respond 201 with it (admin only)."""
        require_admin_api(request)
        if not self.scheduler:
            return web.json_response({"error": "Scheduler not available"}, status=503)
        job = await self._read_json(request)
        jobs = self.scheduler.add_job(job)
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)
        return web.json_response(jobs, status=201)

    async def _api_scheduler_update(self, request: web.Request) -> web.Response:
        """Update the job named in the URL with the JSON body; 404 when the
        scheduler reports no such job (admin only)."""
        require_admin_api(request)
        if not self.scheduler:
            return web.json_response({"error": "Scheduler not available"}, status=503)
        name = request.match_info["name"]
        updates = await self._read_json(request)
        jobs = self.scheduler.update_job(name, updates)
        if jobs is None:
            return web.json_response({"error": "Job not found"}, status=404)
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)
        return web.json_response(jobs)

    async def _api_scheduler_delete(self, request: web.Request) -> web.Response:
        """Delete the job named in the URL; 404 when the scheduler reports
        no such job (admin only)."""
        require_admin_api(request)
        if not self.scheduler:
            return web.json_response({"error": "Scheduler not available"}, status=503)
        name = request.match_info["name"]
        jobs = self.scheduler.delete_job(name)
        if jobs is None:
            return web.json_response({"error": "Job not found"}, status=404)
        if self.ws_manager:
            await self.ws_manager.broadcast("scheduler_update", jobs)
        return web.json_response(jobs)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self, host: str, port: int):
        """Start serving on ``host``:``port`` and return the ``AppRunner``
        so the caller can shut the server down later."""
        runner = web.AppRunner(self.app)
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        logger.info("Webserver started at http://%s:%d", host, port)
        return runner