import aiosqlite import time import logging logger = logging.getLogger(__name__) class Database: def __init__(self, db_path: str): self.db_path = db_path self.db: aiosqlite.Connection | None = None async def connect(self): self.db = await aiosqlite.connect(self.db_path) self.db.row_factory = aiosqlite.Row await self.db.execute("PRAGMA journal_mode=WAL") await self._create_tables() logger.info("Database connected: %s", self.db_path) async def close(self): if self.db: await self.db.close() async def _create_tables(self): await self.db.executescript(""" CREATE TABLE IF NOT EXISTS nodes ( node_id TEXT PRIMARY KEY, node_num INTEGER, long_name TEXT, short_name TEXT, hw_model TEXT, lat REAL, lon REAL, alt REAL, battery INTEGER, voltage REAL, snr REAL, rssi INTEGER, last_seen REAL, first_seen REAL, hop_count INTEGER, via_mqtt INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL, from_node TEXT, to_node TEXT, channel INTEGER, portnum TEXT, payload TEXT ); CREATE TABLE IF NOT EXISTS commands ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL, command TEXT ); """) await self.db.commit() async def upsert_node(self, node_id: str, **kwargs) -> dict: now = time.time() existing = await self.get_node(node_id) if existing: updates = {k: v for k, v in kwargs.items() if v is not None} if not updates: return dict(existing) updates["last_seen"] = now set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [node_id] await self.db.execute( f"UPDATE nodes SET {set_clause} WHERE node_id = ?", values ) else: kwargs["node_id"] = node_id kwargs.setdefault("first_seen", now) kwargs["last_seen"] = now cols = ", ".join(kwargs.keys()) placeholders = ", ".join("?" for _ in kwargs) await self.db.execute( f"INSERT INTO nodes ({cols}) VALUES ({placeholders})", list(kwargs.values()), ) await self.db.commit() return dict(await self.get_node(node_id)) async def get_node(self, node_id: str) -> dict | None: async with self.db.execute( "SELECT * FROM nodes WHERE node_id = ?", (node_id,) ) as cursor: row = await cursor.fetchone() return dict(row) if row else None async def get_all_nodes(self) -> list[dict]: async with self.db.execute( "SELECT * FROM nodes ORDER BY last_seen DESC" ) as cursor: return [dict(row) async for row in cursor] async def get_nodes_with_position(self) -> list[dict]: async with self.db.execute( "SELECT * FROM nodes WHERE lat IS NOT NULL AND lon IS NOT NULL ORDER BY last_seen DESC" ) as cursor: return [dict(row) async for row in cursor] async def insert_message(self, from_node: str, to_node: str, channel: int, portnum: str, payload: str) -> dict: now = time.time() cursor = await self.db.execute( "INSERT INTO messages (timestamp, from_node, to_node, channel, portnum, payload) VALUES (?, ?, ?, ?, ?, ?)", (now, from_node, to_node, channel, portnum, payload), ) await self.db.commit() async with self.db.execute( "SELECT * FROM messages WHERE id = ?", (cursor.lastrowid,) ) as c: row = await c.fetchone() return dict(row) if row else {} async def get_recent_messages(self, limit: int = 50) -> list[dict]: async with self.db.execute( "SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,) ) as cursor: return [dict(row) async for row in cursor] async def insert_command(self, command: str): now = time.time() await self.db.execute( "INSERT INTO commands (timestamp, command) VALUES (?, ?)", (now, command), ) await self.db.commit() async def get_stats(self) -> dict: stats = {} async with self.db.execute("SELECT COUNT(*) FROM nodes") as c: stats["total_nodes"] = (await c.fetchone())[0] now = time.time() day_ago = now - 86400 async with self.db.execute( "SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (day_ago,) ) as c: stats["nodes_24h"] = (await c.fetchone())[0] async with self.db.execute("SELECT COUNT(*) FROM commands") as c: stats["total_commands"] = (await c.fetchone())[0] async with self.db.execute( "SELECT command, COUNT(*) as cnt FROM commands GROUP BY command ORDER BY cnt DESC" ) as cursor: stats["command_breakdown"] = {row[0]: row[1] async for row in cursor} return stats