MeshDD-Bot/meshbot/auth.py
ppfeiffer 5b2a5867d5 fix: v0.5.5 - SMTP auf EmailMessage + async SMTP-Client umgestellt
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 15:54:40 +01:00

470 lines
19 KiB
Python

import base64
import logging
import time
import uuid
import bcrypt
from aiohttp import web
import aiohttp_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from cryptography.fernet import Fernet
from meshbot import config
logger = logging.getLogger(__name__)
# ── Password hashing ─────────────────────────────────
def hash_password(password: str) -> str:
    """Hash a plaintext password with bcrypt and return the hash as text.

    The password is UTF-8 encoded, hashed with a freshly generated salt
    (work factor: 12 rounds) and the resulting bytes are decoded back to
    ``str``.
    """
    raw = password.encode("utf-8")
    salt = bcrypt.gensalt(rounds=12)
    digest = bcrypt.hashpw(raw, salt)
    return digest.decode("utf-8")
def check_password(password: str, hashed: str) -> bool:
    """Return True when *password* matches the bcrypt hash *hashed*.

    Both arguments are UTF-8 encoded before being handed to
    ``bcrypt.checkpw``.

    A ``ValueError`` from bcrypt is treated as "does not match" instead of
    being propagated, so a bad stored hash cannot turn a login attempt into
    an unhandled server error.
    """
    try:
        return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError as exc:
        # bcrypt signals a malformed stored hash with ValueError.
        # NOTE(review): newer bcrypt releases presumably also raise it for
        # passwords longer than 72 bytes - confirm against the pinned version.
        logger.warning("Password check failed with invalid input: %s", exc)
        return False
# ── Session setup ────────────────────────────────────
def setup_session(app: web.Application):
    """Install encrypted cookie sessions on *app*.

    The secret is read from ``AUTH_SECRET_KEY``. Its UTF-8 bytes are cut or
    NUL-padded to exactly 32 bytes and used as the Fernet key. The cookie is
    named ``meshdd_session`` and its lifetime comes from the
    ``auth.session_max_age`` config value (default 86400).

    A warning is logged when the built-in default secret is in use or when
    the configured secret is shorter than 32 bytes. The key derivation itself
    is unchanged so existing session cookies stay valid.
    """
    default_secret = "change-this-secret-key-32bytes!!"
    secret_key = config.env("AUTH_SECRET_KEY", default_secret)
    secret_bytes = secret_key.encode("utf-8")

    if secret_key == default_secret:
        # The default is part of the source code, so anyone can forge cookies.
        logger.warning(
            "AUTH_SECRET_KEY is not set - using the built-in default key; "
            "session cookies are not secure"
        )
    elif len(secret_bytes) < 32:
        logger.warning(
            "AUTH_SECRET_KEY is shorter than 32 bytes and will be padded; "
            "use a longer secret"
        )

    # EncryptedCookieStorage accepts a Fernet object directly
    key_bytes = secret_bytes[:32].ljust(32, b"\0")
    fernet = Fernet(base64.urlsafe_b64encode(key_bytes))

    max_age = config.get("auth.session_max_age", 86400)
    storage = EncryptedCookieStorage(
        fernet,
        cookie_name="meshdd_session",
        max_age=max_age,
    )
    aiohttp_session.setup(app, storage)
# ── Auth middleware ───────────────────────────────────
@web.middleware
async def auth_middleware(request: web.Request, handler):
    """Attach the logged-in user (or ``None``) to ``request["user"]``.

    When the session contains a ``user_id``, ``request["user"]`` becomes a
    dict with the keys ``id``, ``email``, ``name`` and ``role`` taken from
    the session. Otherwise it stays ``None``.

    Session loading is best-effort: any exception is logged and the request
    continues as anonymous, so a session problem never blocks the handler.
    """
    request["user"] = None
    try:
        session = await aiohttp_session.get_session(request)
        if session.get("user_id"):
            request["user"] = {
                "id": session["user_id"],
                "email": session.get("email"),
                "name": session.get("name"),
                "role": session.get("role"),
            }
    except Exception:
        # Deliberately not re-raised; log it so the failure is visible.
        logger.warning(
            "Could not load session - treating request as anonymous",
            exc_info=True,
        )
    return await handler(request)
