import asyncio import logging import os from typing import Callable, Awaitable import aiohttp import yaml logger = logging.getLogger(__name__) NINA_API_BASE = "https://warnung.bund.de/api31" NINA_CONFIG_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "nina.yaml") SEVERITY_ORDER = { "Unknown": -1, "Minor": 0, "Moderate": 1, "Severe": 2, "Extreme": 3, } SEVERITY_LABELS = { "Extreme": "EXTREM", "Severe": "Schwerwiegend", "Moderate": "Maessig", "Minor": "Gering", } # Warning ID prefixes for source filtering SOURCE_PREFIXES = { "katwarn": "katwarn.", "biwapp": "biwapp.", "mowas": "mowas.", "dwd": "dwd.", "lhp": "lhp.", "police": "police.", } DEFAULT_CONFIG = { "enabled": False, "poll_interval": 300, "channel": 0, "min_severity": "Severe", "ags_codes": [], "sources": { "katwarn": True, "biwapp": True, "mowas": True, "dwd": True, "lhp": True, "police": False, }, } class NinaBot: """Polls the NINA BBK warning API and forwards alerts to Meshtastic.""" def __init__(self, send_callback: Callable[[str, int], Awaitable[None]], ws_manager=None): self.send_callback = send_callback self.ws_manager = ws_manager self.config: dict = {} self._mtime: float = 0.0 self._known: dict[str, str] = {} # id -> sent timestamp (de-dup) self._running = False self._task: asyncio.Task | None = None self._load() # ── Config ────────────────────────────────────────────────────────────── def _load(self): try: with open(NINA_CONFIG_PATH) as f: data = yaml.safe_load(f) or {} self.config = {**DEFAULT_CONFIG, **data} if "sources" in data: self.config["sources"] = {**DEFAULT_CONFIG["sources"], **data["sources"]} self._mtime = os.path.getmtime(NINA_CONFIG_PATH) logger.info( "NINA config loaded (enabled=%s, codes=%s)", self.config.get("enabled"), self.config.get("ags_codes"), ) except FileNotFoundError: logger.info("No nina.yaml found – using defaults") self.config = { **DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"]), } self._save() except Exception: logger.exception("Error loading nina.yaml") self.config = { **DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"]), } def _save(self): try: with open(NINA_CONFIG_PATH, "w") as f: yaml.dump( self.config, f, default_flow_style=False, allow_unicode=True, sort_keys=False, ) self._mtime = os.path.getmtime(NINA_CONFIG_PATH) logger.info("NINA config saved") except Exception: logger.exception("Error saving nina.yaml") def get_config(self) -> dict: return self.config def update_config(self, updates: dict) -> dict: if "sources" in updates: self.config["sources"] = { **self.config.get("sources", {}), **updates.pop("sources"), } self.config.update(updates) self._save() return self.config # ── Lifecycle ──────────────────────────────────────────────────────────── async def start(self): self._running = True self._task = asyncio.create_task(self._poll_loop()) logger.info("NinaBot started") async def stop(self): self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass # ── Hot-reload ─────────────────────────────────────────────────────────── async def watch(self, interval: float = 5.0): """Reload nina.yaml when the file changes on disk.""" while True: await asyncio.sleep(interval) try: current_mtime = os.path.getmtime(NINA_CONFIG_PATH) if current_mtime != self._mtime: self._load() except FileNotFoundError: pass except Exception: logger.exception("Error watching nina.yaml") # ── Polling ────────────────────────────────────────────────────────────── async def _poll_loop(self): while self._running: try: if self.config.get("enabled"): await self._check_alerts() except Exception: logger.exception("NINA polling error") interval = max(60, int(self.config.get("poll_interval", 300))) await asyncio.sleep(interval) async def _check_alerts(self): ags_codes = self.config.get("ags_codes", []) if not ags_codes: return min_level = SEVERITY_ORDER.get(self.config.get("min_severity", "Severe"), 2) channel = int(self.config.get("channel", 0)) sources = self.config.get("sources", DEFAULT_CONFIG["sources"]) async with aiohttp.ClientSession( headers={"User-Agent": "MeshDD-Bot/1.0 (+https://github.com/ppfeiffer/MeshDD-Bot)"}, timeout=aiohttp.ClientTimeout(total=30), ) as session: for ags in ags_codes: try: await self._fetch_dashboard(session, str(ags).strip(), min_level, channel, sources) except Exception: logger.exception("NINA error for AGS %s", ags) async def _fetch_dashboard( self, session: aiohttp.ClientSession, ags: str, min_level: int, channel: int, sources: dict, ): # Pad AGS/ARS to 12 characters ars = ags.ljust(12, "0") url = f"{NINA_API_BASE}/dashboard/{ars}.json" async with session.get(url) as resp: if resp.status == 404: logger.warning("NINA: no data for AGS %s (404)", ags) return if resp.status != 200: logger.warning("NINA API returned status %d for %s", resp.status, url) return items = await resp.json(content_type=None) if not isinstance(items, list): logger.warning("NINA: unexpected response type for AGS %s", ags) return for item in items: try: await self._process_item(item, min_level, channel, sources) except Exception: logger.exception("NINA: error processing item %s", item.get("id")) async def _process_item(self, item: dict, min_level: int, channel: int, sources: dict): identifier = item.get("id", "") if not identifier: return # Filter by source for source_key, prefix in SOURCE_PREFIXES.items(): if identifier.startswith(prefix): if not sources.get(source_key, True): return break payload = item.get("payload", {}) sent = payload.get("sent", item.get("sent", "")) data = payload.get("data", {}) msg_type = payload.get("msgType", data.get("msgType", "Alert")) severity = data.get("severity", "Unknown") sev_level = SEVERITY_ORDER.get(severity, -1) if sev_level < min_level and msg_type != "Cancel": return # De-duplicate: skip if already processed with same sent timestamp if identifier in self._known and self._known[identifier] == sent: return self._known[identifier] = sent headline = data.get("headline", "Warnung") description = data.get("description", "") if msg_type == "Cancel": text = f"[NINA] Aufgehoben: {headline}" else: sev_text = SEVERITY_LABELS.get(severity, severity) text = f"[NINA] {sev_text}: {headline}" if description: short_desc = description.strip()[:120] if len(description.strip()) > 120: short_desc += "..." text += f"\n{short_desc}" logger.info("NINA alert forwarded: %s (id=%s)", headline, identifier) await self.send_callback(text, channel) if self.ws_manager: await self.ws_manager.broadcast("nina_alert", { "id": identifier, "severity": severity, "msgType": msg_type, "headline": headline, "sent": sent, })