import asyncio import logging import os from datetime import datetime, timezone from typing import Callable, Awaitable import aiohttp import yaml logger = logging.getLogger(__name__) NINA_API_BASE = "https://warnung.bund.de/api31" NINA_CONFIG_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "nina.yaml") SEVERITY_ORDER = { "Unknown": -1, "Minor": 0, "Moderate": 1, "Severe": 2, "Extreme": 3, } SEVERITY_LABELS = { "Extreme": "EXTREM", "Severe": "Schwerwiegend", "Moderate": "Maessig", "Minor": "Gering", } # Dashboard-Endpunkt: ID-Präfixe je Quelle (für Quellen-Filterung) SOURCE_PREFIXES = { "katwarn": ("katwarn.",), "biwapp": ("biwapp.",), "mowas": ("mowas.", "mow."), "dwd": ("dwd.",), "lhp": ("lhp.",), "police": ("police.",), } # mapData-Endpunkte je Quelle SOURCE_MAP_ENDPOINTS = { "katwarn": "katwarn", "biwapp": "biwapp", "mowas": "mowas", "dwd": "dwd", "lhp": "lhp", "police": "police", } # Normalisierung für quellenübergreifende De-Duplikation (mapData-Präfix → Dashboard-Präfix) ID_NORMALIZATIONS = [ ("dwdmap.", "dwd."), ("lhpmap.", "lhp."), ("polmap.", "police."), ("mow.", "mowas."), ] # Lesbarer Name je sächsischem AGS-Code (12-stellig) AGS_LABELS: dict[str, str] = { "145110000000": "Chemnitz, Stadt", "145210000000": "Erzgebirgskreis", "145220000000": "Mittelsachsen", "145230000000": "Vogtlandkreis", "145240000000": "Zwickau", "146120000000": "Dresden, Stadt", "146250000000": "Bautzen", "146260000000": "Görlitz", "146270000000": "Meißen", "146280000000": "Sächsische Schweiz-Osterzgebirge", "147130000000": "Leipzig, Stadt", "147290000000": "Landkreis Leipzig", "147300000000": "Nordsachsen", } DEFAULT_CONFIG = { "enabled": False, "send_to_mesh": True, "poll_interval": 300, "resend_interval": 3600, "channel": 0, "min_severity": "Severe", "ags_codes": [], "sources": { "katwarn": True, "biwapp": True, "mowas": True, "dwd": True, "lhp": True, "police": False, }, } class NinaBot: """Polls the NINA BBK warning API and forwards alerts to Meshtastic. Two polling strategies run in parallel per cycle: 1. Dashboard – regional, per AGS code (all sources, geographic filter by BBK server) 2. mapData – national, per source (severity-only filter, covers gaps in dashboard) Cross-strategy de-duplication is achieved by normalising warning IDs before storing them in _known (e.g. 'dwdmap.' → 'dwd.'). """ def __init__(self, send_callback: Callable[[str, int], Awaitable[None]], ws_manager=None): self.send_callback = send_callback self.ws_manager = ws_manager self.config: dict = {} self._mtime: float = 0.0 self._known: dict[str, str] = {} # normalised_id -> sent (de-dup) self._active: dict[str, dict] = {} # normalised_id -> {text, channel, headline, severity, id, sent} self._running = False self._task: asyncio.Task | None = None self._resend_task: asyncio.Task | None = None self._last_poll: str = "" self._last_sent: str = "" self._load() # ── Config ────────────────────────────────────────────────────────────── def _load(self): try: with open(NINA_CONFIG_PATH) as f: data = yaml.safe_load(f) or {} self.config = {**DEFAULT_CONFIG, **data} if "sources" in data: self.config["sources"] = {**DEFAULT_CONFIG["sources"], **data["sources"]} self._mtime = os.path.getmtime(NINA_CONFIG_PATH) logger.info( "NINA config loaded (enabled=%s, codes=%s)", self.config.get("enabled"), self.config.get("ags_codes"), ) except FileNotFoundError: logger.info("No nina.yaml found – using defaults") self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])} self._save() except Exception: logger.exception("Error loading nina.yaml") self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])} def _save(self): try: with open(NINA_CONFIG_PATH, "w") as f: yaml.dump( self.config, f, default_flow_style=False, allow_unicode=True, sort_keys=False, ) self._mtime = os.path.getmtime(NINA_CONFIG_PATH) logger.info("NINA config saved") except Exception: logger.exception("Error saving nina.yaml") def get_config(self) -> dict: return {**self.config, "last_poll": self._last_poll, "last_sent": self._last_sent} def get_active_alerts(self) -> list[dict]: return sorted(self._active.values(), key=lambda x: x.get("sent", ""), reverse=True) def update_config(self, updates: dict) -> dict: if "sources" in updates: self.config["sources"] = { **self.config.get("sources", {}), **updates.pop("sources"), } self.config.update(updates) self._save() return self.config # ── Lifecycle ──────────────────────────────────────────────────────────── async def start(self): self._running = True self._task = asyncio.create_task(self._poll_loop()) self._resend_task = asyncio.create_task(self._resend_loop()) logger.info("NinaBot started") async def stop(self): self._running = False for task in (self._task, self._resend_task): if task: task.cancel() try: await task except asyncio.CancelledError: pass # ── Hot-reload ─────────────────────────────────────────────────────────── async def watch(self, interval: float = 5.0): """Reload nina.yaml when the file changes on disk.""" while True: await asyncio.sleep(interval) try: current_mtime = os.path.getmtime(NINA_CONFIG_PATH) if current_mtime != self._mtime: self._load() except FileNotFoundError: pass except Exception: logger.exception("Error watching nina.yaml") # ── Polling ────────────────────────────────────────────────────────────── async def trigger_poll(self) -> None: """Run _check_alerts immediately (e.g. triggered after config save).""" if not self.config.get("enabled"): return try: await self._check_alerts() self._last_poll = datetime.now(timezone.utc).isoformat() except Exception: logger.exception("NINA triggered poll error") async def _poll_loop(self): while self._running: try: if self.config.get("enabled"): await self._check_alerts() self._last_poll = datetime.now(timezone.utc).isoformat() except Exception: logger.exception("NINA polling error") interval = max(60, int(self.config.get("poll_interval", 300))) await asyncio.sleep(interval) async def _resend_loop(self): """Re-broadcast all active warnings at resend_interval when send_to_mesh is enabled.""" while self._running: interval = max(60, int(self.config.get("resend_interval", 3600))) await asyncio.sleep(interval) try: if self.config.get("enabled") and self.config.get("send_to_mesh", True): if self._active: logger.info("NINA resend: %d aktive Warnmeldungen", len(self._active)) for entry in list(self._active.values()): await self.send_callback(entry["text"], entry["channel"]) except Exception: logger.exception("NINA resend error") async def _check_alerts(self): min_level = SEVERITY_ORDER.get(self.config.get("min_severity", "Severe"), 2) channel = int(self.config.get("channel", 0)) sources = self.config.get("sources", DEFAULT_CONFIG["sources"]) ags_codes = self.config.get("ags_codes", []) async with aiohttp.ClientSession( headers={"User-Agent": "MeshDD-Bot/1.0 (+https://github.com/ppfeiffer/MeshDD-Bot)"}, timeout=aiohttp.ClientTimeout(total=30), ) as session: # 1. Dashboard: regional filtering per AGS code (server-side, all sources) for ags in ags_codes: try: await self._fetch_dashboard(session, str(ags).strip(), min_level, channel, sources) except Exception: logger.exception("NINA dashboard error for AGS %s", ags) # 2. mapData: national per-source polling – nur wenn KEINE AGS-Codes konfiguriert # sind, da mapData keine geografische Filterung unterstützt und sonst # bundesweite Meldungen außerhalb der konfigurierten Regionen erscheinen. # Mit AGS-Codes deckt das Dashboard bereits alle Quellen regional ab. if not ags_codes: for source_key, endpoint in SOURCE_MAP_ENDPOINTS.items(): if not sources.get(source_key, True): continue try: await self._fetch_map_data(session, source_key, endpoint, min_level, channel) except Exception: logger.exception("NINA mapData error for source %s", source_key) # ── De-duplication helper ──────────────────────────────────────────────── @staticmethod def _normalise_id(identifier: str) -> str: """Normalise a warning ID to a canonical form for cross-source de-duplication.""" for old, new in ID_NORMALIZATIONS: if identifier.startswith(old): return new + identifier[len(old):] return identifier @staticmethod def _source_key_for(identifier: str) -> str | None: """Return the source config key for a given warning ID, or None if unknown.""" for key, prefixes in SOURCE_PREFIXES.items(): for p in prefixes: if identifier.startswith(p): return key return None # ── Dashboard endpoint ─────────────────────────────────────────────────── async def _fetch_dashboard( self, session: aiohttp.ClientSession, ags: str, min_level: int, channel: int, sources: dict, ): ars = ags.ljust(12, "0") url = f"{NINA_API_BASE}/dashboard/{ars}.json" async with session.get(url) as resp: if resp.status == 404: logger.warning("NINA dashboard: no data for AGS %s (404)", ags) return if resp.status != 200: logger.warning("NINA dashboard: status %d for %s", resp.status, url) return items = await resp.json(content_type=None) if not isinstance(items, list): logger.warning("NINA dashboard: unexpected response type for AGS %s", ags) return for item in items: try: await self._process_dashboard_item(item, min_level, channel, sources, ags) except Exception: logger.exception("NINA dashboard: error processing %s", item.get("id")) async def _process_dashboard_item( self, item: dict, min_level: int, channel: int, sources: dict, ags: str = "" ): identifier = item.get("id", "") if not identifier: return # Filter by source src_key = self._source_key_for(identifier) if src_key and not sources.get(src_key, True): return payload = item.get("payload", {}) sent = payload.get("sent", item.get("sent", "")) data = payload.get("data", {}) msg_type = payload.get("msgType", data.get("msgType", "Alert")) severity = data.get("severity", "Unknown") sev_level = SEVERITY_ORDER.get(severity, -1) if sev_level < min_level and msg_type != "Cancel": return dedup_key = self._normalise_id(identifier) if dedup_key in self._known and self._known[dedup_key] == sent: return self._known[dedup_key] = sent headline = data.get("headline", "Warnung") description = data.get("description", "") area = AGS_LABELS.get(ags.ljust(12, "0"), ags) text = self._format_alert(msg_type, severity, headline, description, area, src_key or "") logger.info("NINA dashboard alert: %s (id=%s, area=%s)", headline, identifier, area) await self._send(identifier, severity, msg_type, headline, sent, text, channel, area) # ── mapData endpoint ───────────────────────────────────────────────────── async def _fetch_map_data( self, session: aiohttp.ClientSession, source_key: str, endpoint: str, min_level: int, channel: int, ): url = f"{NINA_API_BASE}/{endpoint}/mapData.json" async with session.get(url) as resp: if resp.status != 200: logger.warning("NINA mapData %s: status %d", source_key, resp.status) return items = await resp.json(content_type=None) if not isinstance(items, list): logger.warning("NINA mapData %s: unexpected response type", source_key) return for item in items: try: await self._process_map_item(item, min_level, channel, source_key) except Exception: logger.exception("NINA mapData: error processing %s", item.get("id")) async def _process_map_item(self, item: dict, min_level: int, channel: int, source_key: str = ""): identifier = item.get("id", "") if not identifier: return severity = item.get("severity", "Unknown") msg_type = item.get("type", "Alert") sent = item.get("startDate", item.get("sent", "")) sev_level = SEVERITY_ORDER.get(severity, -1) if sev_level < min_level and msg_type != "Cancel": return dedup_key = self._normalise_id(identifier) if dedup_key in self._known and self._known[dedup_key] == sent: return self._known[dedup_key] = sent # Headline aus i18nTitle (Deutsch bevorzugt) i18n = item.get("i18nTitle", {}) headline = i18n.get("de") or i18n.get("en") or identifier text = self._format_alert(msg_type, severity, headline, "", "", source_key) logger.info("NINA mapData alert: %s (id=%s)", headline, identifier) await self._send(identifier, severity, msg_type, headline, sent, text, channel, "") # ── Shared helpers ─────────────────────────────────────────────────────── @staticmethod def _format_alert(msg_type: str, severity: str, headline: str, description: str, area: str = "", source: str = "") -> str: prefix = f"[{source.upper()}@NINA]" if source else "[NINA]" area_suffix = f" ({area})" if area else "" if msg_type == "Cancel": return f"{prefix} Aufgehoben: {headline}{area_suffix}" sev_text = SEVERITY_LABELS.get(severity, severity) text = f"{prefix} {sev_text}: {headline}{area_suffix}" if description: short = description.strip()[:120] if len(description.strip()) > 120: short += "..." text += f"\n{short}" return text async def _send( self, identifier: str, severity: str, msg_type: str, headline: str, sent: str, text: str, channel: int, area: str = "", ): dedup_key = self._normalise_id(identifier) # Keep _active up to date for re-broadcast if msg_type == "Cancel": self._active.pop(dedup_key, None) else: self._active[dedup_key] = { "text": text, "channel": channel, "headline": headline, "severity": severity, "msgType": msg_type, "id": identifier, "sent": sent, "area": area, "monitor_only": not self.config.get("send_to_mesh", True), } if self.config.get("send_to_mesh", True): await self.send_callback(text, channel) self._last_sent = datetime.now(timezone.utc).isoformat() else: logger.info("NINA monitor-only: Mesh-Versand deaktiviert, nur WebSocket-Broadcast") if self.ws_manager: await self.ws_manager.broadcast("nina_alert", { "id": identifier, "severity": severity, "msgType": msg_type, "headline": headline, "sent": sent, "area": area, "monitor_only": not self.config.get("send_to_mesh", True), })