MeshDD-Bot/meshbot/scheduler.py
ppfeiffer 33c05c0a32 feat(scheduler): Variablen-Badges theme-aware + neue Variablen nodes_online, version
- Badge-Styling: bg-secondary-subtle/text-secondary-emphasis statt bg-secondary
  → lesbar in Light- und Dark-Mode
- {nodes_online}: Nodes < 15 Min last_seen (DB-Abfrage in get_stats())
- {version}: Bot-Version aus config.yaml via config.get()
Closes #15.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 14:31:38 +01:00

176 lines
6.2 KiB
Python

import asyncio
import logging
import os
from datetime import datetime
import yaml
from meshbot import config
logger = logging.getLogger(__name__)
SCHEDULER_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "scheduler.yaml")
class Scheduler:
def __init__(self, bot, ws_manager):
self.bot = bot
self.ws_manager = ws_manager
self.jobs: list[dict] = []
self._mtime: float = 0.0
self._load()
def _load(self):
try:
with open(SCHEDULER_PATH, "r") as f:
data = yaml.safe_load(f) or {}
self.jobs = data.get("jobs", [])
self._mtime = os.path.getmtime(SCHEDULER_PATH)
logger.info("Scheduler loaded %d jobs from %s", len(self.jobs), SCHEDULER_PATH)
except FileNotFoundError:
logger.warning("Scheduler config not found: %s", SCHEDULER_PATH)
self.jobs = []
except Exception:
logger.exception("Error loading scheduler config")
def _save(self):
try:
with open(SCHEDULER_PATH, "w") as f:
yaml.dump({"jobs": self.jobs}, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
self._mtime = os.path.getmtime(SCHEDULER_PATH)
logger.info("Scheduler config saved")
except Exception:
logger.exception("Error saving scheduler config")
async def watch(self, interval: float = 2.0):
while True:
await asyncio.sleep(interval)
try:
current_mtime = os.path.getmtime(SCHEDULER_PATH)
if current_mtime != self._mtime:
self._load()
if self.ws_manager:
await self.ws_manager.broadcast("scheduler_update", self.jobs)
except FileNotFoundError:
pass
except Exception:
logger.exception("Error watching scheduler config")
async def run(self):
logger.info("Scheduler started")
while True:
now = datetime.now()
# Sleep until next full minute
sleep_seconds = 60 - now.second - now.microsecond / 1_000_000
await asyncio.sleep(sleep_seconds)
now = datetime.now().replace(second=0, microsecond=0)
for job in self.jobs:
if not job.get("enabled", False):
continue
try:
if self._matches_cron(job.get("cron", ""), now):
logger.info("Executing scheduled job: %s", job.get("name"))
await self._execute_job(job)
except Exception:
logger.exception("Error executing job %s", job.get("name"))
async def _execute_job(self, job: dict):
command = job.get("command", "")
channel = job.get("channel", 0)
job_type = job.get("type", "command")
if command and self.bot:
if job_type == "message":
command = await self._resolve_vars(command, datetime.now())
await self.bot.send_message(command, channel)
else:
await self.bot.execute_command(command, channel)
async def _resolve_vars(self, text: str, now: datetime) -> str:
if "{" not in text:
return text
stats: dict = {}
if self.bot and self.bot.db:
try:
stats = await self.bot.db.get_stats()
except Exception:
logger.exception("Error fetching stats for scheduler vars")
weekdays = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"]
replacements = {
"{time}": now.strftime("%H:%M"),
"{date}": now.strftime("%d.%m.%Y"),
"{datetime}": now.strftime("%d.%m.%Y %H:%M"),
"{weekday}": weekdays[now.weekday()],
"{nodes}": str(stats.get("total_nodes", "?")),
"{nodes_24h}": str(stats.get("nodes_24h", "?")),
"{nodes_online}": str(stats.get("nodes_online", "?")),
"{version}": config.get("version", "?"),
}
for key, val in replacements.items():
text = text.replace(key, val)
return text
@staticmethod
def _matches_cron(cron_expr: str, dt: datetime) -> bool:
parts = cron_expr.strip().split()
if len(parts) != 5:
return False
values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
ranges = [
(0, 59), # minute
(0, 23), # hour
(1, 31), # day
(1, 12), # month
(0, 6), # weekday (0=Sun)
]
for field, current, (lo, hi) in zip(parts, values, ranges):
if not Scheduler._field_matches(field, current, lo, hi):
return False
return True
@staticmethod
def _field_matches(field: str, value: int, lo: int, hi: int) -> bool:
for part in field.split(","):
if part == "*":
return True
if part.startswith("*/"):
try:
step = int(part[2:])
if step > 0 and value % step == 0:
return True
except ValueError:
pass
else:
try:
if int(part) == value:
return True
except ValueError:
pass
return False
# CRUD operations
def get_jobs(self) -> list[dict]:
return self.jobs
def add_job(self, job: dict) -> list[dict]:
self.jobs.append(job)
self._save()
return self.jobs
def update_job(self, name: str, updates: dict) -> list[dict] | None:
for job in self.jobs:
if job.get("name") == name:
job.update(updates)
self._save()
return self.jobs
return None
def delete_job(self, name: str) -> list[dict] | None:
for i, job in enumerate(self.jobs):
if job.get("name") == name:
self.jobs.pop(i)
self._save()
return self.jobs
return None