# ── Helper checks ────────────────────────────────────
def require_user(request: web.Request):
    """Redirect to ``/login`` unless ``request["user"]`` is set.

    Raises:
        web.HTTPFound: when ``request["user"]`` is falsy.
    """
    current_user = request["user"]
    if current_user:
        return
    raise web.HTTPFound("/login")
def require_admin(request: web.Request):
    """Allow only logged-in users whose role is ``"admin"``.

    Raises:
        web.HTTPFound: redirect to ``/login`` when ``request["user"]`` is falsy.
        web.HTTPForbidden: when the user's role is not ``"admin"``.
    """
    current_user = request["user"]
    if not current_user:
        raise web.HTTPFound("/login")
    if current_user["role"] == "admin":
        return
    raise web.HTTPForbidden(text="Admin access required")
def require_user_api(request: web.Request):
    """API variant of the login check: respond 401 instead of redirecting.

    Raises:
        web.HTTPUnauthorized: when ``request["user"]`` is falsy.
    """
    current_user = request["user"]
    if current_user:
        return
    raise web.HTTPUnauthorized(text="Login required")
def require_admin_api(request: web.Request):
    """API variant of the admin check.

    Raises:
        web.HTTPUnauthorized: when ``request["user"]`` is falsy.
        web.HTTPForbidden: when the user's role is not ``"admin"``.
    """
    current_user = request["user"]
    if not current_user:
        raise web.HTTPUnauthorized(text="Login required")
    if current_user["role"] == "admin":
        return
    raise web.HTTPForbidden(text="Admin access required")
# ── Email sending ────────────────────────────────────
async def _send_email(db, recipient: str, subject: str, html_body: str):
    """Send an HTML email via SMTP and record the outcome with ``db.log_email``.

    Args:
        db: object providing an awaitable ``log_email(recipient, subject,
            status[, detail])``.
        recipient: destination address, used as the ``To`` header.
        subject: subject line.
        html_body: HTML alternative; a short plain-text fallback is always
            included as the primary part.

    Outcomes written through ``db.log_email``:
        * ``"console"`` - ``SMTP_HOST`` is not configured; nothing is sent.
        * ``"sent"``    - the message was handed to the SMTP server.
        * ``"error"``   - any exception occurred; its text is stored.

    Exceptions raised inside the sending ``try`` block are logged and not
    re-raised.
    """
    smtp_host = config.env("SMTP_HOST")
    if not smtp_host:
        logger.info("SMTP not configured - email to %s not sent", recipient)
        await db.log_email(recipient, subject, "console", "SMTP not configured")
        return

    try:
        # Imported here so a missing aiosmtplib is reported as a send error
        # instead of breaking module import.
        import aiosmtplib
        from email.message import EmailMessage

        msg = EmailMessage()
        msg["Subject"] = subject
        msg["From"] = config.env("SMTP_FROM", "MeshDD-Bot <noreply@example.com>")
        msg["To"] = recipient
        msg.set_content("Bitte HTML-fähigen E-Mail-Client verwenden.")
        msg.add_alternative(html_body, subtype="html")

        smtp_port = int(config.env("SMTP_PORT", "465"))
        # Port 587 is the STARTTLS submission port; implicit TLS cannot work
        # there. Every other port keeps the previous implicit-TLS arguments.
        if smtp_port == 587:
            tls_options = {"start_tls": True}
        else:
            tls_options = {"use_tls": True}
        smtp_client = aiosmtplib.SMTP(
            hostname=smtp_host,
            port=smtp_port,
            **tls_options,
        )

        smtp_user = config.env("SMTP_USER")
        async with smtp_client:
            # Only authenticate when credentials are configured; relays that
            # need no login would otherwise fail on an empty username.
            if smtp_user:
                await smtp_client.login(smtp_user, config.env("SMTP_PASSWORD"))
            await smtp_client.send_message(msg)

        await db.log_email(recipient, subject, "sent")
        logger.info("Email sent to %s: %s", recipient, subject)
    except Exception as e:
        logger.error("Failed to send email to %s: %s", recipient, e)
        await db.log_email(recipient, subject, "error", str(e))
