MeshDD-Bot/meshbot/database.py
ppfeiffer 0232dfccd5 feat: v0.5.8 - Anfragen taeglich zuruecksetzen, Kommando-Badges entfernt
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 16:30:45 +01:00

291 lines
12 KiB
Python

import aiosqlite
import time
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.db: aiosqlite.Connection | None = None
async def connect(self):
self.db = await aiosqlite.connect(self.db_path)
self.db.row_factory = aiosqlite.Row
await self.db.execute("PRAGMA journal_mode=WAL")
await self._create_tables()
await self._migrate()
logger.info("Database connected: %s", self.db_path)
async def close(self):
if self.db:
await self.db.close()
async def _create_tables(self):
await self.db.executescript("""
CREATE TABLE IF NOT EXISTS nodes (
node_id TEXT PRIMARY KEY,
node_num INTEGER,
long_name TEXT,
short_name TEXT,
hw_model TEXT,
lat REAL,
lon REAL,
alt REAL,
battery INTEGER,
voltage REAL,
snr REAL,
rssi INTEGER,
last_seen REAL,
first_seen REAL,
hop_count INTEGER,
via_mqtt INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
from_node TEXT,
to_node TEXT,
channel INTEGER,
portnum TEXT,
payload TEXT
);
CREATE TABLE IF NOT EXISTS commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL,
command TEXT,
channel INTEGER
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
name TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'user',
is_verified INTEGER NOT NULL DEFAULT 0,
created_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
token TEXT UNIQUE NOT NULL,
type TEXT NOT NULL,
expires_at REAL NOT NULL,
used INTEGER NOT NULL DEFAULT 0,
created_at REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS email_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT NOT NULL,
subject TEXT NOT NULL,
status TEXT NOT NULL,
error_message TEXT,
created_at REAL NOT NULL
);
""")
await self.db.commit()
async def _migrate(self):
async with self.db.execute("PRAGMA table_info(commands)") as c:
cols = {row[1] async for row in c}
if "channel" not in cols:
await self.db.execute("ALTER TABLE commands ADD COLUMN channel INTEGER")
await self.db.commit()
logger.info("Migration: added channel column to commands table")
# ── Node methods ──────────────────────────────────
async def upsert_node(self, node_id: str, **kwargs) -> dict:
now = time.time()
existing = await self.get_node(node_id)
if existing:
updates = {k: v for k, v in kwargs.items() if v is not None}
if not updates:
return dict(existing)
updates["last_seen"] = now
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [node_id]
await self.db.execute(
f"UPDATE nodes SET {set_clause} WHERE node_id = ?", values
)
else:
kwargs["node_id"] = node_id
kwargs.setdefault("first_seen", now)
kwargs["last_seen"] = now
cols = ", ".join(kwargs.keys())
placeholders = ", ".join("?" for _ in kwargs)
await self.db.execute(
f"INSERT INTO nodes ({cols}) VALUES ({placeholders})",
list(kwargs.values()),
)
await self.db.commit()
return dict(await self.get_node(node_id))
async def get_node(self, node_id: str) -> dict | None:
async with self.db.execute(
"SELECT * FROM nodes WHERE node_id = ?", (node_id,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_all_nodes(self) -> list[dict]:
async with self.db.execute(
"SELECT * FROM nodes ORDER BY last_seen DESC"
) as cursor:
return [dict(row) async for row in cursor]
async def get_nodes_with_position(self) -> list[dict]:
async with self.db.execute(
"SELECT * FROM nodes WHERE lat IS NOT NULL AND lon IS NOT NULL ORDER BY last_seen DESC"
) as cursor:
return [dict(row) async for row in cursor]
# ── Message methods ───────────────────────────────
async def insert_message(self, from_node: str, to_node: str, channel: int,
portnum: str, payload: str) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO messages (timestamp, from_node, to_node, channel, portnum, payload) VALUES (?, ?, ?, ?, ?, ?)",
(now, from_node, to_node, channel, portnum, payload),
)
await self.db.commit()
async with self.db.execute(
"SELECT * FROM messages WHERE id = ?", (cursor.lastrowid,)
) as c:
row = await c.fetchone()
return dict(row) if row else {}
async def get_recent_messages(self, limit: int = 50) -> list[dict]:
async with self.db.execute(
"SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,)
) as cursor:
return [dict(row) async for row in cursor]
# ── Command methods ───────────────────────────────
async def insert_command(self, command: str, channel: int | None = None):
now = time.time()
await self.db.execute(
"INSERT INTO commands (timestamp, command, channel) VALUES (?, ?, ?)",
(now, command, channel),
)
await self.db.commit()
async def get_stats(self) -> dict:
stats = {}
async with self.db.execute("SELECT COUNT(*) FROM nodes") as c:
stats["total_nodes"] = (await c.fetchone())[0]
now = time.time()
day_ago = now - 86400
async with self.db.execute(
"SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (day_ago,)
) as c:
stats["nodes_24h"] = (await c.fetchone())[0]
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
async with self.db.execute(
"SELECT COUNT(*) FROM commands WHERE timestamp >= ?", (today_start,)
) as c:
stats["total_commands"] = (await c.fetchone())[0]
async with self.db.execute(
"SELECT channel, COUNT(*) as cnt FROM commands WHERE timestamp >= ? GROUP BY channel ORDER BY cnt DESC",
(today_start,),
) as cursor:
stats["channel_breakdown"] = {row[0]: row[1] async for row in cursor}
return stats
# ── User methods ──────────────────────────────────
async def create_user(self, email: str, password: str, name: str, role: str = "user", is_verified: int = 0) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO users (email, password, name, role, is_verified, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(email, password, name, role, is_verified, now, now),
)
await self.db.commit()
return await self.get_user_by_id(cursor.lastrowid)
async def get_user_by_email(self, email: str) -> dict | None:
async with self.db.execute(
"SELECT * FROM users WHERE email = ?", (email,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_user_by_id(self, user_id: int) -> dict | None:
async with self.db.execute(
"SELECT * FROM users WHERE id = ?", (user_id,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def get_all_users(self) -> list[dict]:
async with self.db.execute(
"SELECT id, email, name, role, is_verified, created_at, updated_at FROM users ORDER BY created_at DESC"
) as cursor:
return [dict(row) async for row in cursor]
async def update_user(self, user_id: int, **kwargs) -> dict | None:
if not kwargs:
return await self.get_user_by_id(user_id)
kwargs["updated_at"] = time.time()
set_clause = ", ".join(f"{k} = ?" for k in kwargs)
values = list(kwargs.values()) + [user_id]
await self.db.execute(
f"UPDATE users SET {set_clause} WHERE id = ?", values
)
await self.db.commit()
return await self.get_user_by_id(user_id)
async def delete_user(self, user_id: int) -> bool:
cursor = await self.db.execute("DELETE FROM users WHERE id = ?", (user_id,))
await self.db.commit()
return cursor.rowcount > 0
# ── Token methods ─────────────────────────────────
async def create_token(self, user_id: int, token: str, token_type: str, expires_at: float) -> dict:
now = time.time()
cursor = await self.db.execute(
"INSERT INTO tokens (user_id, token, type, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)",
(user_id, token, token_type, expires_at, now),
)
await self.db.commit()
async with self.db.execute("SELECT * FROM tokens WHERE id = ?", (cursor.lastrowid,)) as c:
row = await c.fetchone()
return dict(row) if row else {}
async def get_valid_token(self, token: str, token_type: str) -> dict | None:
now = time.time()
async with self.db.execute(
"SELECT * FROM tokens WHERE token = ? AND type = ? AND used = 0 AND expires_at > ?",
(token, token_type, now),
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def mark_token_used(self, token_id: int):
await self.db.execute("UPDATE tokens SET used = 1 WHERE id = ?", (token_id,))
await self.db.commit()
# ── Email log methods ─────────────────────────────
async def log_email(self, recipient: str, subject: str, status: str, error_message: str = None):
now = time.time()
await self.db.execute(
"INSERT INTO email_logs (recipient, subject, status, error_message, created_at) VALUES (?, ?, ?, ?, ?)",
(recipient, subject, status, error_message, now),
)
await self.db.commit()