MeshDD-Bot/meshbot/nina.py
ppfeiffer e788fc7201 fix: NINA mapData nur ohne AGS-Codes abfragen (v0.8.2)
mapData liefert bundesweite Meldungen ohne geografische Filterung.
Mit konfigurierten AGS-Codes deckt der Dashboard-Endpunkt bereits
alle Quellen regional ab – mapData wird dann nicht mehr benötigt
und würde Meldungen außerhalb der konfigurierten Regionen zeigen.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 13:27:55 +01:00

432 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import os
from typing import Callable, Awaitable
import aiohttp
import yaml
logger = logging.getLogger(__name__)

# Root URL of the NINA warning API; endpoint paths are appended to it.
NINA_API_BASE = "https://warnung.bund.de/api31"

# nina.yaml is looked up one directory above the directory holding this module.
NINA_CONFIG_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "nina.yaml",
)

# Numeric rank per severity name, used to compare against "min_severity".
SEVERITY_ORDER = dict(Unknown=-1, Minor=0, Moderate=1, Severe=2, Extreme=3)

# Text shown in outgoing messages for each severity (no entry for "Unknown").
SEVERITY_LABELS = dict(
    Extreme="EXTREM",
    Severe="Schwerwiegend",
    Moderate="Maessig",
    Minor="Gering",
)

# Dashboard endpoint: ID prefixes per source (used for source filtering).
# NOTE(review): published NINA examples show ids such as "biw.BIWAPP-..." -
# confirm the "katwarn." / "biwapp." prefixes against live dashboard data.
SOURCE_PREFIXES = dict(
    katwarn=("katwarn.",),
    biwapp=("biwapp.",),
    mowas=("mowas.", "mow."),
    dwd=("dwd.",),
    lhp=("lhp.",),
    police=("police.",),
)

# mapData endpoint path segment per source (currently identical to the key).
SOURCE_MAP_ENDPOINTS = {
    name: name
    for name in ("katwarn", "biwapp", "mowas", "dwd", "lhp", "police")
}

# Normalisation for cross-source de-duplication
# (mapData prefix -> dashboard prefix).
ID_NORMALIZATIONS = [
    ("dwdmap.", "dwd."),
    ("lhpmap.", "lhp."),
    ("polmap.", "police."),
    ("mow.", "mowas."),
]

