import base64 import logging import time import uuid import bcrypt from aiohttp import web import aiohttp_session from aiohttp_session.cookie_storage import EncryptedCookieStorage from cryptography.fernet import Fernet from meshbot import config logger = logging.getLogger(__name__) # ── Password hashing ───────────────────────────────── def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=12)).decode("utf-8") def check_password(password: str, hashed: str) -> bool: return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8")) # ── Session setup ──────────────────────────────────── def setup_session(app: web.Application): secret_key = config.get("auth.secret_key", "change-this-secret-key-32bytes!!") # EncryptedCookieStorage accepts a Fernet object directly key_bytes = secret_key.encode("utf-8")[:32].ljust(32, b"\0") fernet_key = Fernet(base64.urlsafe_b64encode(key_bytes)) max_age = config.get("auth.session_max_age", 86400) storage = EncryptedCookieStorage( fernet_key, cookie_name="meshdd_session", max_age=max_age, ) aiohttp_session.setup(app, storage) # ── Auth middleware ─────────────────────────────────── @web.middleware async def auth_middleware(request: web.Request, handler): request["user"] = None try: session = await aiohttp_session.get_session(request) if session.get("user_id"): request["user"] = { "id": session["user_id"], "email": session.get("email"), "name": session.get("name"), "role": session.get("role"), } except Exception: pass return await handler(request) # ── Helper checks ──────────────────────────────────── def require_user(request: web.Request): if not request["user"]: raise web.HTTPFound("/login") def require_admin(request: web.Request): if not request["user"]: raise web.HTTPFound("/login") if request["user"]["role"] != "admin": raise web.HTTPForbidden(text="Admin access required") def require_user_api(request: web.Request): if not request["user"]: raise web.HTTPUnauthorized(text="Login required") def require_staff_api(request: web.Request): if not request["user"]: raise web.HTTPUnauthorized(text="Login required") if request["user"]["role"] not in ("mitarbeiter", "admin"): raise web.HTTPForbidden(text="Staff access required") def require_admin_api(request: web.Request): if not request["user"]: raise web.HTTPUnauthorized(text="Login required") if request["user"]["role"] != "admin": raise web.HTTPForbidden(text="Admin access required") # ── Email sending ──────────────────────────────────── async def _send_email(db, recipient: str, subject: str, html_body: str): smtp_host = config.get("smtp.host", "") if not smtp_host: logger.info("SMTP not configured - email to %s not sent", recipient) await db.log_email(recipient, subject, "console", "SMTP not configured") return try: import aiosmtplib from email.message import EmailMessage msg = EmailMessage() msg["Subject"] = subject msg["From"] = config.get("smtp.from", "MeshDD-Bot ") msg["To"] = recipient msg.set_content("Bitte HTML-fähigen E-Mail-Client verwenden.") msg.add_alternative(html_body, subtype="html") smtp_client = aiosmtplib.SMTP( hostname=smtp_host, port=config.get("smtp.port", 465), use_tls=True, ) async with smtp_client: await smtp_client.login( config.get("smtp.user", ""), config.get("smtp.password", ""), ) await smtp_client.send_message(msg) await db.log_email(recipient, subject, "sent") logger.info("Email sent to %s: %s", recipient, subject) except Exception as e: logger.error("Failed to send email to %s: %s", recipient, e) await db.log_email(recipient, subject, "error", str(e)) async def send_verification_email(db, email: str, token: str): app_url = config.get("smtp.app_url", "http://localhost:8081") verify_url = f"{app_url}/auth/verify?token={token}" logger.info("Verification link for %s: %s", email, verify_url) subject = "MeshDD-Bot - E-Mail verifizieren" html_body = f"""

MeshDD-Bot Registrierung

Klicke auf den folgenden Link, um dein Passwort zu setzen und dein Konto zu aktivieren:

{verify_url}

Der Link ist 24 Stunden gueltig.

""" await _send_email(db, email, subject, html_body) return verify_url async def send_reset_email(db, email: str, token: str): app_url = config.get("smtp.app_url", "http://localhost:8081") reset_url = f"{app_url}/auth/reset-password?token={token}" logger.info("Reset link for %s: %s", email, reset_url) subject = "MeshDD-Bot - Passwort zuruecksetzen" html_body = f"""