async def send_verification_email(db, email: str, token: str):
    """Mail the account-verification link to *email* and return that link.

    The link is ``<SMTP_APP_URL>/auth/verify?token=<token>``; the base URL
    falls back to ``http://localhost:8080``. The full link, including the
    token, is also written to the log at INFO level.
    """
    subject = "MeshDD-Bot - E-Mail verifizieren"
    base_url = config.env("SMTP_APP_URL", "http://localhost:8080")
    verify_url = f"{base_url}/auth/verify?token={token}"
    logger.info("Verification link for %s: %s", email, verify_url)

    html_body = f"""<html><body>
<h2>MeshDD-Bot Registrierung</h2>
<p>Klicke auf den folgenden Link, um dein Passwort zu setzen und dein Konto zu aktivieren:</p>
<p><a href="{verify_url}">{verify_url}</a></p>
<p>Der Link ist 24 Stunden gueltig.</p>
</body></html>"""

    await _send_email(db, email, subject, html_body)
    return verify_url
async def send_reset_email(db, email: str, token: str):
    """Mail the password-reset link to *email* and return that link.

    The link is ``<SMTP_APP_URL>/auth/reset-password?token=<token>``; the
    base URL falls back to ``http://localhost:8080``. The full link,
    including the token, is also written to the log at INFO level.
    """
    subject = "MeshDD-Bot - Passwort zuruecksetzen"
    base_url = config.env("SMTP_APP_URL", "http://localhost:8080")
    reset_url = f"{base_url}/auth/reset-password?token={token}"
    logger.info("Reset link for %s: %s", email, reset_url)

    html_body = f"""<html><body>
<h2>Passwort zuruecksetzen</h2>
<p>Klicke auf den folgenden Link, um dein Passwort zurueckzusetzen:</p>
<p><a href="{reset_url}">{reset_url}</a></p>
<p>Der Link ist 24 Stunden gueltig.</p>
</body></html>"""

    await _send_email(db, email, subject, html_body)
    return reset_url
async def send_user_info_email(db, email: str, name: str, password: str = None):
    """Mail a user their login details.

    Args:
        db: passed through to ``_send_email`` for logging the outcome.
        email: recipient address; also shown in the message body.
        name: display name used in the greeting.
        password: when given (truthy), it is included in the mail in plain
            text; otherwise the mail states that the password was not changed.

    ``name``, ``email`` and ``password`` are HTML-escaped before being
    inserted into the body, because they originate from user or admin input
    and would otherwise allow markup injection into the message.
    """
    from html import escape

    app_url = config.env("SMTP_APP_URL", "http://localhost:8080")
    login_url = f"{app_url}/login"
    subject = "MeshDD-Bot - Deine Zugangsdaten"

    # str() keeps the previous rendering for non-string values (e.g. None).
    safe_name = escape(str(name))
    safe_email = escape(str(email))

    # NOTE(review): sending a plaintext password by mail is risky; a
    # set-password link would avoid it.
    if password:
        pw_line = f"<p><strong>Passwort:</strong> {escape(str(password))}</p>"
    else:
        pw_line = "<p>Dein Passwort wurde nicht geaendert.</p>"

    html_body = f"""<html><body>
<h2>MeshDD-Bot Zugangsdaten</h2>
<p>Hallo {safe_name},</p>
<p>hier sind deine Zugangsdaten fuer MeshDD-Bot:</p>
<p><strong>E-Mail:</strong> {safe_email}</p>
{pw_line}
<p>Anmelden unter: <a href="{login_url}">{login_url}</a></p>
<p>Bitte aendere dein Passwort nach dem ersten Login.</p>
</body></html>"""
    await _send_email(db, email, subject, html_body)