# Values used for every key that nina.yaml does not provide.
DEFAULT_CONFIG = dict(
    enabled=False,
    send_to_mesh=True,
    poll_interval=300,
    resend_interval=3600,
    channel=0,
    min_severity="Severe",
    ags_codes=[],
    sources=dict(
        katwarn=True,
        biwapp=True,
        mowas=True,
        dwd=True,
        lhp=True,
        police=False,
    ),
)
class NinaBot:
"""Polls the NINA BBK warning API and forwards alerts to Meshtastic.
Two polling strategies run in parallel per cycle:
1. Dashboard regional, per AGS code (all sources, geographic filter by BBK server)
2. mapData national, per source (severity-only filter, covers gaps in dashboard)
Cross-strategy de-duplication is achieved by normalising warning IDs before
storing them in _known (e.g. 'dwdmap.''dwd.').
"""
def __init__(self, send_callback: Callable[[str, int], Awaitable[None]], ws_manager=None):
self.send_callback = send_callback
self.ws_manager = ws_manager
self.config: dict = {}
self._mtime: float = 0.0
self._known: dict[str, str] = {} # normalised_id -> sent (de-dup)
self._active: dict[str, dict] = {} # normalised_id -> {text, channel, headline, severity, id, sent}
self._running = False
self._task: asyncio.Task | None = None
self._resend_task: asyncio.Task | None = None
self._load()
# ── Config ──────────────────────────────────────────────────────────────
def _load(self):
try:
with open(NINA_CONFIG_PATH) as f:
data = yaml.safe_load(f) or {}
self.config = {**DEFAULT_CONFIG, **data}
if "sources" in data:
self.config["sources"] = {**DEFAULT_CONFIG["sources"], **data["sources"]}
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
logger.info(
"NINA config loaded (enabled=%s, codes=%s)",
self.config.get("enabled"),
self.config.get("ags_codes"),
)
except FileNotFoundError:
logger.info("No nina.yaml found using defaults")
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
self._save()
except Exception:
logger.exception("Error loading nina.yaml")
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
def _save(self):
try:
with open(NINA_CONFIG_PATH, "w") as f:
yaml.dump(
self.config,
f,
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
logger.info("NINA config saved")
except Exception:
logger.exception("Error saving nina.yaml")
def get_config(self) -> dict:
return self.config
def update_config(self, updates: dict) -> dict:
if "sources" in updates:
self.config["sources"] = {
**self.config.get("sources", {}),
**updates.pop("sources"),
}
self.config.update(updates)
self._save()
return self.config
# ── Lifecycle ────────────────────────────────────────────────────────────
async def start(self):
self._running = True
self._task = asyncio.create_task(self._poll_loop())
self._resend_task = asyncio.create_task(self._resend_loop())
logger.info("NinaBot started")
async def stop(self):
self._running = False
for task in (self._task, self._resend_task):
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# ── Hot-reload ───────────────────────────────────────────────────────────
async def watch(self, interval: float = 5.0):
"""Reload nina.yaml when the file changes on disk."""
while True:
await asyncio.sleep(interval)
try:
current_mtime = os.path.getmtime(NINA_CONFIG_PATH)
if current_mtime != self._mtime:
self._load()
except FileNotFoundError:
pass
except Exception:
logger.exception("Error watching nina.yaml")
# ── Polling ──────────────────────────────────────────────────────────────
async def _poll_loop(self):
while self._running:
try:
if self.config.get("enabled"):
await self._check_alerts()
except Exception:
logger.exception("NINA polling error")
interval = max(60, int(self.config.get("poll_interval", 300)))
await asyncio.sleep(interval)
async def _resend_loop(self):
"""Re-broadcast all active warnings at resend_interval when send_to_mesh is enabled."""
while self._running:
interval = max(60, int(self.config.get("resend_interval", 3600)))
await asyncio.sleep(interval)
try:
if self.config.get("enabled") and self.config.get("send_to_mesh", True):
if self._active:
logger.info("NINA resend: %d aktive Warnmeldungen", len(self._active))
for entry in list(self._active.values()):
await self.send_callback(entry["text"], entry["channel"])
except Exception:
logger.exception("NINA resend error")
async def _check_alerts(self):
min_level = SEVERITY_ORDER.get(self.config.get("min_severity", "Severe"), 2)
channel = int(self.config.get("channel", 0))
sources = self.config.get("sources", DEFAULT_CONFIG["sources"])
ags_codes = self.config.get("ags_codes", [])
async with aiohttp.ClientSession(
headers={"User-Agent": "MeshDD-Bot/1.0 (+https://github.com/ppfeiffer/MeshDD-Bot)"},
timeout=aiohttp.ClientTimeout(total=30),
) as session:
# 1. Dashboard: regional filtering per AGS code (server-side, all sources)
for ags in ags_codes:
try:
await self._fetch_dashboard(session, str(ags).strip(), min_level, channel, sources)
except Exception:
logger.exception("NINA dashboard error for AGS %s", ags)
# 2. mapData: national per-source polling nur wenn KEINE AGS-Codes konfiguriert
# sind, da mapData keine geografische Filterung unterstützt und sonst
# bundesweite Meldungen außerhalb der konfigurierten Regionen erscheinen.
# Mit AGS-Codes deckt das Dashboard bereits alle Quellen regional ab.
if not ags_codes:
for source_key, endpoint in SOURCE_MAP_ENDPOINTS.items():
if not sources.get(source_key, True):
continue
try:
await self._fetch_map_data(session, source_key, endpoint, min_level, channel)
except Exception:
logger.exception("NINA mapData error for source %s", source_key)
# ── De-duplication helper ────────────────────────────────────────────────
@staticmethod
def _normalise_id(identifier: str) -> str:
"""Normalise a warning ID to a canonical form for cross-source de-duplication."""
for old, new in ID_NORMALIZATIONS:
if identifier.startswith(old):
return new + identifier[len(old):]
return identifier
@staticmethod
def _source_key_for(identifier: str) -> str | None:
"""Return the source config key for a given warning ID, or None if unknown."""
for key, prefixes in SOURCE_PREFIXES.items():
for p in prefixes:
if identifier.startswith(p):
return key
return None
# ── Dashboard endpoint ───────────────────────────────────────────────────
async def _fetch_dashboard(
self,
session: aiohttp.ClientSession,
ags: str,
min_level: int,
channel: int,
sources: dict,
):
ars = ags.ljust(12, "0")
url = f"{NINA_API_BASE}/dashboard/{ars}.json"
async with session.get(url) as resp:
if resp.status == 404:
logger.warning("NINA dashboard: no data for AGS %s (404)", ags)
return
if resp.status != 200:
logger.warning("NINA dashboard: status %d for %s", resp.status, url)
return
items = await resp.json(content_type=None)
if not isinstance(items, list):
logger.warning("NINA dashboard: unexpected response type for AGS %s", ags)
return
for item in items:
try:
await self._process_dashboard_item(item, min_level, channel, sources)
except Exception:
logger.exception("NINA dashboard: error processing %s", item.get("id"))
async def _process_dashboard_item(
self, item: dict, min_level: int, channel: int, sources: dict
):
identifier = item.get("id", "")
if not identifier:
return
# Filter by source
src_key = self._source_key_for(identifier)
if src_key and not sources.get(src_key, True):
return
payload = item.get("payload", {})
sent = payload.get("sent", item.get("sent", ""))
data = payload.get("data", {})
msg_type = payload.get("msgType", data.get("msgType", "Alert"))
severity = data.get("severity", "Unknown")
sev_level = SEVERITY_ORDER.get(severity, -1)
if sev_level < min_level and msg_type != "Cancel":
return
dedup_key = self._normalise_id(identifier)
if dedup_key in self._known and self._known[dedup_key] == sent:
return
self._known[dedup_key] = sent
headline = data.get("headline", "Warnung")
description = data.get("description", "")
text = self._format_alert(msg_type, severity, headline, description)
logger.info("NINA dashboard alert: %s (id=%s)", headline, identifier)
await self._send(identifier, severity, msg_type, headline, sent, text, channel)
# ── mapData endpoint ─────────────────────────────────────────────────────
async def _fetch_map_data(
self,
session: aiohttp.ClientSession,
source_key: str,
endpoint: str,
min_level: int,
channel: int,
):
url = f"{NINA_API_BASE}/{endpoint}/mapData.json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning("NINA mapData %s: status %d", source_key, resp.status)
return
items = await resp.json(content_type=None)
if not isinstance(items, list):
logger.warning("NINA mapData %s: unexpected response type", source_key)
return
for item in items:
try:
await self._process_map_item(item, min_level, channel)
except Exception:
logger.exception("NINA mapData: error processing %s", item.get("id"))
async def _process_map_item(self, item: dict, min_level: int, channel: int):
identifier = item.get("id", "")
if not identifier:
return
severity = item.get("severity", "Unknown")
msg_type = item.get("type", "Alert")
sent = item.get("startDate", item.get("sent", ""))
sev_level = SEVERITY_ORDER.get(severity, -1)
if sev_level < min_level and msg_type != "Cancel":
return
dedup_key = self._normalise_id(identifier)
if dedup_key in self._known and self._known[dedup_key] == sent:
return
self._known[dedup_key] = sent
# Headline aus i18nTitle (Deutsch bevorzugt)
i18n = item.get("i18nTitle", {})
headline = i18n.get("de") or i18n.get("en") or identifier
text = self._format_alert(msg_type, severity, headline, "")
logger.info("NINA mapData alert: %s (id=%s)", headline, identifier)
await self._send(identifier, severity, msg_type, headline, sent, text, channel)
# ── Shared helpers ───────────────────────────────────────────────────────
@staticmethod
def _format_alert(msg_type: str, severity: str, headline: str, description: str) -> str:
if msg_type == "Cancel":
return f"[NINA] Aufgehoben: {headline}"
sev_text = SEVERITY_LABELS.get(severity, severity)
text = f"[NINA] {sev_text}: {headline}"
if description:
short = description.strip()[:120]
if len(description.strip()) > 120:
short += "..."
text += f"\n{short}"
return text
async def _send(
self,
identifier: str,
severity: str,
msg_type: str,
headline: str,
sent: str,
text: str,
channel: int,
):
dedup_key = self._normalise_id(identifier)
# Keep _active up to date for re-broadcast
if msg_type == "Cancel":
self._active.pop(dedup_key, None)
else:
self._active[dedup_key] = {
"text": text,
"channel": channel,
"headline": headline,
"severity": severity,
"id": identifier,
"sent": sent,
}
if self.config.get("send_to_mesh", True):
await self.send_callback(text, channel)
else:
logger.info("NINA monitor-only: Mesh-Versand deaktiviert, nur WebSocket-Broadcast")
if self.ws_manager:
await self.ws_manager.broadcast("nina_alert", {
"id": identifier,
"severity": severity,
"msgType": msg_type,
"headline": headline,
"sent": sent,
"monitor_only": not self.config.get("send_to_mesh", True),
})