MeshDD-Bot/meshbot/nina.py
ppfeiffer ee3208769c feat: NINA send_to_mesh-Schalter + Dresden-AGS-Codes
- send_to_mesh: true/false – trennt Abfrage vom Mesh-Versand.
  false = Monitor-Modus: Warnmeldungen werden abgerufen und in der
  Weboberfläche angezeigt, aber NICHT ins Meshtastic-Netz gesendet.
  WebSocket-Event enthaelt monitor_only-Flag (Anzeige per Icon).
- nina.yaml/conf/nina.yaml: send_to_mesh=false als sichere Voreinstellung
  + 5 AGS-Codes fuer den Raum Dresden vorbelegt:
  Stadt Dresden (146120000000), LK Meissen (146270000000),
  LK Saechs. Schweiz-Osterzgebirge (146280000000),
  LK Bautzen (146250000000), LK Goerlitz (146260000000)
- nina.html: zweiter Toggle "Ins Mesh senden"
- nina.js: Schalter in load/save + Statusbadge (Mesh+Web / Nur Web)
  + Mesh-Spalte in Alerts-Tabelle mit broadcast/eye-Icon

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-19 11:41:42 +01:00

394 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import os
from typing import Callable, Awaitable
import aiohttp
import yaml
logger = logging.getLogger(__name__)

# Base URL of the NINA warning API.
NINA_API_BASE = "https://warnung.bund.de/api31"

# nina.yaml lives one directory above the directory that contains this module.
NINA_CONFIG_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "nina.yaml",
)

# Numeric rank per severity name; "Unknown" ranks below every real severity.
SEVERITY_ORDER = {"Unknown": -1}
SEVERITY_ORDER.update(
    (name, rank)
    for rank, name in enumerate(("Minor", "Moderate", "Severe", "Extreme"))
)

# Label used in the outgoing alert text for each severity name.
SEVERITY_LABELS = dict(
    Extreme="EXTREM",
    Severe="Schwerwiegend",
    Moderate="Maessig",
    Minor="Gering",
)
# Dashboard endpoint: warning-ID prefixes per source (used for source filtering).
SOURCE_PREFIXES = dict(
    katwarn=("katwarn.",),
    biwapp=("biwapp.",),
    mowas=("mowas.", "mow."),
    dwd=("dwd.",),
    lhp=("lhp.",),
    police=("police.",),
)

# mapData endpoint name per source (currently identical to the source key).
SOURCE_MAP_ENDPOINTS = {
    name: name
    for name in ("katwarn", "biwapp", "mowas", "dwd", "lhp", "police")
}

