_active speichert jetzt msgType/area/monitor_only. get_active_alerts() gibt sortierte Liste zurück. nina.js lädt beim Init und dedupliziert per ID. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
472 lines
18 KiB
Python
472 lines
18 KiB
Python
import asyncio
|
||
import logging
|
||
import os
|
||
from datetime import datetime, timezone
|
||
from typing import Callable, Awaitable
|
||
|
||
import aiohttp
|
||
import yaml
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
NINA_API_BASE = "https://warnung.bund.de/api31"
|
||
NINA_CONFIG_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "nina.yaml")
|
||
|
||
SEVERITY_ORDER = {
|
||
"Unknown": -1,
|
||
"Minor": 0,
|
||
"Moderate": 1,
|
||
"Severe": 2,
|
||
"Extreme": 3,
|
||
}
|
||
|
||
SEVERITY_LABELS = {
|
||
"Extreme": "EXTREM",
|
||
"Severe": "Schwerwiegend",
|
||
"Moderate": "Maessig",
|
||
"Minor": "Gering",
|
||
}
|
||
|
||
# Dashboard-Endpunkt: ID-Präfixe je Quelle (für Quellen-Filterung)
|
||
SOURCE_PREFIXES = {
|
||
"katwarn": ("katwarn.",),
|
||
"biwapp": ("biwapp.",),
|
||
"mowas": ("mowas.", "mow."),
|
||
"dwd": ("dwd.",),
|
||
"lhp": ("lhp.",),
|
||
"police": ("police.",),
|
||
}
|
||
|
||
# mapData-Endpunkte je Quelle
|
||
SOURCE_MAP_ENDPOINTS = {
|
||
"katwarn": "katwarn",
|
||
"biwapp": "biwapp",
|
||
"mowas": "mowas",
|
||
"dwd": "dwd",
|
||
"lhp": "lhp",
|
||
"police": "police",
|
||
}
|
||
|
||
# Normalisierung für quellenübergreifende De-Duplikation (mapData-Präfix → Dashboard-Präfix)
|
||
ID_NORMALIZATIONS = [
|
||
("dwdmap.", "dwd."),
|
||
("lhpmap.", "lhp."),
|
||
("polmap.", "police."),
|
||
("mow.", "mowas."),
|
||
]
|
||
|
||
# Lesbarer Name je sächsischem AGS-Code (12-stellig)
|
||
AGS_LABELS: dict[str, str] = {
|
||
"145110000000": "Chemnitz, Stadt",
|
||
"145210000000": "Erzgebirgskreis",
|
||
"145220000000": "Mittelsachsen",
|
||
"145230000000": "Vogtlandkreis",
|
||
"145240000000": "Zwickau",
|
||
"146120000000": "Dresden, Stadt",
|
||
"146250000000": "Bautzen",
|
||
"146260000000": "Görlitz",
|
||
"146270000000": "Meißen",
|
||
"146280000000": "Sächsische Schweiz-Osterzgebirge",
|
||
"147130000000": "Leipzig, Stadt",
|
||
"147290000000": "Landkreis Leipzig",
|
||
"147300000000": "Nordsachsen",
|
||
}
|
||
|
||
DEFAULT_CONFIG = {
|
||
"enabled": False,
|
||
"send_to_mesh": True,
|
||
"poll_interval": 300,
|
||
"resend_interval": 3600,
|
||
"channel": 0,
|
||
"min_severity": "Severe",
|
||
"ags_codes": [],
|
||
"sources": {
|
||
"katwarn": True,
|
||
"biwapp": True,
|
||
"mowas": True,
|
||
"dwd": True,
|
||
"lhp": True,
|
||
"police": False,
|
||
},
|
||
}
|
||
|
||
|
||
class NinaBot:
|
||
"""Polls the NINA BBK warning API and forwards alerts to Meshtastic.
|
||
|
||
Two polling strategies run in parallel per cycle:
|
||
1. Dashboard – regional, per AGS code (all sources, geographic filter by BBK server)
|
||
2. mapData – national, per source (severity-only filter, covers gaps in dashboard)
|
||
|
||
Cross-strategy de-duplication is achieved by normalising warning IDs before
|
||
storing them in _known (e.g. 'dwdmap.' → 'dwd.').
|
||
"""
|
||
|
||
def __init__(self, send_callback: Callable[[str, int], Awaitable[None]], ws_manager=None):
|
||
self.send_callback = send_callback
|
||
self.ws_manager = ws_manager
|
||
self.config: dict = {}
|
||
self._mtime: float = 0.0
|
||
self._known: dict[str, str] = {} # normalised_id -> sent (de-dup)
|
||
self._active: dict[str, dict] = {} # normalised_id -> {text, channel, headline, severity, id, sent}
|
||
self._running = False
|
||
self._task: asyncio.Task | None = None
|
||
self._resend_task: asyncio.Task | None = None
|
||
self._last_poll: str = ""
|
||
self._load()
|
||
|
||
# ── Config ──────────────────────────────────────────────────────────────
|
||
|
||
def _load(self):
|
||
try:
|
||
with open(NINA_CONFIG_PATH) as f:
|
||
data = yaml.safe_load(f) or {}
|
||
self.config = {**DEFAULT_CONFIG, **data}
|
||
if "sources" in data:
|
||
self.config["sources"] = {**DEFAULT_CONFIG["sources"], **data["sources"]}
|
||
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
|
||
logger.info(
|
||
"NINA config loaded (enabled=%s, codes=%s)",
|
||
self.config.get("enabled"),
|
||
self.config.get("ags_codes"),
|
||
)
|
||
except FileNotFoundError:
|
||
logger.info("No nina.yaml found – using defaults")
|
||
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
|
||
self._save()
|
||
except Exception:
|
||
logger.exception("Error loading nina.yaml")
|
||
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
|
||
|
||
def _save(self):
|
||
try:
|
||
with open(NINA_CONFIG_PATH, "w") as f:
|
||
yaml.dump(
|
||
self.config,
|
||
f,
|
||
default_flow_style=False,
|
||
allow_unicode=True,
|
||
sort_keys=False,
|
||
)
|
||
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
|
||
logger.info("NINA config saved")
|
||
except Exception:
|
||
logger.exception("Error saving nina.yaml")
|
||
|
||
def get_config(self) -> dict:
|
||
return {**self.config, "last_poll": self._last_poll}
|
||
|
||
def get_active_alerts(self) -> list[dict]:
|
||
return sorted(self._active.values(), key=lambda x: x.get("sent", ""), reverse=True)
|
||
|
||
def update_config(self, updates: dict) -> dict:
|
||
if "sources" in updates:
|
||
self.config["sources"] = {
|
||
**self.config.get("sources", {}),
|
||
**updates.pop("sources"),
|
||
}
|
||
self.config.update(updates)
|
||
self._save()
|
||
return self.config
|
||
|
||
# ── Lifecycle ────────────────────────────────────────────────────────────
|
||
|
||
async def start(self):
|
||
self._running = True
|
||
self._task = asyncio.create_task(self._poll_loop())
|
||
self._resend_task = asyncio.create_task(self._resend_loop())
|
||
logger.info("NinaBot started")
|
||
|
||
async def stop(self):
|
||
self._running = False
|
||
for task in (self._task, self._resend_task):
|
||
if task:
|
||
task.cancel()
|
||
try:
|
||
await task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
# ── Hot-reload ───────────────────────────────────────────────────────────
|
||
|
||
async def watch(self, interval: float = 5.0):
|
||
"""Reload nina.yaml when the file changes on disk."""
|
||
while True:
|
||
await asyncio.sleep(interval)
|
||
try:
|
||
current_mtime = os.path.getmtime(NINA_CONFIG_PATH)
|
||
if current_mtime != self._mtime:
|
||
self._load()
|
||
except FileNotFoundError:
|
||
pass
|
||
except Exception:
|
||
logger.exception("Error watching nina.yaml")
|
||
|
||
# ── Polling ──────────────────────────────────────────────────────────────
|
||
|
||
async def trigger_poll(self) -> None:
|
||
"""Run _check_alerts immediately (e.g. triggered after config save)."""
|
||
if not self.config.get("enabled"):
|
||
return
|
||
try:
|
||
await self._check_alerts()
|
||
self._last_poll = datetime.now(timezone.utc).isoformat()
|
||
except Exception:
|
||
logger.exception("NINA triggered poll error")
|
||
|
||
async def _poll_loop(self):
|
||
while self._running:
|
||
try:
|
||
if self.config.get("enabled"):
|
||
await self._check_alerts()
|
||
self._last_poll = datetime.now(timezone.utc).isoformat()
|
||
except Exception:
|
||
logger.exception("NINA polling error")
|
||
interval = max(60, int(self.config.get("poll_interval", 300)))
|
||
await asyncio.sleep(interval)
|
||
|
||
async def _resend_loop(self):
|
||
"""Re-broadcast all active warnings at resend_interval when send_to_mesh is enabled."""
|
||
while self._running:
|
||
interval = max(60, int(self.config.get("resend_interval", 3600)))
|
||
await asyncio.sleep(interval)
|
||
try:
|
||
if self.config.get("enabled") and self.config.get("send_to_mesh", True):
|
||
if self._active:
|
||
logger.info("NINA resend: %d aktive Warnmeldungen", len(self._active))
|
||
for entry in list(self._active.values()):
|
||
await self.send_callback(entry["text"], entry["channel"])
|
||
except Exception:
|
||
logger.exception("NINA resend error")
|
||
|
||
async def _check_alerts(self):
|
||
min_level = SEVERITY_ORDER.get(self.config.get("min_severity", "Severe"), 2)
|
||
channel = int(self.config.get("channel", 0))
|
||
sources = self.config.get("sources", DEFAULT_CONFIG["sources"])
|
||
ags_codes = self.config.get("ags_codes", [])
|
||
|
||
async with aiohttp.ClientSession(
|
||
headers={"User-Agent": "MeshDD-Bot/1.0 (+https://github.com/ppfeiffer/MeshDD-Bot)"},
|
||
timeout=aiohttp.ClientTimeout(total=30),
|
||
) as session:
|
||
# 1. Dashboard: regional filtering per AGS code (server-side, all sources)
|
||
for ags in ags_codes:
|
||
try:
|
||
await self._fetch_dashboard(session, str(ags).strip(), min_level, channel, sources)
|
||
except Exception:
|
||
logger.exception("NINA dashboard error for AGS %s", ags)
|
||
|
||
# 2. mapData: national per-source polling – nur wenn KEINE AGS-Codes konfiguriert
|
||
# sind, da mapData keine geografische Filterung unterstützt und sonst
|
||
# bundesweite Meldungen außerhalb der konfigurierten Regionen erscheinen.
|
||
# Mit AGS-Codes deckt das Dashboard bereits alle Quellen regional ab.
|
||
if not ags_codes:
|
||
for source_key, endpoint in SOURCE_MAP_ENDPOINTS.items():
|
||
if not sources.get(source_key, True):
|
||
continue
|
||
try:
|
||
await self._fetch_map_data(session, source_key, endpoint, min_level, channel)
|
||
except Exception:
|
||
logger.exception("NINA mapData error for source %s", source_key)
|
||
|
||
# ── De-duplication helper ────────────────────────────────────────────────
|
||
|
||
@staticmethod
|
||
def _normalise_id(identifier: str) -> str:
|
||
"""Normalise a warning ID to a canonical form for cross-source de-duplication."""
|
||
for old, new in ID_NORMALIZATIONS:
|
||
if identifier.startswith(old):
|
||
return new + identifier[len(old):]
|
||
return identifier
|
||
|
||
@staticmethod
|
||
def _source_key_for(identifier: str) -> str | None:
|
||
"""Return the source config key for a given warning ID, or None if unknown."""
|
||
for key, prefixes in SOURCE_PREFIXES.items():
|
||
for p in prefixes:
|
||
if identifier.startswith(p):
|
||
return key
|
||
return None
|
||
|
||
# ── Dashboard endpoint ───────────────────────────────────────────────────
|
||
|
||
async def _fetch_dashboard(
|
||
self,
|
||
session: aiohttp.ClientSession,
|
||
ags: str,
|
||
min_level: int,
|
||
channel: int,
|
||
sources: dict,
|
||
):
|
||
ars = ags.ljust(12, "0")
|
||
url = f"{NINA_API_BASE}/dashboard/{ars}.json"
|
||
|
||
async with session.get(url) as resp:
|
||
if resp.status == 404:
|
||
logger.warning("NINA dashboard: no data for AGS %s (404)", ags)
|
||
return
|
||
if resp.status != 200:
|
||
logger.warning("NINA dashboard: status %d for %s", resp.status, url)
|
||
return
|
||
items = await resp.json(content_type=None)
|
||
|
||
if not isinstance(items, list):
|
||
logger.warning("NINA dashboard: unexpected response type for AGS %s", ags)
|
||
return
|
||
|
||
for item in items:
|
||
try:
|
||
await self._process_dashboard_item(item, min_level, channel, sources, ags)
|
||
except Exception:
|
||
logger.exception("NINA dashboard: error processing %s", item.get("id"))
|
||
|
||
async def _process_dashboard_item(
|
||
self, item: dict, min_level: int, channel: int, sources: dict, ags: str = ""
|
||
):
|
||
identifier = item.get("id", "")
|
||
if not identifier:
|
||
return
|
||
|
||
# Filter by source
|
||
src_key = self._source_key_for(identifier)
|
||
if src_key and not sources.get(src_key, True):
|
||
return
|
||
|
||
payload = item.get("payload", {})
|
||
sent = payload.get("sent", item.get("sent", ""))
|
||
data = payload.get("data", {})
|
||
msg_type = payload.get("msgType", data.get("msgType", "Alert"))
|
||
severity = data.get("severity", "Unknown")
|
||
|
||
sev_level = SEVERITY_ORDER.get(severity, -1)
|
||
if sev_level < min_level and msg_type != "Cancel":
|
||
return
|
||
|
||
dedup_key = self._normalise_id(identifier)
|
||
if dedup_key in self._known and self._known[dedup_key] == sent:
|
||
return
|
||
self._known[dedup_key] = sent
|
||
|
||
headline = data.get("headline", "Warnung")
|
||
description = data.get("description", "")
|
||
area = AGS_LABELS.get(ags.ljust(12, "0"), ags)
|
||
|
||
text = self._format_alert(msg_type, severity, headline, description, area)
|
||
logger.info("NINA dashboard alert: %s (id=%s, area=%s)", headline, identifier, area)
|
||
await self._send(identifier, severity, msg_type, headline, sent, text, channel, area)
|
||
|
||
# ── mapData endpoint ─────────────────────────────────────────────────────
|
||
|
||
async def _fetch_map_data(
|
||
self,
|
||
session: aiohttp.ClientSession,
|
||
source_key: str,
|
||
endpoint: str,
|
||
min_level: int,
|
||
channel: int,
|
||
):
|
||
url = f"{NINA_API_BASE}/{endpoint}/mapData.json"
|
||
|
||
async with session.get(url) as resp:
|
||
if resp.status != 200:
|
||
logger.warning("NINA mapData %s: status %d", source_key, resp.status)
|
||
return
|
||
items = await resp.json(content_type=None)
|
||
|
||
if not isinstance(items, list):
|
||
logger.warning("NINA mapData %s: unexpected response type", source_key)
|
||
return
|
||
|
||
for item in items:
|
||
try:
|
||
await self._process_map_item(item, min_level, channel)
|
||
except Exception:
|
||
logger.exception("NINA mapData: error processing %s", item.get("id"))
|
||
|
||
async def _process_map_item(self, item: dict, min_level: int, channel: int):
|
||
identifier = item.get("id", "")
|
||
if not identifier:
|
||
return
|
||
|
||
severity = item.get("severity", "Unknown")
|
||
msg_type = item.get("type", "Alert")
|
||
sent = item.get("startDate", item.get("sent", ""))
|
||
|
||
sev_level = SEVERITY_ORDER.get(severity, -1)
|
||
if sev_level < min_level and msg_type != "Cancel":
|
||
return
|
||
|
||
dedup_key = self._normalise_id(identifier)
|
||
if dedup_key in self._known and self._known[dedup_key] == sent:
|
||
return
|
||
self._known[dedup_key] = sent
|
||
|
||
# Headline aus i18nTitle (Deutsch bevorzugt)
|
||
i18n = item.get("i18nTitle", {})
|
||
headline = i18n.get("de") or i18n.get("en") or identifier
|
||
|
||
text = self._format_alert(msg_type, severity, headline, "")
|
||
logger.info("NINA mapData alert: %s (id=%s)", headline, identifier)
|
||
await self._send(identifier, severity, msg_type, headline, sent, text, channel, "")
|
||
|
||
# ── Shared helpers ───────────────────────────────────────────────────────
|
||
|
||
@staticmethod
|
||
def _format_alert(msg_type: str, severity: str, headline: str, description: str, area: str = "") -> str:
|
||
area_suffix = f" ({area})" if area else ""
|
||
if msg_type == "Cancel":
|
||
return f"[NINA] Aufgehoben: {headline}{area_suffix}"
|
||
sev_text = SEVERITY_LABELS.get(severity, severity)
|
||
text = f"[NINA] {sev_text}: {headline}{area_suffix}"
|
||
if description:
|
||
short = description.strip()[:120]
|
||
if len(description.strip()) > 120:
|
||
short += "..."
|
||
text += f"\n{short}"
|
||
return text
|
||
|
||
async def _send(
|
||
self,
|
||
identifier: str,
|
||
severity: str,
|
||
msg_type: str,
|
||
headline: str,
|
||
sent: str,
|
||
text: str,
|
||
channel: int,
|
||
area: str = "",
|
||
):
|
||
dedup_key = self._normalise_id(identifier)
|
||
|
||
# Keep _active up to date for re-broadcast
|
||
if msg_type == "Cancel":
|
||
self._active.pop(dedup_key, None)
|
||
else:
|
||
self._active[dedup_key] = {
|
||
"text": text,
|
||
"channel": channel,
|
||
"headline": headline,
|
||
"severity": severity,
|
||
"msgType": msg_type,
|
||
"id": identifier,
|
||
"sent": sent,
|
||
"area": area,
|
||
"monitor_only": not self.config.get("send_to_mesh", True),
|
||
}
|
||
|
||
if self.config.get("send_to_mesh", True):
|
||
await self.send_callback(text, channel)
|
||
else:
|
||
logger.info("NINA monitor-only: Mesh-Versand deaktiviert, nur WebSocket-Broadcast")
|
||
|
||
if self.ws_manager:
|
||
await self.ws_manager.broadcast("nina_alert", {
|
||
"id": identifier,
|
||
"severity": severity,
|
||
"msgType": msg_type,
|
||
"headline": headline,
|
||
"sent": sent,
|
||
"area": area,
|
||
"monitor_only": not self.config.get("send_to_mesh", True),
|
||
})
|