Add message sending from web dashboard with channel selector, new POST /api/send endpoint, scheduler support for free-text messages (type field), and modernized dashboard layout with glassmorphism navbar, gradient stat cards, chat bubbles. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
149 lines
5 KiB
Python
149 lines
5 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
|
|
import yaml
|
|
|
|
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
|
|
|
|
# scheduler.yaml sits in the parent of the directory that contains this module.
SCHEDULER_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)),
    "scheduler.yaml",
)
|
|
|
|
|
|
class Scheduler:
|
|
def __init__(self, bot, ws_manager):
|
|
self.bot = bot
|
|
self.ws_manager = ws_manager
|
|
self.jobs: list[dict] = []
|
|
self._mtime: float = 0.0
|
|
self._load()
|
|
|
|
def _load(self):
|
|
try:
|
|
with open(SCHEDULER_PATH, "r") as f:
|
|
data = yaml.safe_load(f) or {}
|
|
self.jobs = data.get("jobs", [])
|
|
self._mtime = os.path.getmtime(SCHEDULER_PATH)
|
|
logger.info("Scheduler loaded %d jobs from %s", len(self.jobs), SCHEDULER_PATH)
|
|
except FileNotFoundError:
|
|
logger.warning("Scheduler config not found: %s", SCHEDULER_PATH)
|
|
self.jobs = []
|
|
except Exception:
|
|
logger.exception("Error loading scheduler config")
|
|
|
|
def _save(self):
|
|
try:
|
|
with open(SCHEDULER_PATH, "w") as f:
|
|
yaml.dump({"jobs": self.jobs}, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
self._mtime = os.path.getmtime(SCHEDULER_PATH)
|
|
logger.info("Scheduler config saved")
|
|
except Exception:
|
|
logger.exception("Error saving scheduler config")
|
|
|
|
async def watch(self, interval: float = 2.0):
|
|
while True:
|
|
await asyncio.sleep(interval)
|
|
try:
|
|
current_mtime = os.path.getmtime(SCHEDULER_PATH)
|
|
if current_mtime != self._mtime:
|
|
self._load()
|
|
if self.ws_manager:
|
|
await self.ws_manager.broadcast("scheduler_update", self.jobs)
|
|
except FileNotFoundError:
|
|
pass
|
|
except Exception:
|
|
logger.exception("Error watching scheduler config")
|
|
|
|
async def run(self):
|
|
logger.info("Scheduler started")
|
|
while True:
|
|
now = datetime.now()
|
|
# Sleep until next full minute
|
|
sleep_seconds = 60 - now.second - now.microsecond / 1_000_000
|
|
await asyncio.sleep(sleep_seconds)
|
|
|
|
now = datetime.now().replace(second=0, microsecond=0)
|
|
for job in self.jobs:
|
|
if not job.get("enabled", False):
|
|
continue
|
|
try:
|
|
if self._matches_cron(job.get("cron", ""), now):
|
|
logger.info("Executing scheduled job: %s", job.get("name"))
|
|
await self._execute_job(job)
|
|
except Exception:
|
|
logger.exception("Error executing job %s", job.get("name"))
|
|
|
|
async def _execute_job(self, job: dict):
|
|
command = job.get("command", "")
|
|
channel = job.get("channel", 0)
|
|
job_type = job.get("type", "command")
|
|
if command and self.bot:
|
|
if job_type == "message":
|
|
await self.bot.send_message(command, channel)
|
|
else:
|
|
await self.bot.execute_command(command, channel)
|
|
|
|
@staticmethod
|
|
def _matches_cron(cron_expr: str, dt: datetime) -> bool:
|
|
parts = cron_expr.strip().split()
|
|
if len(parts) != 5:
|
|
return False
|
|
values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
|
|
ranges = [
|
|
(0, 59), # minute
|
|
(0, 23), # hour
|
|
(1, 31), # day
|
|
(1, 12), # month
|
|
(0, 6), # weekday (0=Sun)
|
|
]
|
|
for field, current, (lo, hi) in zip(parts, values, ranges):
|
|
if not Scheduler._field_matches(field, current, lo, hi):
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def _field_matches(field: str, value: int, lo: int, hi: int) -> bool:
|
|
for part in field.split(","):
|
|
if part == "*":
|
|
return True
|
|
if part.startswith("*/"):
|
|
try:
|
|
step = int(part[2:])
|
|
if step > 0 and value % step == 0:
|
|
return True
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
try:
|
|
if int(part) == value:
|
|
return True
|
|
except ValueError:
|
|
pass
|
|
return False
|
|
|
|
# CRUD operations
|
|
|
|
def get_jobs(self) -> list[dict]:
|
|
return self.jobs
|
|
|
|
def add_job(self, job: dict) -> list[dict]:
|
|
self.jobs.append(job)
|
|
self._save()
|
|
return self.jobs
|
|
|
|
def update_job(self, name: str, updates: dict) -> list[dict] | None:
|
|
for job in self.jobs:
|
|
if job.get("name") == name:
|
|
job.update(updates)
|
|
self._save()
|
|
return self.jobs
|
|
return None
|
|
|
|
def delete_job(self, name: str) -> list[dict] | None:
|
|
for i, job in enumerate(self.jobs):
|
|
if job.get("name") == name:
|
|
self.jobs.pop(i)
|
|
self._save()
|
|
return self.jobs
|
|
return None
|