import asyncio
import logging
import os
from datetime import datetime

import yaml

from meshbot import config

logger = logging.getLogger(__name__)

SCHEDULER_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config", "scheduler.yaml")


class Scheduler:
    """Cron-style job scheduler whose job list is persisted in a YAML file.

    Jobs are plain dicts. The keys read by this class are ``name``,
    ``enabled``, ``cron``, ``command``, ``channel`` and ``type``.
    ``run()`` fires matching jobs once per minute, ``watch()`` reloads the
    file when its modification time changes, and the CRUD helpers mutate the
    in-memory list and write it back to ``SCHEDULER_PATH``.
    """

    def __init__(self, bot, ws_manager):
        """Store collaborators and load the job list from disk.

        Args:
            bot: Object used to run jobs; ``send_message``,
                ``execute_command`` and ``db.get_stats`` are called on it.
            ws_manager: Optional object whose ``broadcast`` coroutine is
                called after a reload; may be falsy to disable broadcasts.
        """
        self.bot = bot
        self.ws_manager = ws_manager
        self.jobs: list[dict] = []
        self._mtime: float = 0.0
        self._load()

    def _load(self):
        """Read the job list from ``SCHEDULER_PATH`` into ``self.jobs``.

        A missing file yields an empty job list. Any other error is logged
        and the previously loaded jobs are kept.
        """
        try:
            # Record the mtime *before* reading: a file that fails to parse is
            # then not re-parsed (and re-logged) on every watch() poll, and a
            # write racing with this read is picked up by the next poll.
            self._mtime = os.path.getmtime(SCHEDULER_PATH)
            with open(SCHEDULER_PATH, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
            # "jobs:" with no value parses to None; treat that as empty.
            jobs = data.get("jobs") or []
            if not isinstance(jobs, list):
                raise TypeError(f"'jobs' must be a list, got {type(jobs).__name__}")
            self.jobs = jobs
            logger.info("Scheduler loaded %d jobs from %s", len(self.jobs), SCHEDULER_PATH)
        except FileNotFoundError:
            logger.warning("Scheduler config not found: %s", SCHEDULER_PATH)
            self.jobs = []
        except Exception:
            logger.exception("Error loading scheduler config")

    def _save(self):
        """Write ``self.jobs`` to ``SCHEDULER_PATH``; errors are logged, not raised."""
        try:
            # Serialise first so a dump failure cannot leave a truncated file.
            text = yaml.dump(
                {"jobs": self.jobs},
                default_flow_style=False,
                allow_unicode=True,
                sort_keys=False,
            )
            # allow_unicode=True emits raw non-ASCII characters, so the file
            # must be written as UTF-8 regardless of the locale default.
            with open(SCHEDULER_PATH, "w", encoding="utf-8") as f:
                f.write(text)
            # Remember our own write so watch() does not treat it as external.
            self._mtime = os.path.getmtime(SCHEDULER_PATH)
            logger.info("Scheduler config saved")
        except Exception:
            logger.exception("Error saving scheduler config")

    async def watch(self, interval: float = 2.0):
        """Poll the config file forever and reload it when its mtime changes.

        Args:
            interval: Seconds to sleep between polls.
        """
        while True:
            await asyncio.sleep(interval)
            try:
                current_mtime = os.path.getmtime(SCHEDULER_PATH)
                if current_mtime != self._mtime:
                    self._load()
                    if self.ws_manager:
                        await self.ws_manager.broadcast("scheduler_update", self.jobs)
            except FileNotFoundError:
                # File is (temporarily) gone: keep the current jobs and keep polling.
                pass
            except Exception:
                logger.exception("Error watching scheduler config")

    async def run(self):
        """Run forever, executing every enabled job whose cron matches the current minute."""
        logger.info("Scheduler started")
        last_minute = None
        while True:
            now = datetime.now()
            # Sleep until next full minute
            sleep_seconds = 60 - now.second - now.microsecond / 1_000_000
            await asyncio.sleep(sleep_seconds)

            now = datetime.now().replace(second=0, microsecond=0)
            # The sleep is timed on the event loop clock while `now` is wall
            # clock time; if we wake a hair early we would land in the minute
            # that was just processed. Skip it instead of running jobs twice.
            if now == last_minute:
                continue
            last_minute = now

            # Iterate over a snapshot: jobs may be added, removed or reloaded
            # while a job is being awaited.
            for job in list(self.jobs):
                if not isinstance(job, dict):
                    logger.warning("Ignoring malformed scheduler job entry: %r", job)
                    continue
                if not job.get("enabled", False):
                    continue
                try:
                    if self._matches_cron(job.get("cron", ""), now):
                        logger.info("Executing scheduled job: %s", job.get("name"))
                        await self._execute_job(job)
                except Exception:
                    logger.exception("Error executing job %s", job.get("name"))

    async def _execute_job(self, job: dict):
        """Run one job through the bot.

        A job of type ``"message"`` has its placeholders resolved and is sent
        with ``bot.send_message``; any other type is passed to
        ``bot.execute_command``. Nothing happens when the command is empty or
        no bot is set.
        """
        command = job.get("command", "")
        channel = job.get("channel", 0)
        job_type = job.get("type", "command")

        if command and self.bot:
            if job_type == "message":
                command = await self._resolve_vars(command, datetime.now())
                await self.bot.send_message(command, channel)
            else:
                await self.bot.execute_command(command, channel)

    async def _resolve_vars(self, text: str, now: datetime) -> str:
        """Replace ``{placeholder}`` tokens in *text*.

        Supported tokens: ``{time}``, ``{date}``, ``{datetime}``,
        ``{weekday}``, ``{nodes}``, ``{nodes_24h}``, ``{nodes_online}`` and
        ``{version}``. Node counts come from ``bot.db.get_stats()`` and fall
        back to ``"?"`` when unavailable.

        Args:
            text: Message template.
            now: Timestamp used for the date/time tokens.

        Returns:
            The text with all known tokens substituted.
        """
        if "{" not in text:
            return text

        stats: dict = {}
        if self.bot and self.bot.db:
            try:
                stats = await self.bot.db.get_stats()
            except Exception:
                logger.exception("Error fetching stats for scheduler vars")

        weekdays = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"]
        replacements = {
            "{time}": now.strftime("%H:%M"),
            "{date}": now.strftime("%d.%m.%Y"),
            "{datetime}": now.strftime("%d.%m.%Y %H:%M"),
            "{weekday}": weekdays[now.weekday()],
            "{nodes}": str(stats.get("total_nodes", "?")),
            "{nodes_24h}": str(stats.get("nodes_24h", "?")),
            "{nodes_online}": str(stats.get("nodes_online", "?")),
            # str(): a non-string version value would make str.replace raise.
            "{version}": str(config.get("version", "?")),
        }
        for key, val in replacements.items():
            text = text.replace(key, val)
        return text

    @staticmethod
    def _matches_cron(cron_expr: str, dt: datetime) -> bool:
        """Return True when *dt* matches the five-field cron expression.

        Fields are minute, hour, day of month, month and weekday
        (0 = Sunday; 7 is also accepted as Sunday). All five fields must
        match. An expression without exactly five fields never matches.
        """
        parts = cron_expr.strip().split()
        if len(parts) != 5:
            return False

        values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
        ranges = [
            (0, 59),   # minute
            (0, 23),   # hour
            (1, 31),   # day
            (1, 12),   # month
            (0, 6),    # weekday (0=Sun)
        ]

        for index, (field, current, (lo, hi)) in enumerate(zip(parts, values, ranges)):
            if Scheduler._field_matches(field, current, lo, hi):
                continue
            # Weekday field only: let "7" (or a range ending in 7) mean Sunday.
            if index == 4 and current == 0 and Scheduler._field_matches(field, 7, lo, 7):
                continue
            return False
        return True

    @staticmethod
    def _field_matches(field: str, value: int, lo: int, hi: int) -> bool:
        """Return True when *value* satisfies one comma-separated item of *field*.

        Supported item forms: ``*``, ``N``, ``A-B``, ``*/S`` and ``A-B/S``.
        ``*`` stands for the range ``lo``-``hi`` and steps are counted from
        the start of the range, so ``*/2`` on a 1-based field matches
        1, 3, 5, ... Malformed items are ignored.

        Args:
            field: One cron field, e.g. ``"*/15"`` or ``"1-5,7"``.
            value: Current value of that field.
            lo: Smallest legal value of the field.
            hi: Largest legal value of the field.
        """
        for part in field.split(","):
            base, has_step, step_text = part.partition("/")
            try:
                step = int(step_text) if has_step else 1
                if step <= 0:
                    continue
                if base == "*":
                    start, end = lo, hi
                elif "-" in base:
                    first, last = base.split("-", 1)
                    start, end = int(first), int(last)
                else:
                    if has_step:
                        # A step is only meaningful on "*" or a range.
                        continue
                    start = end = int(base)
            except ValueError:
                continue
            if start <= value <= end and (value - start) % step == 0:
                return True
        return False

    # CRUD operations

    def get_jobs(self) -> list[dict]:
        """Return the live in-memory job list."""
        return self.jobs

    def add_job(self, job: dict) -> list[dict]:
        """Append *job*, persist the list and return it."""
        self.jobs.append(job)
        self._save()
        return self.jobs

    def update_job(self, name: str, updates: dict) -> list[dict] | None:
        """Merge *updates* into the first job called *name*.

        Returns:
            The job list after saving, or None when no job has that name.
        """
        for job in self.jobs:
            if job.get("name") == name:
                job.update(updates)
                self._save()
                return self.jobs
        return None

    def delete_job(self, name: str) -> list[dict] | None:
        """Remove the first job called *name*.

        Returns:
            The job list after saving, or None when no job has that name.
        """
        for i, job in enumerate(self.jobs):
            if job.get("name") == name:
                self.jobs.pop(i)
                self._save()
                return self.jobs
        return None