Passwort zuruecksetzen

Klicke auf den folgenden Link, um dein Passwort zurueckzusetzen:

{reset_url}

Der Link ist 24 Stunden gueltig.

""" await _send_email(db, email, subject, html_body) return reset_url async def send_invite_email(db, email: str, name: str, password: str): app_url = config.get("smtp.app_url", "http://localhost:8081") login_url = f"{app_url}/login" logger.info("Invite for %s: login at %s", email, login_url) subject = "MeshDD-Dashboard - Einladung als Mitarbeiter" html_body = f"""

Willkommen bei MeshDD-Dashboard!

Hallo {name},

du wurdest als Mitarbeiter eingeladen. Hier sind deine Zugangsdaten:

E-Mail: {email}

Passwort: {password}

Anmelden unter: {login_url}

Bitte aendere dein Passwort nach dem ersten Login.

""" await _send_email(db, email, subject, html_body) async def send_user_info_email(db, email: str, name: str, password: str = None): app_url = config.get("smtp.app_url", "http://localhost:8081") login_url = f"{app_url}/login" subject = "MeshDD-Bot - Deine Zugangsdaten" pw_line = f"

Passwort: {password}

" if password else "

Dein Passwort wurde nicht geaendert.

" html_body = f"""

MeshDD-Bot Zugangsdaten

Hallo {name},

hier sind deine Zugangsdaten fuer MeshDD-Bot:

E-Mail: {email}

{pw_line}

Anmelden unter: {login_url}

Bitte aendere dein Passwort nach dem ersten Login.

