MeshDD-Bot/meshbot/database.py
ppfeiffer c443a9f26d feat(auth): Rolle Mitarbeiter + Einladungs-Workflow (closes #7)
- Rollensystem: Public → Mitarbeiter → Admin (Rolle user entfällt)
- DB-Migration: must_change_password-Spalte, user→mitarbeiter
- require_staff_api(): erlaubt mitarbeiter + admin
- POST /api/admin/invite: Einladung mit auto-generiertem Passwort + E-Mail
- POST /auth/change-password: Pflicht-Passwortwechsel
- Login: force_password_change-Redirect
- Sidebar: sidebar-staff für Scheduler/NINA/Einstellungen
- Scheduler/NINA: read-only für Mitarbeiter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 22:51:06 +01:00

361 lines
15 KiB
Python

import aiosqlite
import time
import logging
from meshbot import config
logger = logging.getLogger(__name__)
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.db: aiosqlite.Connection | None = None
async def connect(self):
self.db = await aiosqlite.connect(self.db_path)
self.db.row_factory = aiosqlite.Row
await self.db.execute("PRAGMA journal_mode=WAL")
await self._create_tables()
await self._migrate()
logger.info("Database connected: %s", self.db_path)
async def close(self):
if self.db:
try:
await self.db.execute("PRAGMA wal_checkpoint(FULL)")
await self.db.commit()
except Exception:
logger.exception("Error during WAL checkpoint on close")
await self.db.close()
self.db = None
logger.info("Database closed")
async def _create_tables(self):
await self.db.executescript("""
CREATE TABLE IF NOT EXISTS nodes (
node_id TEXT PRIMARY KEY,
node_num INTEGER,
long_name TEXT,
short_name TEXT,
hw_model TEXT,
lat REAL,
lon REAL,
alt REAL,
battery INTEGER,
voltage REAL,
snr REAL,
rssi INTEGER,
last_seen REAL,
first_seen REAL,
hop_count INTEGER,
via_mqtt INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
from_node TEXT,
to_node TEXT,
channel INTEGER,
portnum TEXT,
payload TEXT
);
CREATE TABLE IF NOT EXISTS commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
command TEXT,
channel INTEGER
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
name TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'mitarbeiter',
is_verified INTEGER NOT NULL DEFAULT 0,
must_change_password INTEGER NOT NULL DEFAULT 0,
created_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
token TEXT UNIQUE NOT NULL,
type TEXT NOT NULL,
expires_at REAL NOT NULL,
used INTEGER NOT NULL DEFAULT 0,
created_at REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS packets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
from_id TEXT,
to_id TEXT,
portnum TEXT,
channel INTEGER,
snr REAL,
rssi INTEGER,
hop_limit INTEGER,
hop_start INTEGER,
packet_id INTEGER,
payload TEXT
);
CREATE TABLE IF NOT EXISTS email_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT NOT NULL,
subject TEXT NOT NULL,
status TEXT NOT NULL,
error_message TEXT,
created_at REAL NOT NULL
);
""")
await self.db.commit()
async def _migrate(self):
async with self.db.execute("PRAGMA table_info(commands)") as c:
cols = {row[1] async for row in c}
if "channel" not in cols:
await self.db.execute("ALTER TABLE commands ADD COLUMN channel INTEGER")
await self.db.commit()
logger.info("Migration: added channel column to commands table")
async with self.db.execute("PRAGMA table_info(users)") as c:
cols = {row[1] async for row in c}
if "must_change_password" not in cols:
await self.db.execute(
"ALTER TABLE users ADD COLUMN must_change_password INTEGER NOT NULL DEFAULT 0"
)
await self.db.commit()
logger.info("Migration: added must_change_password column to users table")
# Migrate legacy role 'user' → 'mitarbeiter'
await self.db.execute("UPDATE users SET role = 'mitarbeiter' WHERE role = 'user'")
await self.db.commit()
# ── Node methods ──────────────────────────────────
async def upsert_node(self, node_id: str, **kwargs) -> dict:
now = time.time()
# Row anlegen falls nicht vorhanden (first_seen nur beim ersten Mal gesetzt)
await self.db.execute(
"INSERT OR IGNORE INTO nodes (node_id, first_seen, last_seen) VALUES (?, ?, ?)",
(node_id, now, now),
)
# Nicht-None-Felder + last_seen immer aktualisieren
updates = {k: v for k, v in kwargs.items() if v is not None}
updates["last_seen"] = now
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [node_id]
await self.db.execute(
f"UPDATE nodes SET {set_clause} WHERE node_id = ?", values
)
await self.db.commit()
return dict(await self.get_node(node_id))
async def get_node(self, node_id: str) -> dict | None:
async with self.db.execute(
"SELECT * FROM nodes WHERE node_id = ?", (node_id,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_all_nodes(self) -> list[dict]:
async with self.db.execute(
"SELECT * FROM nodes ORDER BY last_seen DESC"
) as cursor:
return [dict(row) async for row in cursor]
async def get_nodes_with_position(self) -> list[dict]:
async with self.db.execute(
"SELECT * FROM nodes WHERE lat IS NOT NULL AND lon IS NOT NULL ORDER BY last_seen DESC"
) as cursor:
return [dict(row) async for row in cursor]
# ── Message methods ───────────────────────────────
async def insert_message(self, from_node: str, to_node: str, channel: int,
portnum: str, payload: str) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO messages (timestamp, from_node, to_node, channel, portnum, payload) VALUES (?, ?, ?, ?, ?, ?)",
(now, from_node, to_node, channel, portnum, payload),
)
await self.db.commit()
async with self.db.execute(
"SELECT * FROM messages WHERE id = ?", (cursor.lastrowid,)
) as c:
row = await c.fetchone()
return dict(row) if row else {}
async def get_recent_messages(self, limit: int = 50) -> list[dict]:
async with self.db.execute(
"SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,)
) as cursor:
return [dict(row) async for row in cursor]
# ── Command methods ───────────────────────────────
async def insert_command(self, command: str, channel: int | None = None):
now = time.time()
await self.db.execute(
"INSERT INTO commands (timestamp, command, channel) VALUES (?, ?, ?)",
(now, command, channel),
)
await self.db.commit()
async def get_stats(self, my_node_id: str | None = None) -> dict:
stats = {}
async with self.db.execute("SELECT COUNT(*) FROM nodes") as c:
stats["total_nodes"] = (await c.fetchone())[0]
now = time.time()
day_ago = now - 86400
async with self.db.execute(
"SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (day_ago,)
) as c:
stats["nodes_24h"] = (await c.fetchone())[0]
threshold = config.get("web.online_threshold", 900)
async with self.db.execute(
"SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (now - threshold,)
) as c:
stats["nodes_online"] = (await c.fetchone())[0]
async with self.db.execute(
"SELECT COUNT(*) FROM commands WHERE timestamp >= ?", (day_ago,)
) as c:
stats["total_commands"] = (await c.fetchone())[0]
async with self.db.execute(
"SELECT channel, COUNT(*) as cnt FROM commands WHERE timestamp >= ? GROUP BY channel ORDER BY cnt DESC",
(day_ago,),
) as cursor:
stats["channel_breakdown"] = {row[0]: row[1] async for row in cursor}
if my_node_id:
pkt_query = (
"SELECT portnum, COUNT(*) as cnt FROM packets "
"WHERE timestamp >= ? AND NOT (portnum = 'TELEMETRY_APP' AND from_id = ?) "
"GROUP BY portnum ORDER BY cnt DESC"
)
pkt_params = (day_ago, my_node_id)
else:
pkt_query = (
"SELECT portnum, COUNT(*) as cnt FROM packets "
"WHERE timestamp >= ? GROUP BY portnum ORDER BY cnt DESC"
)
pkt_params = (day_ago,)
async with self.db.execute(pkt_query, pkt_params) as cursor:
stats["packet_type_breakdown"] = {(row[0] or "?"): row[1] async for row in cursor}
return stats
# ── User methods ──────────────────────────────────
async def create_user(self, email: str, password: str, name: str, role: str = "mitarbeiter",
is_verified: int = 0, must_change_password: int = 0) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO users (email, password, name, role, is_verified, must_change_password, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(email, password, name, role, is_verified, must_change_password, now, now),
)
await self.db.commit()
return await self.get_user_by_id(cursor.lastrowid)
async def get_user_by_email(self, email: str) -> dict | None:
async with self.db.execute(
"SELECT * FROM users WHERE email = ?", (email,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_user_by_id(self, user_id: int) -> dict | None:
async with self.db.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_all_users(self) -> list[dict]:
async with self.db.execute(
"SELECT id, email, name, role, is_verified, must_change_password, created_at, updated_at "
"FROM users ORDER BY created_at DESC"
) as cursor:
return [dict(row) async for row in cursor]
async def update_user(self, user_id: int, **kwargs) -> dict | None:
if not kwargs:
return await self.get_user_by_id(user_id)
kwargs["updated_at"] = time.time()
set_clause = ", ".join(f"{k} = ?" for k in kwargs)
values = list(kwargs.values()) + [user_id]
await self.db.execute(
f"UPDATE users SET {set_clause} WHERE id = ?", values
)
await self.db.commit()
return await self.get_user_by_id(user_id)
async def delete_user(self, user_id: int) -> bool:
cursor = await self.db.execute("DELETE FROM users WHERE id = ?", (user_id,))
await self.db.commit()
return cursor.rowcount > 0
# ── Token methods ─────────────────────────────────
async def create_token(self, user_id: int, token: str, token_type: str, expires_at: float) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO tokens (user_id, token, type, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)",
(user_id, token, token_type, expires_at, now),
)
await self.db.commit()
async with self.db.execute("SELECT * FROM tokens WHERE id = ?", (cursor.lastrowid,)) as c:
row = await c.fetchone()
return dict(row) if row else {}
async def get_valid_token(self, token: str, token_type: str) -> dict | None:
now = time.time()
async with self.db.execute(
"SELECT * FROM tokens WHERE token = ? AND type = ? AND used = 0 AND expires_at > ?",
(token, token_type, now),
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def mark_token_used(self, token_id: int):
await self.db.execute("UPDATE tokens SET used = 1 WHERE id = ?", (token_id,))
await self.db.commit()
# ── Email log methods ─────────────────────────────
async def log_email(self, recipient: str, subject: str, status: str, error_message: str = None):
now = time.time()
await self.db.execute(
"INSERT INTO email_logs (recipient, subject, status, error_message, created_at) VALUES (?, ?, ?, ?, ?)",
(recipient, subject, status, error_message, now),
)
await self.db.commit()
# ── Packet log methods ────────────────────────────
async def insert_packet(self, from_id: str, to_id: str, portnum: str, channel: int,
snr, rssi, hop_limit, hop_start, packet_id, payload: str) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO packets (timestamp, from_id, to_id, portnum, channel, snr, rssi, "
"hop_limit, hop_start, packet_id, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(now, from_id, to_id, portnum, channel, snr, rssi, hop_limit, hop_start, packet_id, payload),
)
await self.db.commit()
async with self.db.execute("SELECT * FROM packets WHERE id = ?", (cursor.lastrowid,)) as c:
row = await c.fetchone()
return dict(row) if row else {}
async def get_recent_packets(self, limit: int = 200) -> list[dict]:
async with self.db.execute(
"SELECT * FROM packets ORDER BY timestamp DESC LIMIT ?", (limit,)
) as cursor:
return [dict(row) async for row in cursor]