import aiosqlite import time import logging from meshbot import config logger = logging.getLogger(__name__) class Database: def __init__(self, db_path: str): self.db_path = db_path self.db: aiosqlite.Connection | None = None async def connect(self): self.db = await aiosqlite.connect(self.db_path) self.db.row_factory = aiosqlite.Row await self.db.execute("PRAGMA journal_mode=WAL") await self._create_tables() await self._migrate() logger.info("Database connected: %s", self.db_path) async def close(self): if self.db: try: await self.db.execute("PRAGMA wal_checkpoint(FULL)") await self.db.commit() except Exception: logger.exception("Error during WAL checkpoint on close") await self.db.close() self.db = None logger.info("Database closed") async def _create_tables(self): await self.db.executescript(""" CREATE TABLE IF NOT EXISTS nodes ( node_id TEXT PRIMARY KEY, node_num INTEGER, long_name TEXT, short_name TEXT, hw_model TEXT, lat REAL, lon REAL, alt REAL, battery INTEGER, voltage REAL, snr REAL, rssi INTEGER, last_seen REAL, first_seen REAL, hop_count INTEGER, via_mqtt INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL, from_node TEXT, to_node TEXT, channel INTEGER, portnum TEXT, payload TEXT ); CREATE TABLE IF NOT EXISTS commands ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL, command TEXT, channel INTEGER ); CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT UNIQUE NOT NULL, password TEXT NOT NULL, name TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user', is_verified INTEGER NOT NULL DEFAULT 0, created_at REAL NOT NULL, updated_at REAL NOT NULL ); CREATE TABLE IF NOT EXISTS tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, token TEXT UNIQUE NOT NULL, type TEXT NOT NULL, expires_at REAL NOT NULL, used INTEGER NOT NULL DEFAULT 0, created_at REAL NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS packets ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL, from_id TEXT, to_id TEXT, portnum TEXT, channel INTEGER, snr REAL, rssi INTEGER, hop_limit INTEGER, hop_start INTEGER, packet_id INTEGER, payload TEXT ); CREATE TABLE IF NOT EXISTS email_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, recipient TEXT NOT NULL, subject TEXT NOT NULL, status TEXT NOT NULL, error_message TEXT, created_at REAL NOT NULL ); """) await self.db.commit() async def _migrate(self): async with self.db.execute("PRAGMA table_info(commands)") as c: cols = {row[1] async for row in c} if "channel" not in cols: await self.db.execute("ALTER TABLE commands ADD COLUMN channel INTEGER") await self.db.commit() logger.info("Migration: added channel column to commands table") # ── Node methods ────────────────────────────────── async def upsert_node(self, node_id: str, **kwargs) -> dict: now = time.time() existing = await self.get_node(node_id) if existing: updates = {k: v for k, v in kwargs.items() if v is not None} if not updates: return dict(existing) updates["last_seen"] = now set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [node_id] await self.db.execute( f"UPDATE nodes SET {set_clause} WHERE node_id = ?", values ) else: kwargs["node_id"] = node_id kwargs.setdefault("first_seen", now) kwargs["last_seen"] = now cols = ", ".join(kwargs.keys()) placeholders = ", ".join("?" for _ in kwargs) await self.db.execute( f"INSERT INTO nodes ({cols}) VALUES ({placeholders})", list(kwargs.values()), ) await self.db.commit() return dict(await self.get_node(node_id)) async def get_node(self, node_id: str) -> dict | None: async with self.db.execute( "SELECT * FROM nodes WHERE node_id = ?", (node_id,) ) as cursor: row = await cursor.fetchone() return dict(row) if row else None async def get_all_nodes(self) -> list[dict]: async with self.db.execute( "SELECT * FROM nodes ORDER BY last_seen DESC" ) as cursor: return [dict(row) async for row in cursor] async def get_nodes_with_position(self) -> list[dict]: async with self.db.execute( "SELECT * FROM nodes WHERE lat IS NOT NULL AND lon IS NOT NULL ORDER BY last_seen DESC" ) as cursor: return [dict(row) async for row in cursor] # ── Message methods ─────────────────────────────── async def insert_message(self, from_node: str, to_node: str, channel: int, portnum: str, payload: str) -> dict: now = time.time() cursor = await self.db.execute( "INSERT INTO messages (timestamp, from_node, to_node, channel, portnum, payload) VALUES (?, ?, ?, ?, ?, ?)", (now, from_node, to_node, channel, portnum, payload), ) await self.db.commit() async with self.db.execute( "SELECT * FROM messages WHERE id = ?", (cursor.lastrowid,) ) as c: row = await c.fetchone() return dict(row) if row else {} async def get_recent_messages(self, limit: int = 50) -> list[dict]: async with self.db.execute( "SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,) ) as cursor: return [dict(row) async for row in cursor] # ── Command methods ─────────────────────────────── async def insert_command(self, command: str, channel: int | None = None): now = time.time() await self.db.execute( "INSERT INTO commands (timestamp, command, channel) VALUES (?, ?, ?)", (now, command, channel), ) await self.db.commit() async def get_stats(self, my_node_id: str | None = None) -> dict: stats = {} async with self.db.execute("SELECT COUNT(*) FROM nodes") as c: stats["total_nodes"] = (await c.fetchone())[0] now = time.time() day_ago = now - 86400 async with self.db.execute( "SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (day_ago,) ) as c: stats["nodes_24h"] = (await c.fetchone())[0] threshold = config.get("web.online_threshold", 900) async with self.db.execute( "SELECT COUNT(*) FROM nodes WHERE last_seen >= ?", (now - threshold,) ) as c: stats["nodes_online"] = (await c.fetchone())[0] async with self.db.execute( "SELECT COUNT(*) FROM commands WHERE timestamp >= ?", (day_ago,) ) as c: stats["total_commands"] = (await c.fetchone())[0] async with self.db.execute( "SELECT channel, COUNT(*) as cnt FROM commands WHERE timestamp >= ? GROUP BY channel ORDER BY cnt DESC", (day_ago,), ) as cursor: stats["channel_breakdown"] = {row[0]: row[1] async for row in cursor} if my_node_id: pkt_query = ( "SELECT portnum, COUNT(*) as cnt FROM packets " "WHERE timestamp >= ? AND NOT (portnum = 'TELEMETRY_APP' AND from_id = ?) " "GROUP BY portnum ORDER BY cnt DESC" ) pkt_params = (day_ago, my_node_id) else: pkt_query = ( "SELECT portnum, COUNT(*) as cnt FROM packets " "WHERE timestamp >= ? GROUP BY portnum ORDER BY cnt DESC" ) pkt_params = (day_ago,) async with self.db.execute(pkt_query, pkt_params) as cursor: stats["packet_type_breakdown"] = {(row[0] or "?"): row[1] async for row in cursor} return stats # ── User methods ────────────────────────────────── async def create_user(self, email: str, password: str, name: str, role: str = "user", is_verified: int = 0) -> dict: now = time.time() cursor = await self.db.execute( "INSERT INTO users (email, password, name, role, is_verified, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)", (email, password, name, role, is_verified, now, now), ) await self.db.commit() return await self.get_user_by_id(cursor.lastrowid) async def get_user_by_email(self, email: str) -> dict | None: async with self.db.execute( "SELECT * FROM users WHERE email = ?", (email,) ) as cursor: row = await cursor.fetchone() return dict(row) if row else None async def get_user_by_id(self, user_id: int) -> dict | None: async with self.db.execute( "SELECT * FROM users WHERE id = ?", (user_id,) ) as cursor: row = await cursor.fetchone() return dict(row) if row else None async def get_all_users(self) -> list[dict]: async with self.db.execute( "SELECT id, email, name, role, is_verified, created_at, updated_at FROM users ORDER BY created_at DESC" ) as cursor: return [dict(row) async for row in cursor] async def update_user(self, user_id: int, **kwargs) -> dict | None: if not kwargs: return await self.get_user_by_id(user_id) kwargs["updated_at"] = time.time() set_clause = ", ".join(f"{k} = ?" for k in kwargs) values = list(kwargs.values()) + [user_id] await self.db.execute( f"UPDATE users SET {set_clause} WHERE id = ?", values ) await self.db.commit() return await self.get_user_by_id(user_id) async def delete_user(self, user_id: int) -> bool: cursor = await self.db.execute("DELETE FROM users WHERE id = ?", (user_id,)) await self.db.commit() return cursor.rowcount > 0 # ── Token methods ───────────────────────────────── async def create_token(self, user_id: int, token: str, token_type: str, expires_at: float) -> dict: now = time.time() cursor = await self.db.execute( "INSERT INTO tokens (user_id, token, type, expires_at, used, created_at) VALUES (?, ?, ?, ?, 0, ?)", (user_id, token, token_type, expires_at, now), ) await self.db.commit() async with self.db.execute("SELECT * FROM tokens WHERE id = ?", (cursor.lastrowid,)) as c: row = await c.fetchone() return dict(row) if row else {} async def get_valid_token(self, token: str, token_type: str) -> dict | None: now = time.time() async with self.db.execute( "SELECT * FROM tokens WHERE token = ? AND type = ? AND used = 0 AND expires_at > ?", (token, token_type, now), ) as cursor: row = await cursor.fetchone() return dict(row) if row else None async def mark_token_used(self, token_id: int): await self.db.execute("UPDATE tokens SET used = 1 WHERE id = ?", (token_id,)) await self.db.commit() # ── Email log methods ───────────────────────────── async def log_email(self, recipient: str, subject: str, status: str, error_message: str = None): now = time.time() await self.db.execute( "INSERT INTO email_logs (recipient, subject, status, error_message, created_at) VALUES (?, ?, ?, ?, ?)", (recipient, subject, status, error_message, now), ) await self.db.commit() # ── Packet log methods ──────────────────────────── async def insert_packet(self, from_id: str, to_id: str, portnum: str, channel: int, snr, rssi, hop_limit, hop_start, packet_id, payload: str) -> dict: now = time.time() cursor = await self.db.execute( "INSERT INTO packets (timestamp, from_id, to_id, portnum, channel, snr, rssi, " "hop_limit, hop_start, packet_id, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (now, from_id, to_id, portnum, channel, snr, rssi, hop_limit, hop_start, packet_id, payload), ) await self.db.commit() async with self.db.execute("SELECT * FROM packets WHERE id = ?", (cursor.lastrowid,)) as c: row = await c.fetchone() return dict(row) if row else {} async def get_recent_packets(self, limit: int = 200) -> list[dict]: async with self.db.execute( "SELECT * FROM packets ORDER BY timestamp DESC LIMIT ?", (limit,) ) as cursor: return [dict(row) async for row in cursor]