""" await _send_email(db, email, subject, html_body) # ── Auth route handlers ────────────────────────────── def setup_auth_routes(app: web.Application, db): async def login_post(request: web.Request) -> web.Response: data = await request.json() email = data.get("email", "").strip().lower() password = data.get("password", "") if not email or not password: return web.json_response({"error": "E-Mail und Passwort erforderlich"}, status=400) user = await db.get_user_by_email(email) if not user or not check_password(password, user["password"]): return web.json_response({"error": "Ungueltige Anmeldedaten"}, status=401) if not user["is_verified"]: return web.json_response({"error": "Konto nicht verifiziert. Bitte pruefe deine E-Mails."}, status=403) session = await aiohttp_session.new_session(request) session["user_id"] = user["id"] session["email"] = user["email"] session["name"] = user["name"] session["role"] = user["role"] return web.json_response({ "id": user["id"], "email": user["email"], "name": user["name"], "role": user["role"], "force_password_change": bool(user.get("must_change_password", 0)), }) async def register_post(request: web.Request) -> web.Response: data = await request.json() name = data.get("name", "").strip() email = data.get("email", "").strip().lower() if not name or not email: return web.json_response({"error": "Name und E-Mail erforderlich"}, status=400) existing = await db.get_user_by_email(email) if existing: return web.json_response({"error": "E-Mail bereits registriert"}, status=409) # Create user with temporary password hash (will be set during verification) temp_hash = hash_password(uuid.uuid4().hex) user = await db.create_user(email=email, password=temp_hash, name=name) # Generate verification token token = uuid.uuid4().hex expires_at = time.time() + 86400 # 24h await db.create_token(user["id"], token, "verify", expires_at) # Send email await send_verification_email(db, email, token) return web.json_response({"ok": True, "message": "Registrierung erfolgreich. Pruefe deine E-Mails."}) async def logout_get(request: web.Request) -> web.Response: session = await aiohttp_session.get_session(request) session.invalidate() raise web.HTTPFound("/") async def verify_get(request: web.Request) -> web.Response: token = request.query.get("token", "") if not token: return web.Response(text="Token fehlt", status=400) token_row = await db.get_valid_token(token, "verify") if not token_row: return web.Response(text="Token ungueltig oder abgelaufen", status=400) # Serve the set-password form import os static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static") return web.FileResponse(os.path.join(static_dir, "login.html")) async def set_password_post(request: web.Request) -> web.Response: data = await request.json() token = data.get("token", "") password = data.get("password", "") if not token or not password: return web.json_response({"error": "Token und Passwort erforderlich"}, status=400) if len(password) < 8: return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400) token_row = await db.get_valid_token(token, "verify") if not token_row: return web.json_response({"error": "Token ungueltig oder abgelaufen"}, status=400) hashed = hash_password(password) await db.update_user(token_row["user_id"], password=hashed, is_verified=1) await db.mark_token_used(token_row["id"]) return web.json_response({"ok": True, "message": "Passwort gesetzt. Du kannst dich jetzt anmelden."}) async def forgot_password_post(request: web.Request) -> web.Response: data = await request.json() email = data.get("email", "").strip().lower() if not email: return web.json_response({"error": "E-Mail erforderlich"}, status=400) user = await db.get_user_by_email(email) if not user: # Don't reveal whether email exists return web.json_response({"ok": True, "message": "Falls die E-Mail registriert ist, wurde ein Link gesendet."}) token = uuid.uuid4().hex expires_at = time.time() + 86400 # 24h await db.create_token(user["id"], token, "reset", expires_at) await send_reset_email(db, email, token) return web.json_response({"ok": True, "message": "Falls die E-Mail registriert ist, wurde ein Link gesendet."}) async def reset_password_get(request: web.Request) -> web.Response: token = request.query.get("token", "") if not token: return web.Response(text="Token fehlt", status=400) token_row = await db.get_valid_token(token, "reset") if not token_row: return web.Response(text="Token ungueltig oder abgelaufen", status=400) import os static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static") return web.FileResponse(os.path.join(static_dir, "login.html")) async def reset_password_post(request: web.Request) -> web.Response: data = await request.json() token = data.get("token", "") password = data.get("password", "") if not token or not password: return web.json_response({"error": "Token und Passwort erforderlich"}, status=400) if len(password) < 8: return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400) token_row = await db.get_valid_token(token, "reset") if not token_row: return web.json_response({"error": "Token ungueltig oder abgelaufen"}, status=400) hashed = hash_password(password) await db.update_user(token_row["user_id"], password=hashed) await db.mark_token_used(token_row["id"]) return web.json_response({"ok": True, "message": "Passwort geaendert. Du kannst dich jetzt anmelden."}) async def me_get(request: web.Request) -> web.Response: user = request["user"] if not user: return web.json_response(None, status=401) return web.json_response(user) # ── Admin API ──────────────────────────────────── async def admin_users_get(request: web.Request) -> web.Response: require_admin_api(request) users = await db.get_all_users() return web.json_response(users) async def admin_user_role(request: web.Request) -> web.Response: require_admin_api(request) user_id = int(request.match_info["id"]) data = await request.json() role = data.get("role", "") if role not in ("mitarbeiter", "admin"): return web.json_response({"error": "Ungueltige Rolle"}, status=400) await db.update_user(user_id, role=role) users = await db.get_all_users() return web.json_response(users) async def admin_user_verify(request: web.Request) -> web.Response: require_admin_api(request) user_id = int(request.match_info["id"]) await db.update_user(user_id, is_verified=1) users = await db.get_all_users() return web.json_response(users) async def admin_user_create(request: web.Request) -> web.Response: require_admin_api(request) data = await request.json() name = data.get("name", "").strip() email = data.get("email", "").strip().lower() password = data.get("password", "") role = data.get("role", "mitarbeiter") if not name or not email or not password: return web.json_response({"error": "Name, E-Mail und Passwort erforderlich"}, status=400) if len(password) < 8: return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400) if role not in ("mitarbeiter", "admin"): return web.json_response({"error": "Ungueltige Rolle"}, status=400) existing = await db.get_user_by_email(email) if existing: return web.json_response({"error": "E-Mail bereits registriert"}, status=409) hashed = hash_password(password) await db.create_user(email=email, password=hashed, name=name, role=role, is_verified=1) # Send info email if requested if data.get("send_email"): await send_user_info_email(db, email, name, password) users = await db.get_all_users() return web.json_response(users) async def admin_user_update(request: web.Request) -> web.Response: require_admin_api(request) user_id = int(request.match_info["id"]) data = await request.json() updates = {} if "name" in data and data["name"].strip(): updates["name"] = data["name"].strip() if "email" in data and data["email"].strip(): new_email = data["email"].strip().lower() existing = await db.get_user_by_email(new_email) if existing and existing["id"] != user_id: return web.json_response({"error": "E-Mail bereits vergeben"}, status=409) updates["email"] = new_email if "role" in data and data["role"] in ("mitarbeiter", "admin"): updates["role"] = data["role"] if not updates: return web.json_response({"error": "Keine Aenderungen"}, status=400) await db.update_user(user_id, **updates) users = await db.get_all_users() return web.json_response(users) async def admin_user_reset_password(request: web.Request) -> web.Response: require_admin_api(request) user_id = int(request.match_info["id"]) data = await request.json() password = data.get("password", "") if not password or len(password) < 8: return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400) hashed = hash_password(password) await db.update_user(user_id, password=hashed) # Send info email if requested if data.get("send_email"): user = await db.get_user_by_id(user_id) if user: await send_user_info_email(db, user["email"], user["name"], password) users = await db.get_all_users() return web.json_response(users) async def admin_user_send_info(request: web.Request) -> web.Response: require_admin_api(request) user_id = int(request.match_info["id"]) user = await db.get_user_by_id(user_id) if not user: return web.json_response({"error": "Benutzer nicht gefunden"}, status=404) await send_user_info_email(db, user["email"], user["name"]) return web.json_response({"ok": True, "message": f"Info-Mail an {user['email']} gesendet"}) async def admin_user_delete(request: web.Request) -> web.Response: require_admin_api(request) user_id = int(request.match_info["id"]) # Prevent self-deletion if request["user"]["id"] == user_id: return web.json_response({"error": "Eigenen Account kann man nicht loeschen"}, status=400) await db.delete_user(user_id) users = await db.get_all_users() return web.json_response(users) async def admin_invite(request: web.Request) -> web.Response: require_admin_api(request) import secrets import string data = await request.json() name = data.get("name", "").strip() email = data.get("email", "").strip().lower() if not name or not email: return web.json_response({"error": "Name und E-Mail erforderlich"}, status=400) existing = await db.get_user_by_email(email) if existing: return web.json_response({"error": "E-Mail bereits registriert"}, status=409) alphabet = string.ascii_letters + string.digits + "!@#$%" password = "".join(secrets.choice(alphabet) for _ in range(12)) hashed = hash_password(password) await db.create_user( email=email, password=hashed, name=name, role="mitarbeiter", is_verified=1, must_change_password=1, ) await send_invite_email(db, email, name, password) users = await db.get_all_users() return web.json_response(users, status=201) async def change_password_get(request: web.Request) -> web.Response: import os static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static") return web.FileResponse(os.path.join(static_dir, "change-password.html")) async def change_password_post(request: web.Request) -> web.Response: user = request["user"] if not user: return web.json_response({"error": "Login erforderlich"}, status=401) data = await request.json() password = data.get("password", "") if len(password) < 8: return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400) hashed = hash_password(password) await db.update_user(user["id"], password=hashed, must_change_password=0) return web.json_response({"ok": True, "message": "Passwort geaendert"}) # Register routes app.router.add_post("/auth/login", login_post) app.router.add_post("/auth/register", register_post) app.router.add_get("/auth/logout", logout_get) app.router.add_get("/auth/verify", verify_get) app.router.add_post("/auth/set-password", set_password_post) app.router.add_post("/auth/forgot-password", forgot_password_post) app.router.add_get("/auth/reset-password", reset_password_get) app.router.add_post("/auth/reset-password", reset_password_post) app.router.add_get("/api/auth/me", me_get) app.router.add_get("/api/admin/users", admin_users_get) app.router.add_post("/api/admin/users", admin_user_create) app.router.add_post("/api/admin/invite", admin_invite) app.router.add_put("/api/admin/users/{id}", admin_user_update) app.router.add_post("/api/admin/users/{id}/role", admin_user_role) app.router.add_post("/api/admin/users/{id}/verify", admin_user_verify) app.router.add_post("/api/admin/users/{id}/reset-password", admin_user_reset_password) app.router.add_post("/api/admin/users/{id}/send-info", admin_user_send_info) app.router.add_delete("/api/admin/users/{id}", admin_user_delete) app.router.add_get("/auth/change-password", change_password_get) app.router.add_post("/auth/change-password", change_password_post)