# ── Auth route handlers ──────────────────────────────
def setup_auth_routes(app: web.Application, db):
    """Register the authentication and admin user-management routes on *app*.

    Args:
        app: the aiohttp application whose router receives the routes.
        db: data-access object. The handlers call ``get_user_by_email``,
            ``get_user_by_id``, ``get_all_users``, ``create_user``,
            ``update_user``, ``delete_user``, ``create_token``,
            ``get_valid_token``, ``mark_token_used`` on it.

    Request validation: a body that is not a JSON object, or a non-numeric
    ``{id}`` path segment, is answered with a 400 JSON error instead of an
    unhandled exception. Non-string values in text fields are treated as
    missing.
    """
    import os

    # Both token landing pages serve the same static page.
    login_page = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), "static", "login.html"
    )

    # ── Request helpers ──────────────────────────────

    async def _json_body(request: web.Request) -> dict:
        """Return the request body as a dict or raise a 400 JSON error."""
        try:
            data = await request.json()
        except ValueError:
            # Covers JSON decode errors and undecodable body bytes.
            data = None
        if not isinstance(data, dict):
            raise web.HTTPBadRequest(
                text='{"error": "Ungueltiger JSON-Body"}',
                content_type="application/json",
            )
        return data

    def _str_field(data: dict, key: str) -> str:
        """Return ``data[key]`` when it is a string, otherwise ``""``."""
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    def _path_user_id(request: web.Request) -> int:
        """Parse the ``{id}`` path segment or raise a 400 JSON error."""
        try:
            return int(request.match_info["id"])
        except ValueError:
            raise web.HTTPBadRequest(
                text='{"error": "Ungueltige Benutzer-ID"}',
                content_type="application/json",
            ) from None

    async def _token_landing_page(request: web.Request, token_type: str) -> web.StreamResponse:
        """Validate the ``token`` query parameter and serve the login page."""
        token = request.query.get("token", "")
        if not token:
            return web.Response(text="Token fehlt", status=400)
        token_row = await db.get_valid_token(token, token_type)
        if not token_row:
            return web.Response(text="Token ungueltig oder abgelaufen", status=400)
        return web.FileResponse(login_page)

    async def _set_password_with_token(
        request: web.Request, token_type: str, success_message: str, **extra_updates
    ) -> web.Response:
        """Set a new password for the owner of a valid token, then mark the token used."""
        data = await _json_body(request)
        token = _str_field(data, "token")
        password = _str_field(data, "password")
        if not token or not password:
            return web.json_response({"error": "Token und Passwort erforderlich"}, status=400)
        if len(password) < 8:
            return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400)
        token_row = await db.get_valid_token(token, token_type)
        if not token_row:
            return web.json_response({"error": "Token ungueltig oder abgelaufen"}, status=400)
        hashed = hash_password(password)
        await db.update_user(token_row["user_id"], password=hashed, **extra_updates)
        await db.mark_token_used(token_row["id"])
        return web.json_response({"ok": True, "message": success_message})

    # ── Public auth handlers ─────────────────────────

    async def login_post(request: web.Request) -> web.Response:
        """Check credentials and start a fresh session for a verified user."""
        data = await _json_body(request)
        email = _str_field(data, "email").strip().lower()
        password = _str_field(data, "password")
        if not email or not password:
            return web.json_response({"error": "E-Mail und Passwort erforderlich"}, status=400)
        user = await db.get_user_by_email(email)
        if not user or not check_password(password, user["password"]):
            return web.json_response({"error": "Ungueltige Anmeldedaten"}, status=401)
        if not user["is_verified"]:
            return web.json_response({"error": "Konto nicht verifiziert. Bitte pruefe deine E-Mails."}, status=403)
        # new_session (not get_session) so a pre-login session is not reused.
        session = await aiohttp_session.new_session(request)
        session["user_id"] = user["id"]
        session["email"] = user["email"]
        session["name"] = user["name"]
        session["role"] = user["role"]
        return web.json_response({
            "id": user["id"],
            "email": user["email"],
            "name": user["name"],
            "role": user["role"],
        })

    async def register_post(request: web.Request) -> web.Response:
        """Create an unverified account and mail a verification link."""
        data = await _json_body(request)
        name = _str_field(data, "name").strip()
        email = _str_field(data, "email").strip().lower()
        if not name or not email:
            return web.json_response({"error": "Name und E-Mail erforderlich"}, status=400)
        existing = await db.get_user_by_email(email)
        if existing:
            return web.json_response({"error": "E-Mail bereits registriert"}, status=409)
        # Create user with temporary password hash (will be set during verification)
        temp_hash = hash_password(uuid.uuid4().hex)
        user = await db.create_user(email=email, password=temp_hash, name=name)
        # Generate verification token
        token = uuid.uuid4().hex
        expires_at = time.time() + 86400  # 24h
        await db.create_token(user["id"], token, "verify", expires_at)
        # Send email
        await send_verification_email(db, email, token)
        return web.json_response({"ok": True, "message": "Registrierung erfolgreich. Pruefe deine E-Mails."})

    async def logout_get(request: web.Request) -> web.Response:
        """Invalidate the session and redirect to the start page."""
        session = await aiohttp_session.get_session(request)
        session.invalidate()
        raise web.HTTPFound("/")

    async def verify_get(request: web.Request) -> web.Response:
        """Landing page for verification links (serves the set-password form)."""
        return await _token_landing_page(request, "verify")

    async def set_password_post(request: web.Request) -> web.Response:
        """Set the initial password via a ``verify`` token and mark the user verified."""
        return await _set_password_with_token(
            request,
            "verify",
            "Passwort gesetzt. Du kannst dich jetzt anmelden.",
            is_verified=1,
        )

    async def forgot_password_post(request: web.Request) -> web.Response:
        """Create a reset token and mail it; the reply is the same for unknown emails."""
        data = await _json_body(request)
        email = _str_field(data, "email").strip().lower()
        if not email:
            return web.json_response({"error": "E-Mail erforderlich"}, status=400)
        user = await db.get_user_by_email(email)
        if not user:
            # Don't reveal whether email exists
            return web.json_response({"ok": True, "message": "Falls die E-Mail registriert ist, wurde ein Link gesendet."})
        token = uuid.uuid4().hex
        expires_at = time.time() + 86400  # 24h
        await db.create_token(user["id"], token, "reset", expires_at)
        await send_reset_email(db, email, token)
        return web.json_response({"ok": True, "message": "Falls die E-Mail registriert ist, wurde ein Link gesendet."})

    async def reset_password_get(request: web.Request) -> web.Response:
        """Landing page for password-reset links."""
        return await _token_landing_page(request, "reset")

    async def reset_password_post(request: web.Request) -> web.Response:
        """Change the password via a ``reset`` token."""
        return await _set_password_with_token(
            request,
            "reset",
            "Passwort geaendert. Du kannst dich jetzt anmelden.",
        )

    async def me_get(request: web.Request) -> web.Response:
        """Return the current user dict, or ``null`` with status 401."""
        user = request["user"]
        if not user:
            return web.json_response(None, status=401)
        return web.json_response(user)

    # ── Admin API ────────────────────────────────────
    # Every admin handler answers with the full, refreshed user list unless
    # noted otherwise.

    async def admin_users_get(request: web.Request) -> web.Response:
        """List all users."""
        require_admin_api(request)
        users = await db.get_all_users()
        return web.json_response(users)

    async def admin_user_role(request: web.Request) -> web.Response:
        """Set a user's role to ``user`` or ``admin``."""
        require_admin_api(request)
        user_id = _path_user_id(request)
        data = await _json_body(request)
        role = data.get("role", "")
        if role not in ("user", "admin"):
            return web.json_response({"error": "Ungueltige Rolle"}, status=400)
        # NOTE(review): login_post stores the role in the session, so this
        # change presumably only takes effect at the user's next login.
        await db.update_user(user_id, role=role)
        users = await db.get_all_users()
        return web.json_response(users)

    async def admin_user_verify(request: web.Request) -> web.Response:
        """Mark a user as verified."""
        require_admin_api(request)
        user_id = _path_user_id(request)
        await db.update_user(user_id, is_verified=1)
        users = await db.get_all_users()
        return web.json_response(users)

    async def admin_user_create(request: web.Request) -> web.Response:
        """Create an already-verified user, optionally mailing the credentials."""
        require_admin_api(request)
        data = await _json_body(request)
        name = _str_field(data, "name").strip()
        email = _str_field(data, "email").strip().lower()
        password = _str_field(data, "password")
        role = data.get("role", "user")
        if not name or not email or not password:
            return web.json_response({"error": "Name, E-Mail und Passwort erforderlich"}, status=400)
        if len(password) < 8:
            return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400)
        if role not in ("user", "admin"):
            return web.json_response({"error": "Ungueltige Rolle"}, status=400)
        existing = await db.get_user_by_email(email)
        if existing:
            return web.json_response({"error": "E-Mail bereits registriert"}, status=409)
        hashed = hash_password(password)
        await db.create_user(email=email, password=hashed, name=name, role=role, is_verified=1)
        # Send info email if requested
        if data.get("send_email"):
            await send_user_info_email(db, email, name, password)
        users = await db.get_all_users()
        return web.json_response(users)

    async def admin_user_update(request: web.Request) -> web.Response:
        """Update name, email and/or role; empty or invalid fields are ignored."""
        require_admin_api(request)
        user_id = _path_user_id(request)
        data = await _json_body(request)
        updates = {}
        name = _str_field(data, "name").strip()
        if name:
            updates["name"] = name
        new_email = _str_field(data, "email").strip().lower()
        if new_email:
            existing = await db.get_user_by_email(new_email)
            # The address may stay with its current owner, but not move to
            # a different account.
            if existing and existing["id"] != user_id:
                return web.json_response({"error": "E-Mail bereits vergeben"}, status=409)
            updates["email"] = new_email
        if data.get("role") in ("user", "admin"):
            updates["role"] = data["role"]
        if not updates:
            return web.json_response({"error": "Keine Aenderungen"}, status=400)
        await db.update_user(user_id, **updates)
        users = await db.get_all_users()
        return web.json_response(users)

    async def admin_user_reset_password(request: web.Request) -> web.Response:
        """Set a new password for a user, optionally mailing it."""
        require_admin_api(request)
        user_id = _path_user_id(request)
        data = await _json_body(request)
        password = _str_field(data, "password")
        if not password or len(password) < 8:
            return web.json_response({"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400)
        hashed = hash_password(password)
        await db.update_user(user_id, password=hashed)
        # Send info email if requested
        if data.get("send_email"):
            user = await db.get_user_by_id(user_id)
            if user:
                await send_user_info_email(db, user["email"], user["name"], password)
        users = await db.get_all_users()
        return web.json_response(users)

    async def admin_user_send_info(request: web.Request) -> web.Response:
        """Mail a user their login info (without a password); returns a status message."""
        require_admin_api(request)
        user_id = _path_user_id(request)
        user = await db.get_user_by_id(user_id)
        if not user:
            return web.json_response({"error": "Benutzer nicht gefunden"}, status=404)
        await send_user_info_email(db, user["email"], user["name"])
        return web.json_response({"ok": True, "message": f"Info-Mail an {user['email']} gesendet"})

    async def admin_user_delete(request: web.Request) -> web.Response:
        """Delete a user; an admin cannot delete their own account."""
        require_admin_api(request)
        user_id = _path_user_id(request)
        # Prevent self-deletion
        if request["user"]["id"] == user_id:
            return web.json_response({"error": "Eigenen Account kann man nicht loeschen"}, status=400)
        await db.delete_user(user_id)
        users = await db.get_all_users()
        return web.json_response(users)

    # Register routes
    app.router.add_post("/auth/login", login_post)
    app.router.add_post("/auth/register", register_post)
    app.router.add_get("/auth/logout", logout_get)
    app.router.add_get("/auth/verify", verify_get)
    app.router.add_post("/auth/set-password", set_password_post)
    app.router.add_post("/auth/forgot-password", forgot_password_post)
    app.router.add_get("/auth/reset-password", reset_password_get)
    app.router.add_post("/auth/reset-password", reset_password_post)
    app.router.add_get("/api/auth/me", me_get)
    app.router.add_get("/api/admin/users", admin_users_get)
    app.router.add_post("/api/admin/users", admin_user_create)
    app.router.add_put("/api/admin/users/{id}", admin_user_update)
    app.router.add_post("/api/admin/users/{id}/role", admin_user_role)
    app.router.add_post("/api/admin/users/{id}/verify", admin_user_verify)
    app.router.add_post("/api/admin/users/{id}/reset-password", admin_user_reset_password)
    app.router.add_post("/api/admin/users/{id}/send-info", admin_user_send_info)
    app.router.add_delete("/api/admin/users/{id}", admin_user_delete)