MeshDD-Bot/meshbot/nina.py
ppfeiffer f36a126200 feat(nina): aktive Warnmeldungen beim Seitenaufruf laden (GET /api/nina/alerts)
_active speichert jetzt msgType/area/monitor_only. get_active_alerts() gibt
sortierte Liste zurück. nina.js lädt beim Init und dedupliziert per ID.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 17:37:49 +01:00

472 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Callable, Awaitable
import aiohttp
import yaml
logger = logging.getLogger(__name__)
NINA_API_BASE = "https://warnung.bund.de/api31"
NINA_CONFIG_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "nina.yaml")
SEVERITY_ORDER = {
"Unknown": -1,
"Minor": 0,
"Moderate": 1,
"Severe": 2,
"Extreme": 3,
}
SEVERITY_LABELS = {
"Extreme": "EXTREM",
"Severe": "Schwerwiegend",
"Moderate": "Maessig",
"Minor": "Gering",
}
# Dashboard-Endpunkt: ID-Präfixe je Quelle (für Quellen-Filterung)
SOURCE_PREFIXES = {
"katwarn": ("katwarn.",),
"biwapp": ("biwapp.",),
"mowas": ("mowas.", "mow."),
"dwd": ("dwd.",),
"lhp": ("lhp.",),
"police": ("police.",),
}
# mapData-Endpunkte je Quelle
SOURCE_MAP_ENDPOINTS = {
"katwarn": "katwarn",
"biwapp": "biwapp",
"mowas": "mowas",
"dwd": "dwd",
"lhp": "lhp",
"police": "police",
}
# Normalisierung für quellenübergreifende De-Duplikation (mapData-Präfix → Dashboard-Präfix)
ID_NORMALIZATIONS = [
("dwdmap.", "dwd."),
("lhpmap.", "lhp."),
("polmap.", "police."),
("mow.", "mowas."),
]
# Lesbarer Name je sächsischem AGS-Code (12-stellig)
AGS_LABELS: dict[str, str] = {
"145110000000": "Chemnitz, Stadt",
"145210000000": "Erzgebirgskreis",
"145220000000": "Mittelsachsen",
"145230000000": "Vogtlandkreis",
"145240000000": "Zwickau",
"146120000000": "Dresden, Stadt",
"146250000000": "Bautzen",
"146260000000": "Görlitz",
"146270000000": "Meißen",
"146280000000": "Sächsische Schweiz-Osterzgebirge",
"147130000000": "Leipzig, Stadt",
"147290000000": "Landkreis Leipzig",
"147300000000": "Nordsachsen",
}
DEFAULT_CONFIG = {
"enabled": False,
"send_to_mesh": True,
"poll_interval": 300,
"resend_interval": 3600,
"channel": 0,
"min_severity": "Severe",
"ags_codes": [],
"sources": {
"katwarn": True,
"biwapp": True,
"mowas": True,
"dwd": True,
"lhp": True,
"police": False,
},
}
class NinaBot:
"""Polls the NINA BBK warning API and forwards alerts to Meshtastic.
Two polling strategies run in parallel per cycle:
1. Dashboard regional, per AGS code (all sources, geographic filter by BBK server)
2. mapData national, per source (severity-only filter, covers gaps in dashboard)
Cross-strategy de-duplication is achieved by normalising warning IDs before
storing them in _known (e.g. 'dwdmap.''dwd.').
"""
def __init__(self, send_callback: Callable[[str, int], Awaitable[None]], ws_manager=None):
self.send_callback = send_callback
self.ws_manager = ws_manager
self.config: dict = {}
self._mtime: float = 0.0
self._known: dict[str, str] = {} # normalised_id -> sent (de-dup)
self._active: dict[str, dict] = {} # normalised_id -> {text, channel, headline, severity, id, sent}
self._running = False
self._task: asyncio.Task | None = None
self._resend_task: asyncio.Task | None = None
self._last_poll: str = ""
self._load()
# ── Config ──────────────────────────────────────────────────────────────
def _load(self):
try:
with open(NINA_CONFIG_PATH) as f:
data = yaml.safe_load(f) or {}
self.config = {**DEFAULT_CONFIG, **data}
if "sources" in data:
self.config["sources"] = {**DEFAULT_CONFIG["sources"], **data["sources"]}
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
logger.info(
"NINA config loaded (enabled=%s, codes=%s)",
self.config.get("enabled"),
self.config.get("ags_codes"),
)
except FileNotFoundError:
logger.info("No nina.yaml found using defaults")
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
self._save()
except Exception:
logger.exception("Error loading nina.yaml")
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
def _save(self):
try:
with open(NINA_CONFIG_PATH, "w") as f:
yaml.dump(
self.config,
f,
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
logger.info("NINA config saved")
except Exception:
logger.exception("Error saving nina.yaml")
def get_config(self) -> dict:
return {**self.config, "last_poll": self._last_poll}
def get_active_alerts(self) -> list[dict]:
return sorted(self._active.values(), key=lambda x: x.get("sent", ""), reverse=True)
def update_config(self, updates: dict) -> dict:
if "sources" in updates:
self.config["sources"] = {
**self.config.get("sources", {}),
**updates.pop("sources"),
}
self.config.update(updates)
self._save()
return self.config
# ── Lifecycle ────────────────────────────────────────────────────────────
async def start(self):
self._running = True
self._task = asyncio.create_task(self._poll_loop())
self._resend_task = asyncio.create_task(self._resend_loop())
logger.info("NinaBot started")
async def stop(self):
self._running = False
for task in (self._task, self._resend_task):
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# ── Hot-reload ───────────────────────────────────────────────────────────
async def watch(self, interval: float = 5.0):
"""Reload nina.yaml when the file changes on disk."""
while True:
await asyncio.sleep(interval)
try:
current_mtime = os.path.getmtime(NINA_CONFIG_PATH)
if current_mtime != self._mtime:
self._load()
except FileNotFoundError:
pass
except Exception:
logger.exception("Error watching nina.yaml")
# ── Polling ──────────────────────────────────────────────────────────────
async def trigger_poll(self) -> None:
"""Run _check_alerts immediately (e.g. triggered after config save)."""
if not self.config.get("enabled"):
return
try:
await self._check_alerts()
self._last_poll = datetime.now(timezone.utc).isoformat()
except Exception:
logger.exception("NINA triggered poll error")
async def _poll_loop(self):
while self._running:
try:
if self.config.get("enabled"):
await self._check_alerts()
self._last_poll = datetime.now(timezone.utc).isoformat()
except Exception:
logger.exception("NINA polling error")
interval = max(60, int(self.config.get("poll_interval", 300)))
await asyncio.sleep(interval)
async def _resend_loop(self):
"""Re-broadcast all active warnings at resend_interval when send_to_mesh is enabled."""
while self._running:
interval = max(60, int(self.config.get("resend_interval", 3600)))
await asyncio.sleep(interval)
try:
if self.config.get("enabled") and self.config.get("send_to_mesh", True):
if self._active:
logger.info("NINA resend: %d aktive Warnmeldungen", len(self._active))
for entry in list(self._active.values()):
await self.send_callback(entry["text"], entry["channel"])
except Exception:
logger.exception("NINA resend error")
async def _check_alerts(self):
min_level = SEVERITY_ORDER.get(self.config.get("min_severity", "Severe"), 2)
channel = int(self.config.get("channel", 0))
sources = self.config.get("sources", DEFAULT_CONFIG["sources"])
ags_codes = self.config.get("ags_codes", [])
async with aiohttp.ClientSession(
headers={"User-Agent": "MeshDD-Bot/1.0 (+https://github.com/ppfeiffer/MeshDD-Bot)"},
timeout=aiohttp.ClientTimeout(total=30),
) as session:
# 1. Dashboard: regional filtering per AGS code (server-side, all sources)
for ags in ags_codes:
try:
await self._fetch_dashboard(session, str(ags).strip(), min_level, channel, sources)
except Exception:
logger.exception("NINA dashboard error for AGS %s", ags)
# 2. mapData: national per-source polling nur wenn KEINE AGS-Codes konfiguriert
# sind, da mapData keine geografische Filterung unterstützt und sonst
# bundesweite Meldungen außerhalb der konfigurierten Regionen erscheinen.
# Mit AGS-Codes deckt das Dashboard bereits alle Quellen regional ab.
if not ags_codes:
for source_key, endpoint in SOURCE_MAP_ENDPOINTS.items():
if not sources.get(source_key, True):
continue
try:
await self._fetch_map_data(session, source_key, endpoint, min_level, channel)
except Exception:
logger.exception("NINA mapData error for source %s", source_key)
# ── De-duplication helper ────────────────────────────────────────────────
@staticmethod
def _normalise_id(identifier: str) -> str:
"""Normalise a warning ID to a canonical form for cross-source de-duplication."""
for old, new in ID_NORMALIZATIONS:
if identifier.startswith(old):
return new + identifier[len(old):]
return identifier
@staticmethod
def _source_key_for(identifier: str) -> str | None:
"""Return the source config key for a given warning ID, or None if unknown."""
for key, prefixes in SOURCE_PREFIXES.items():
for p in prefixes:
if identifier.startswith(p):
return key
return None
# ── Dashboard endpoint ───────────────────────────────────────────────────
async def _fetch_dashboard(
self,
session: aiohttp.ClientSession,
ags: str,
min_level: int,
channel: int,
sources: dict,
):
ars = ags.ljust(12, "0")
url = f"{NINA_API_BASE}/dashboard/{ars}.json"
async with session.get(url) as resp:
if resp.status == 404:
logger.warning("NINA dashboard: no data for AGS %s (404)", ags)
return
if resp.status != 200:
logger.warning("NINA dashboard: status %d for %s", resp.status, url)
return
items = await resp.json(content_type=None)
if not isinstance(items, list):
logger.warning("NINA dashboard: unexpected response type for AGS %s", ags)
return
for item in items:
try:
await self._process_dashboard_item(item, min_level, channel, sources, ags)
except Exception:
logger.exception("NINA dashboard: error processing %s", item.get("id"))
async def _process_dashboard_item(
self, item: dict, min_level: int, channel: int, sources: dict, ags: str = ""
):
identifier = item.get("id", "")
if not identifier:
return
# Filter by source
src_key = self._source_key_for(identifier)
if src_key and not sources.get(src_key, True):
return
payload = item.get("payload", {})
sent = payload.get("sent", item.get("sent", ""))
data = payload.get("data", {})
msg_type = payload.get("msgType", data.get("msgType", "Alert"))
severity = data.get("severity", "Unknown")
sev_level = SEVERITY_ORDER.get(severity, -1)
if sev_level < min_level and msg_type != "Cancel":
return
dedup_key = self._normalise_id(identifier)
if dedup_key in self._known and self._known[dedup_key] == sent:
return
self._known[dedup_key] = sent
headline = data.get("headline", "Warnung")
description = data.get("description", "")
area = AGS_LABELS.get(ags.ljust(12, "0"), ags)
text = self._format_alert(msg_type, severity, headline, description, area)
logger.info("NINA dashboard alert: %s (id=%s, area=%s)", headline, identifier, area)
await self._send(identifier, severity, msg_type, headline, sent, text, channel, area)
# ── mapData endpoint ─────────────────────────────────────────────────────
async def _fetch_map_data(
self,
session: aiohttp.ClientSession,
source_key: str,
endpoint: str,
min_level: int,
channel: int,
):
url = f"{NINA_API_BASE}/{endpoint}/mapData.json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning("NINA mapData %s: status %d", source_key, resp.status)
return
items = await resp.json(content_type=None)
if not isinstance(items, list):
logger.warning("NINA mapData %s: unexpected response type", source_key)
return
for item in items:
try:
await self._process_map_item(item, min_level, channel)
except Exception:
logger.exception("NINA mapData: error processing %s", item.get("id"))
async def _process_map_item(self, item: dict, min_level: int, channel: int):
identifier = item.get("id", "")
if not identifier:
return
severity = item.get("severity", "Unknown")
msg_type = item.get("type", "Alert")
sent = item.get("startDate", item.get("sent", ""))
sev_level = SEVERITY_ORDER.get(severity, -1)
if sev_level < min_level and msg_type != "Cancel":
return
dedup_key = self._normalise_id(identifier)
if dedup_key in self._known and self._known[dedup_key] == sent:
return
self._known[dedup_key] = sent
# Headline aus i18nTitle (Deutsch bevorzugt)
i18n = item.get("i18nTitle", {})
headline = i18n.get("de") or i18n.get("en") or identifier
text = self._format_alert(msg_type, severity, headline, "")
logger.info("NINA mapData alert: %s (id=%s)", headline, identifier)
await self._send(identifier, severity, msg_type, headline, sent, text, channel, "")
# ── Shared helpers ───────────────────────────────────────────────────────
@staticmethod
def _format_alert(msg_type: str, severity: str, headline: str, description: str, area: str = "") -> str:
area_suffix = f" ({area})" if area else ""
if msg_type == "Cancel":
return f"[NINA] Aufgehoben: {headline}{area_suffix}"
sev_text = SEVERITY_LABELS.get(severity, severity)
text = f"[NINA] {sev_text}: {headline}{area_suffix}"
if description:
short = description.strip()[:120]
if len(description.strip()) > 120:
short += "..."
text += f"\n{short}"
return text
async def _send(
self,
identifier: str,
severity: str,
msg_type: str,
headline: str,
sent: str,
text: str,
channel: int,
area: str = "",
):
dedup_key = self._normalise_id(identifier)
# Keep _active up to date for re-broadcast
if msg_type == "Cancel":
self._active.pop(dedup_key, None)
else:
self._active[dedup_key] = {
"text": text,
"channel": channel,
"headline": headline,
"severity": severity,
"msgType": msg_type,
"id": identifier,
"sent": sent,
"area": area,
"monitor_only": not self.config.get("send_to_mesh", True),
}
if self.config.get("send_to_mesh", True):
await self.send_callback(text, channel)
else:
logger.info("NINA monitor-only: Mesh-Versand deaktiviert, nur WebSocket-Broadcast")
if self.ws_manager:
await self.ws_manager.broadcast("nina_alert", {
"id": identifier,
"severity": severity,
"msgType": msg_type,
"headline": headline,
"sent": sent,
"area": area,
"monitor_only": not self.config.get("send_to_mesh", True),
})