- Track bot command responses in new `commands` DB table - Stats cards: total nodes, active 24h, total commands answered - Full-width command breakdown row with badges per command - Update bot /stats response Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
155 lines
5.4 KiB
Python
155 lines
5.4 KiB
Python
import aiosqlite
|
|
import time
|
|
import logging
|
|
|
|
# Module-level logger named after this module; used below to report
# database lifecycle events.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Database:
|
|
def __init__(self, db_path: str):
|
|
self.db_path = db_path
|
|
self.db: aiosqlite.Connection | None = None
|
|
|
|
async def connect(self):
|
|
self.db = await aiosqlite.connect(self.db_path)
|
|
self.db.row_factory = aiosqlite.Row
|
|
await self.db.execute("PRAGMA journal_mode=WAL")
|
|
await self._create_tables()
|
|
logger.info("Database connected: %s", self.db_path)
|
|
|
|
async def close(self):
|
|
if self.db:
|
|
await self.db.close()
|
|
|
|
async def _create_tables(self):
|
|
await self.db.executescript("""
|
|
CREATE TABLE IF NOT EXISTS nodes (
|
|
node_id TEXT PRIMARY KEY,
|
|
node_num INTEGER,
|
|
long_name TEXT,
|
|
short_name TEXT,
|
|
hw_model TEXT,
|
|
lat REAL,
|
|
lon REAL,
|
|
alt REAL,
|
|
battery INTEGER,
|
|
voltage REAL,
|
|
snr REAL,
|
|
rssi INTEGER,
|
|
last_seen REAL,
|
|
first_seen REAL,
|
|
hop_count INTEGER,
|
|
via_mqtt INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL,
|
|
from_node TEXT,
|
|
to_node TEXT,
|
|
channel INTEGER,
|
|
portnum TEXT,
|
|
payload TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS commands (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL,
|
|
command TEXT
|
|
);
|
|
""")
|
|
await self.db.commit()
|
|
|
|
async def upsert_node(self, node_id: str, **kwargs) -> dict:
|
|
now = time.time()
|
|
existing = await self.get_node(node_id)
|
|
|
|
if existing:
|
|
updates = {k: v for k, v in kwargs.items() if v is not None}
|
|
if not updates:
|
|
return dict(existing)
|
|
updates["last_seen"] = now
|
|
set_clause = ", ".join(f"{k} = ?" for k in updates)
|
|
values = list(updates.values()) + [node_id]
|
|
await self.db.execute(
|
|
f"UPDATE nodes SET {set_clause} WHERE node_id = ?", values
|
|
)
|
|
else:
|
|
kwargs["node_id"] = node_id
|
|
kwargs.setdefault("first_seen", now)
|
|
kwargs["last_seen"] = now
|
|
cols = ", ".join(kwargs.keys())
|
|
placeholders = ", ".join("?" for _ in kwargs)
|
|
await self.db.execute(
|
|
f"INSERT INTO nodes ({cols}) VALUES ({placeholders})",
|
|
list(kwargs.values()),
|
|
)
|
|
|
|
await self.db.commit()
|
|
return dict(await self.get_node(node_id))
|
|
|
|
async def get_node(self, node_id: str) -> dict | None:
|
|
async with self.db.execute(
|
|
"SELECT * FROM nodes WHERE node_id = ?", (node_id,)
|
|
) as cursor:
|
|
row = await cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
async def get_all_nodes(self) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM nodes ORDER BY last_seen DESC"
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
async def get_nodes_with_position(self) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM nodes WHERE lat IS NOT NULL AND lon IS NOT NULL ORDER BY last_seen DESC"
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
async def insert_message(self, from_node: str, to_node: str, channel: int,
|
|
portnum: str, payload: str) -> dict:
|
|
now = time.time()
|
|
cursor = await self.db.execute(
|
|
"INSERT INTO messages (timestamp, from_node, to_node, channel, portnum, payload) VALUES (?, ?, ?, ?, ?, ?)",
|
|
(now, from_node, to_node, channel, portnum, payload),
|
|
)
|
|
await self.db.commit()
|
|
async with self.db.execute(
|
|
"SELECT * FROM messages WHERE id = ?", (cursor.lastrowid,)
|
|
) as c:
|
|
row = await c.fetchone()
|
|
return dict(row) if row else {}
|
|
|
|
async def get_recent_messages(self, limit: int = 50) -> list[dict]:
|
|
async with self.db.execute(
|
|
"SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,)
|
|
) as cursor:
|
|
return [dict(row) async for row in cursor]
|
|
|
|
async def insert_command(self, command: str):
|
|
now = time.time()
|
|
await self.db.execute(
|
|
"INSERT INTO commands (timestamp, command) VALUES (?, ?)",
|
|
(now, command),
|
|
)
|
|
await self.db.commit()
|
|
|
|
async def get_stats(self) -> dict:
|
|
stats = {}
|
|
async with self.db.execute("SELECT COUNT(*) FROM nodes") as c:
|
|
stats["total_nodes"] = (await c.fetchone())[0]
|
|
now = time.time()
|
|
day_ago = now - 86400
|
|
async with self.db.execute(
|
|
"SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (day_ago,)
|
|
) as c:
|
|
stats["nodes_24h"] = (await c.fetchone())[0]
|
|
async with self.db.execute("SELECT COUNT(*) FROM commands") as c:
|
|
stats["total_commands"] = (await c.fetchone())[0]
|
|
async with self.db.execute(
|
|
"SELECT command, COUNT(*) as cnt FROM commands GROUP BY command ORDER BY cnt DESC"
|
|
) as cursor:
|
|
stats["command_breakdown"] = {row[0]: row[1] async for row in cursor}
|
|
return stats
|