get_stats() filtert TELEMETRY_APP-Pakete des eigenen Nodes wenn my_node_id übergeben wird – konsistent mit isSuppressed() im Paket-Log-Frontend. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
340 lines
14 KiB
Python
340 lines
14 KiB
Python
import aiosqlite
|
|
import time
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Database:
|
|
def __init__(self, db_path: str):
|
|
self.db_path = db_path
|
|
self.db: aiosqlite.Connection | None = None
|
|
|
|
async def connect(self):
|
|
self.db = await aiosqlite.connect(self.db_path)
|
|
self.db.row_factory = aiosqlite.Row
|
|
await self.db.execute("PRAGMA journal_mode=WAL")
|
|
await self._create_tables()
|
|
await self._migrate()
|
|
logger.info("Database connected: %s", self.db_path)
|
|
|
|
async def close(self):
|
|
if self.db:
|
|
await self.db.close()
|
|
|
|
async def _create_tables(self):
|
|
await self.db.executescript("""
|
|
CREATE TABLE IF NOT EXISTS nodes (
|
|
node_id TEXT PRIMARY KEY,
|
|
node_num INTEGER,
|
|
long_name TEXT,
|
|
short_name TEXT,
|
|
hw_model TEXT,
|
|
lat REAL,
|
|
lon REAL,
|
|
alt REAL,
|
|
battery INTEGER,
|
|
voltage REAL,
|
|
snr REAL,
|
|
rssi INTEGER,
|
|
last_seen REAL,
|
|
first_seen REAL,
|
|
hop_count INTEGER,
|
|
via_mqtt INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL,
|
|
from_node TEXT,
|
|
to_node TEXT,
|
|
channel INTEGER,
|
|
portnum TEXT,
|
|
payload TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS commands (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL,
|
|
command TEXT,
|
|
channel INTEGER
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
email TEXT UNIQUE NOT NULL,
|
|
password TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
role TEXT NOT NULL DEFAULT 'user',
|
|
is_verified INTEGER NOT NULL DEFAULT 0,
|
|
created_at REAL NOT NULL,
|
|
updated_at REAL NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS tokens (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
token TEXT UNIQUE NOT NULL,
|
|
type TEXT NOT NULL,
|
|
expires_at REAL NOT NULL,
|
|
used INTEGER NOT NULL DEFAULT 0,
|
|
created_at REAL NOT NULL,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS packets (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL,
|
|
from_id TEXT,
|
|
to_id TEXT,
|
|
portnum TEXT,
|
|
channel INTEGER,
|
|
snr REAL,
|
|
rssi INTEGER,
|
|
hop_limit INTEGER,
|
|
hop_start INTEGER,
|
|
packet_id INTEGER,
|
|
payload TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS email_logs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
recipient TEXT NOT NULL,
|
|
subject TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
error_message TEXT,
|
|
created_at REAL NOT NULL
|
|
);
|
|
""")
|
|
await self.db.commit()
|
|
|
|
async def _migrate(self):
|
|
async with self.db.execute("PRAGMA table_info(commands)") as c:
|
|
cols = {row[1] async for row in c}
|
|
if "channel" not in cols:
|
|
await self.db.execute("ALTER TABLE commands ADD COLUMN channel INTEGER")
|
|
await self.db.commit()
|
|
logger.info("Migration: added channel column to commands table")
|
|
|
|
# ── Node methods ──────────────────────────────────
|
|
|
|
async def upsert_node(self, node_id: str, **kwargs) -> dict:
|
|
now = time.time()
|
|
existing = await self.get_node(node_id)
|
|
|
|
if existing:
|
|
updates = {k: v for k, v in kwargs.items() if v is not None}
|
|
if not updates:
|
|
return dict(existing)
|
|
updates["last_seen"] = now
|
|
set_clause = ", ".join(f"{k} = ?" for k in updates)
|
|
values = list(updates.values()) + [node_id]
|
|
await self.db.execute(
|
|
f"UPDATE nodes SET {set_clause} WHERE node_id = ?", values
|
|
)
|
|
else:
|
|
kwargs["node_id"] = node_id
|
|
kwargs.setdefault("first_seen", now)
|
|
kwargs["last_seen"] = now
|
|
cols = ", ".join(kwargs.keys())
|
|
placeholders = ", ".join("?" for _ in kwargs)
|
|
await self.db.execute(
|
|
f"INSERT INTO nodes ({cols}) VALUES ({placeholders})",
|
|
list(kwargs.values()),
|
|
)
|
|
|
|
await self.db.commit()
|
|
return dict(await self.get_node(node_id))
|
|
|
|
async def get_node(self, node_id: str) -> dict | None:
|
|
async with self.db.execute(
|
|
"SELECT * FROM nodes WHERE node_id = ?", (node_id,)
|
|
) as cursor:
|
|
row = await cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
async def get_all_nodes(self) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM nodes ORDER BY last_seen DESC"
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
async def get_nodes_with_position(self) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM nodes WHERE lat IS NOT NULL AND lon IS NOT NULL ORDER BY last_seen DESC"
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
# ── Message methods ───────────────────────────────
|
|
|
|
async def insert_message(self, from_node: str, to_node: str, channel: int,
|
|
portnum: str, payload: str) -> dict:
|
|
now = time.time()
|
|
cursor = await self.db.execute(
|
|
"INSERT INTO messages (timestamp, from_node, to_node, channel, portnum, payload) VALUES (?, ?, ?, ?, ?, ?)",
|
|
(now, from_node, to_node, channel, portnum, payload),
|
|
)
|
|
await self.db.commit()
|
|
async with self.db.execute(
|
|
"SELECT * FROM messages WHERE id = ?", (cursor.lastrowid,)
|
|
) as c:
|
|
row = await c.fetchone()
|
|
return dict(row) if row else {}
|
|
|
|
async def get_recent_messages(self, limit: int = 50) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,)
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
# ── Command methods ───────────────────────────────
|
|
|
|
async def insert_command(self, command: str, channel: int | None = None):
|
|
now = time.time()
|
|
await self.db.execute(
|
|
"INSERT INTO commands (timestamp, command, channel) VALUES (?, ?, ?)",
|
|
(now, command, channel),
|
|
)
|
|
await self.db.commit()
|
|
|
|
async def get_stats(self, my_node_id: str | None = None) -> dict:
|
|
stats = {}
|
|
async with self.db.execute("SELECT COUNT(*) FROM nodes") as c:
|
|
stats["total_nodes"] = (await c.fetchone())[0]
|
|
now = time.time()
|
|
day_ago = now - 86400
|
|
async with self.db.execute(
|
|
"SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (day_ago,)
|
|
) as c:
|
|
stats["nodes_24h"] = (await c.fetchone())[0]
|
|
async with self.db.execute(
|
|
"SELECT COUNT(*) FROM commands WHERE timestamp >= ?", (day_ago,)
|
|
) as c:
|
|
stats["total_commands"] = (await c.fetchone())[0]
|
|
async with self.db.execute(
|
|
"SELECT channel, COUNT(*) as cnt FROM commands WHERE timestamp >= ? GROUP BY channel ORDER BY cnt DESC",
|
|
(day_ago,),
|
|
) as cursor:
|
|
stats["channel_breakdown"] = {row[0]: row[1] async for row in cursor}
|
|
if my_node_id:
|
|
pkt_query = (
|
|
"SELECT portnum, COUNT(*) as cnt FROM packets "
|
|
"WHERE timestamp >= ? AND NOT (portnum = 'TELEMETRY_APP' AND from_id = ?) "
|
|
"GROUP BY portnum ORDER BY cnt DESC"
|
|
)
|
|
pkt_params = (day_ago, my_node_id)
|
|
else:
|
|
pkt_query = (
|
|
"SELECT portnum, COUNT(*) as cnt FROM packets "
|
|
"WHERE timestamp >= ? GROUP BY portnum ORDER BY cnt DESC"
|
|
)
|
|
pkt_params = (day_ago,)
|
|
async with self.db.execute(pkt_query, pkt_params) as cursor:
|
|
stats["packet_type_breakdown"] = {(row[0] or "?"): row[1] async for row in cursor}
|
|
return stats
|
|
|
|
# ── User methods ──────────────────────────────────
|
|
|
|
async def create_user(self, email: str, password: str, name: str, role: str = "user", is_verified: int = 0) -> dict:
|
|
now = time.time()
|
|
cursor = await self.db.execute(
|
|
"INSERT INTO users (email, password, name, role, is_verified, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(email, password, name, role, is_verified, now, now),
|
|
)
|
|
await self.db.commit()
|
|
return await self.get_user_by_id(cursor.lastrowid)
|
|
|
|
async def get_user_by_email(self, email: str) -> dict | None:
|
|
async with self.db.execute(
|
|
"SELECT * FROM users WHERE email = ?", (email,)
|
|
) as cursor:
|
|
row = await cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
async def get_user_by_id(self, user_id: int) -> dict | None:
|
|
async with self.db.execute(
|
|
"SELECT * FROM users WHERE id = ?", (user_id,)
|
|
) as cursor:
|
|
row = await cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
async def get_all_users(self) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT id, email, name, role, is_verified, created_at, updated_at FROM users ORDER BY created_at DESC"
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
async def update_user(self, user_id: int, **kwargs) -> dict | None:
|
|
if not kwargs:
|
|
return await self.get_user_by_id(user_id)
|
|
kwargs["updated_at"] = time.time()
|
|
set_clause = ", ".join(f"{k} = ?" for k in kwargs)
|
|
values = list(kwargs.values()) + [user_id]
|
|
await self.db.execute(
|
|
f"UPDATE users SET {set_clause} WHERE id = ?", values
|
|
)
|
|
await self.db.commit()
|
|
return await self.get_user_by_id(user_id)
|
|
|
|
async def delete_user(self, user_id: int) -> bool:
|
|
cursor = await self.db.execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
await self.db.commit()
|
|
return cursor.rowcount > 0
|
|
|
|
# ── Token methods ─────────────────────────────────
|
|
|
|
async def create_token(self, user_id: int, token: str, token_type: str, expires_at: float) -> dict:
|
|
now = time.time()
|
|
cursor = await self.db.execute(
|
|
"INSERT INTO tokens (user_id, token, type, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)",
|
|
(user_id, token, token_type, expires_at, now),
|
|
)
|
|
await self.db.commit()
|
|
async with self.db.execute("SELECT * FROM tokens WHERE id = ?", (cursor.lastrowid,)) as c:
|
|
row = await c.fetchone()
|
|
return dict(row) if row else {}
|
|
|
|
async def get_valid_token(self, token: str, token_type: str) -> dict | None:
|
|
now = time.time()
|
|
async with self.db.execute(
|
|
"SELECT * FROM tokens WHERE token = ? AND type = ? AND used = 0 AND expires_at > ?",
|
|
(token, token_type, now),
|
|
) as cursor:
|
|
row = await cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
async def mark_token_used(self, token_id: int):
|
|
await self.db.execute("UPDATE tokens SET used = 1 WHERE id = ?", (token_id,))
|
|
await self.db.commit()
|
|
|
|
# ── Email log methods ─────────────────────────────
|
|
|
|
async def log_email(self, recipient: str, subject: str, status: str, error_message: str = None):
|
|
now = time.time()
|
|
await self.db.execute(
|
|
"INSERT INTO email_logs (recipient, subject, status, error_message, created_at) VALUES (?, ?, ?, ?, ?)",
|
|
(recipient, subject, status, error_message, now),
|
|
)
|
|
await self.db.commit()
|
|
|
|
# ── Packet log methods ────────────────────────────
|
|
|
|
async def insert_packet(self, from_id: str, to_id: str, portnum: str, channel: int,
|
|
snr, rssi, hop_limit, hop_start, packet_id, payload: str) -> dict:
|
|
now = time.time()
|
|
cursor = await self.db.execute(
|
|
"INSERT INTO packets (timestamp, from_id, to_id, portnum, channel, snr, rssi, "
|
|
"hop_limit, hop_start, packet_id, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(now, from_id, to_id, portnum, channel, snr, rssi, hop_limit, hop_start, packet_id, payload),
|
|
)
|
|
await self.db.commit()
|
|
async with self.db.execute("SELECT * FROM packets WHERE id = ?", (cursor.lastrowid,)) as c:
|
|
row = await c.fetchone()
|
|
return dict(row) if row else {}
|
|
|
|
async def get_recent_packets(self, limit: int = 200) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM packets ORDER BY timestamp DESC LIMIT ?", (limit,)
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|