# Normalisation for cross-source de-duplication
# (mapData ID prefix -> dashboard ID prefix).
ID_NORMALIZATIONS = [
    ("dwdmap.", "dwd."),
    ("lhpmap.", "lhp."),
    ("polmap.", "police."),
    ("mow.", "mowas."),
]
# Fallback settings; values found in nina.yaml are layered on top of these.
DEFAULT_CONFIG = dict(
    enabled=False,
    send_to_mesh=True,
    poll_interval=300,
    channel=0,
    min_severity="Severe",
    ags_codes=[],
    # Per-source on/off switches.
    sources=dict(
        katwarn=True,
        biwapp=True,
        mowas=True,
        dwd=True,
        lhp=True,
        police=False,
    ),
)
class NinaBot:
"""Polls the NINA BBK warning API and forwards alerts to Meshtastic.
Two polling strategies run in parallel per cycle:
1. Dashboard regional, per AGS code (all sources, geographic filter by BBK server)
2. mapData national, per source (severity-only filter, covers gaps in dashboard)
Cross-strategy de-duplication is achieved by normalising warning IDs before
storing them in _known (e.g. 'dwdmap.''dwd.').
"""
def __init__(self, send_callback: Callable[[str, int], Awaitable[None]], ws_manager=None):
self.send_callback = send_callback
self.ws_manager = ws_manager
self.config: dict = {}
self._mtime: float = 0.0
self._known: dict[str, str] = {} # normalised_id -> sent (de-dup)
self._running = False
self._task: asyncio.Task | None = None
self._load()
# ── Config ──────────────────────────────────────────────────────────────
def _load(self):
try:
with open(NINA_CONFIG_PATH) as f:
data = yaml.safe_load(f) or {}
self.config = {**DEFAULT_CONFIG, **data}
if "sources" in data:
self.config["sources"] = {**DEFAULT_CONFIG["sources"], **data["sources"]}
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
logger.info(
"NINA config loaded (enabled=%s, codes=%s)",
self.config.get("enabled"),
self.config.get("ags_codes"),
)
except FileNotFoundError:
logger.info("No nina.yaml found using defaults")
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
self._save()
except Exception:
logger.exception("Error loading nina.yaml")
self.config = {**DEFAULT_CONFIG, "sources": dict(DEFAULT_CONFIG["sources"])}
def _save(self):
try:
with open(NINA_CONFIG_PATH, "w") as f:
yaml.dump(
self.config,
f,
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)
self._mtime = os.path.getmtime(NINA_CONFIG_PATH)
logger.info("NINA config saved")
except Exception:
logger.exception("Error saving nina.yaml")
def get_config(self) -> dict:
return self.config
def update_config(self, updates: dict) -> dict:
if "sources" in updates:
self.config["sources"] = {
**self.config.get("sources", {}),
**updates.pop("sources"),
}
self.config.update(updates)
self._save()
return self.config
# ── Lifecycle ────────────────────────────────────────────────────────────
async def start(self):
self._running = True
self._task = asyncio.create_task(self._poll_loop())
logger.info("NinaBot started")
async def stop(self):
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
# ── Hot-reload ───────────────────────────────────────────────────────────
async def watch(self, interval: float = 5.0):
"""Reload nina.yaml when the file changes on disk."""
while True:
await asyncio.sleep(interval)
try:
current_mtime = os.path.getmtime(NINA_CONFIG_PATH)
if current_mtime != self._mtime:
self._load()
except FileNotFoundError:
pass
except Exception:
logger.exception("Error watching nina.yaml")
# ── Polling ──────────────────────────────────────────────────────────────
async def _poll_loop(self):
while self._running:
try:
if self.config.get("enabled"):
await self._check_alerts()
except Exception:
logger.exception("NINA polling error")
interval = max(60, int(self.config.get("poll_interval", 300)))
await asyncio.sleep(interval)
async def _check_alerts(self):
min_level = SEVERITY_ORDER.get(self.config.get("min_severity", "Severe"), 2)
channel = int(self.config.get("channel", 0))
sources = self.config.get("sources", DEFAULT_CONFIG["sources"])
ags_codes = self.config.get("ags_codes", [])
async with aiohttp.ClientSession(
headers={"User-Agent": "MeshDD-Bot/1.0 (+https://github.com/ppfeiffer/MeshDD-Bot)"},
timeout=aiohttp.ClientTimeout(total=30),
) as session:
# 1. Dashboard: regional filtering per AGS code (server-side, all sources)
for ags in ags_codes:
try:
await self._fetch_dashboard(session, str(ags).strip(), min_level, channel, sources)
except Exception:
logger.exception("NINA dashboard error for AGS %s", ags)
# 2. mapData: national per-source polling (severity + source filter only)
for source_key, endpoint in SOURCE_MAP_ENDPOINTS.items():
if not sources.get(source_key, True):
continue
try:
await self._fetch_map_data(session, source_key, endpoint, min_level, channel)
except Exception:
logger.exception("NINA mapData error for source %s", source_key)
# ── De-duplication helper ────────────────────────────────────────────────
@staticmethod
def _normalise_id(identifier: str) -> str:
"""Normalise a warning ID to a canonical form for cross-source de-duplication."""
for old, new in ID_NORMALIZATIONS:
if identifier.startswith(old):
return new + identifier[len(old):]
return identifier
@staticmethod
def _source_key_for(identifier: str) -> str | None:
"""Return the source config key for a given warning ID, or None if unknown."""
for key, prefixes in SOURCE_PREFIXES.items():
for p in prefixes:
if identifier.startswith(p):
return key
return None
# ── Dashboard endpoint ───────────────────────────────────────────────────
async def _fetch_dashboard(
self,
session: aiohttp.ClientSession,
ags: str,
min_level: int,
channel: int,
sources: dict,
):
ars = ags.ljust(12, "0")
url = f"{NINA_API_BASE}/dashboard/{ars}.json"
async with session.get(url) as resp:
if resp.status == 404:
logger.warning("NINA dashboard: no data for AGS %s (404)", ags)
return
if resp.status != 200:
logger.warning("NINA dashboard: status %d for %s", resp.status, url)
return
items = await resp.json(content_type=None)
if not isinstance(items, list):
logger.warning("NINA dashboard: unexpected response type for AGS %s", ags)
return
for item in items:
try:
await self._process_dashboard_item(item, min_level, channel, sources)
except Exception:
logger.exception("NINA dashboard: error processing %s", item.get("id"))
async def _process_dashboard_item(
self, item: dict, min_level: int, channel: int, sources: dict
):
identifier = item.get("id", "")
if not identifier:
return
# Filter by source
src_key = self._source_key_for(identifier)
if src_key and not sources.get(src_key, True):
return
payload = item.get("payload", {})
sent = payload.get("sent", item.get("sent", ""))
data = payload.get("data", {})
msg_type = payload.get("msgType", data.get("msgType", "Alert"))
severity = data.get("severity", "Unknown")
sev_level = SEVERITY_ORDER.get(severity, -1)
if sev_level < min_level and msg_type != "Cancel":
return
dedup_key = self._normalise_id(identifier)
if dedup_key in self._known and self._known[dedup_key] == sent:
return
self._known[dedup_key] = sent
headline = data.get("headline", "Warnung")
description = data.get("description", "")
text = self._format_alert(msg_type, severity, headline, description)
logger.info("NINA dashboard alert: %s (id=%s)", headline, identifier)
await self._send(identifier, severity, msg_type, headline, sent, text, channel)
# ── mapData endpoint ─────────────────────────────────────────────────────
async def _fetch_map_data(
self,
session: aiohttp.ClientSession,
source_key: str,
endpoint: str,
min_level: int,
channel: int,
):
url = f"{NINA_API_BASE}/{endpoint}/mapData.json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning("NINA mapData %s: status %d", source_key, resp.status)
return
items = await resp.json(content_type=None)
if not isinstance(items, list):
logger.warning("NINA mapData %s: unexpected response type", source_key)
return
for item in items:
try:
await self._process_map_item(item, min_level, channel)
except Exception:
logger.exception("NINA mapData: error processing %s", item.get("id"))
async def _process_map_item(self, item: dict, min_level: int, channel: int):
identifier = item.get("id", "")
if not identifier:
return
severity = item.get("severity", "Unknown")
msg_type = item.get("type", "Alert")
sent = item.get("startDate", item.get("sent", ""))
sev_level = SEVERITY_ORDER.get(severity, -1)
if sev_level < min_level and msg_type != "Cancel":
return
dedup_key = self._normalise_id(identifier)
if dedup_key in self._known and self._known[dedup_key] == sent:
return
self._known[dedup_key] = sent
# Headline aus i18nTitle (Deutsch bevorzugt)
i18n = item.get("i18nTitle", {})
headline = i18n.get("de") or i18n.get("en") or identifier
text = self._format_alert(msg_type, severity, headline, "")
logger.info("NINA mapData alert: %s (id=%s)", headline, identifier)
await self._send(identifier, severity, msg_type, headline, sent, text, channel)
# ── Shared helpers ───────────────────────────────────────────────────────
@staticmethod
def _format_alert(msg_type: str, severity: str, headline: str, description: str) -> str:
if msg_type == "Cancel":
return f"[NINA] Aufgehoben: {headline}"
sev_text = SEVERITY_LABELS.get(severity, severity)
text = f"[NINA] {sev_text}: {headline}"
if description:
short = description.strip()[:120]
if len(description.strip()) > 120:
short += "..."
text += f"\n{short}"
return text
async def _send(
self,
identifier: str,
severity: str,
msg_type: str,
headline: str,
sent: str,
text: str,
channel: int,
):
if self.config.get("send_to_mesh", True):
await self.send_callback(text, channel)
else:
logger.info("NINA monitor-only: Mesh-Versand deaktiviert, nur WebSocket-Broadcast")
if self.ws_manager:
await self.ws_manager.broadcast("nina_alert", {
"id": identifier,
"severity": severity,
"msgType": msg_type,
"headline": headline,
"sent": sent,
"monitor_only": not self.config.get("send_to_mesh", True),
})