import base64
import logging
import time
import uuid
import bcrypt
from aiohttp import web
import aiohttp_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from cryptography.fernet import Fernet
from meshbot import config
logger = logging.getLogger(__name__)
# ── Password hashing ─────────────────────────────────
def hash_password(password: str) -> str:
    """Hash a plaintext password with bcrypt (cost factor 12).

    Args:
        password: The plaintext password.

    Returns:
        The bcrypt hash decoded to ``str`` so it can be stored as text.
    """
    # bcrypt only ever considers the first 72 bytes of its input. Older
    # bcrypt releases truncated silently, newer ones raise ValueError for
    # longer input. Truncating here keeps the historical behaviour (and
    # therefore compatibility with already stored hashes) on every version.
    secret = password.encode("utf-8")[:72]
    digest = bcrypt.hashpw(secret, bcrypt.gensalt(rounds=12))
    return digest.decode("utf-8")
def check_password(password: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        password: The plaintext password supplied by the user.
        hashed: The stored bcrypt hash as text.

    Returns:
        ``True`` when the password matches the hash, ``False`` otherwise.
    """
    # Only the first 72 bytes take part in a bcrypt hash. Truncating here
    # mirrors what older bcrypt releases did implicitly and avoids the
    # ValueError newer releases raise for longer input.
    candidate = password.encode("utf-8")[:72]
    return bcrypt.checkpw(candidate, hashed.encode("utf-8"))
# ── Session setup ────────────────────────────────────
def setup_session(app: web.Application):
    """Install encrypted cookie sessions on ``app``.

    Reads ``auth.secret_key`` and ``auth.session_max_age`` from the
    configuration, derives a Fernet key from the secret and registers an
    ``EncryptedCookieStorage`` using the cookie name ``meshdd_session``.

    Args:
        app: The aiohttp application to attach session handling to.
    """
    import hashlib

    default_secret = "change-this-secret-key-32bytes!!"
    secret_key = config.get("auth.secret_key", default_secret)
    if secret_key == default_secret:
        # Anyone who knows the built-in default can forge session cookies.
        logger.warning(
            "auth.secret_key is not configured; using the insecure built-in default"
        )

    secret_bytes = secret_key.encode("utf-8")
    if len(secret_bytes) >= 32:
        # Unchanged behaviour for sufficiently long secrets: first 32 bytes.
        key_material = secret_bytes[:32]
    else:
        # Fernet needs exactly 32 key bytes. A shorter secret used to produce
        # an invalid key; stretch it to 32 bytes with SHA-256 instead.
        logger.warning(
            "auth.secret_key is shorter than 32 bytes; deriving the session key via SHA-256"
        )
        key_material = hashlib.sha256(secret_bytes).digest()

    # Fernet requires a 32-byte url-safe base64-encoded key.
    # NOTE(review): aiohttp-session documents bytes input as a *raw* 32-byte
    # key that it base64-encodes itself, while a str is taken as the encoded
    # key. The encoded key is therefore handed over as text - confirm against
    # the installed aiohttp-session version.
    fernet_key = base64.urlsafe_b64encode(key_material).decode("ascii")

    max_age = config.get("auth.session_max_age", 86400)
    storage = EncryptedCookieStorage(
        fernet_key,
        cookie_name="meshdd_session",
        max_age=max_age,
    )
    aiohttp_session.setup(app, storage)
# ── Auth middleware ───────────────────────────────────
@web.middleware
async def auth_middleware(request: web.Request, handler):
    """Populate ``request["user"]`` from the session, then call the handler.

    ``request["user"]`` is always set: to ``None`` for anonymous requests, or
    to a dict with the keys ``id``, ``email``, ``name`` and ``role`` copied
    from the session when the session carries a truthy ``user_id``.

    Loading the session is best effort: a failure never blocks the request,
    it is logged and the request is handled as anonymous.
    """
    request["user"] = None
    try:
        session = await aiohttp_session.get_session(request)
        if session.get("user_id"):
            request["user"] = {
                "id": session["user_id"],
                "email": session.get("email"),
                "name": session.get("name"),
                "role": session.get("role"),
            }
    except Exception:
        # Deliberately broad: authentication must not take the whole site
        # down. Log instead of swallowing silently so that misconfiguration
        # (e.g. missing session setup) is visible.
        logger.warning(
            "Could not load session; treating request as anonymous",
            exc_info=True,
        )
    return await handler(request)
# ── Helper checks ────────────────────────────────────
def require_user(request: web.Request):
    """Redirect to ``/login`` unless ``request["user"]`` is set.

    Intended for page handlers.

    Raises:
        web.HTTPFound: When ``request["user"]`` is falsy.
    """
    current_user = request["user"]
    if current_user:
        return
    raise web.HTTPFound("/login")
def require_admin(request: web.Request):
    """Allow only logged-in admins; meant for page handlers.

    Raises:
        web.HTTPFound: Redirect to ``/login`` when ``request["user"]`` is falsy.
        web.HTTPForbidden: When the user's ``role`` is not ``"admin"``.
    """
    account = request["user"]
    if not account:
        raise web.HTTPFound("/login")
    if account["role"] == "admin":
        return
    raise web.HTTPForbidden(text="Admin access required")
def require_user_api(request: web.Request):
    """Reject the request with 401 unless ``request["user"]`` is set.

    API counterpart of the login redirect: raises an error status instead of
    redirecting.

    Raises:
        web.HTTPUnauthorized: When ``request["user"]`` is falsy.
    """
    current_user = request["user"]
    if current_user:
        return
    raise web.HTTPUnauthorized(text="Login required")
def require_admin_api(request: web.Request):
    """Allow only logged-in admins; meant for API handlers.

    Raises:
        web.HTTPUnauthorized: When ``request["user"]`` is falsy.
        web.HTTPForbidden: When the user's ``role`` is not ``"admin"``.
    """
    account = request["user"]
    if not account:
        raise web.HTTPUnauthorized(text="Login required")
    if account["role"] == "admin":
        return
    raise web.HTTPForbidden(text="Admin access required")
# ── Email sending ────────────────────────────────────
async def _send_html_email(smtp_host: str, recipient: str, subject: str, html_body: str) -> None:
    """Build an HTML message and submit it through SMTP with STARTTLS.

    Port, sender and credentials are read from the ``smtp.*`` configuration.
    Any import or delivery error propagates to the caller.
    """
    # Imported lazily so that a missing mail dependency surfaces as a send
    # failure in the caller's error handling rather than at module import.
    import aiosmtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = config.get("smtp.from", "MeshDD-Bot ")
    msg["To"] = recipient
    msg.attach(MIMEText(html_body, "html"))
    await aiosmtplib.send(
        msg,
        hostname=smtp_host,
        port=config.get("smtp.port", 587),
        # Empty credentials mean "no authentication". Pass None so that no
        # AUTH attempt with blank values is made (depending on the aiosmtplib
        # version an empty string may still trigger a login).
        username=config.get("smtp.user", "") or None,
        password=config.get("smtp.password", "") or None,
        start_tls=True,
    )


async def send_verification_email(db, email: str, token: str):
    """Send the account verification link for ``token`` to ``email``.

    When ``smtp.host`` is not configured the link is written to the log
    instead and the attempt is recorded via ``db.log_email`` with status
    ``"console"``. Send failures are logged and recorded with status
    ``"error"``; they are not raised to the caller.

    Args:
        db: Database object providing ``log_email``.
        email: Recipient address.
        token: Verification token embedded in the link.
    """
    # Tolerate a configured base URL with a trailing slash.
    app_url = config.get("smtp.app_url", "http://localhost:8080").rstrip("/")
    verify_url = f"{app_url}/auth/verify?token={token}"
    subject = "MeshDD-Bot - E-Mail verifizieren"
    html_body = f"""
MeshDD-Bot Registrierung
Klicke auf den folgenden Link, um dein Passwort zu setzen und dein Konto zu aktivieren:
{verify_url}
Der Link ist 24 Stunden gueltig.
"""
    smtp_host = config.get("smtp.host", "")
    if not smtp_host:
        logger.info("SMTP not configured - verification link: %s", verify_url)
        await db.log_email(email, subject, "console", "SMTP not configured")
        return
    try:
        await _send_html_email(smtp_host, email, subject, html_body)
        await db.log_email(email, subject, "sent")
        logger.info("Verification email sent to %s", email)
    except Exception as e:
        # Best effort: a mail problem must not fail the calling request.
        logger.error("Failed to send email to %s: %s", email, e)
        await db.log_email(email, subject, "error", str(e))


async def send_reset_email(db, email: str, token: str):
    """Send the password reset link for ``token`` to ``email``.

    When ``smtp.host`` is not configured the link is written to the log
    instead and the attempt is recorded via ``db.log_email`` with status
    ``"console"``. Send failures are logged and recorded with status
    ``"error"``; they are not raised to the caller.

    Args:
        db: Database object providing ``log_email``.
        email: Recipient address.
        token: Reset token embedded in the link.
    """
    # Tolerate a configured base URL with a trailing slash.
    app_url = config.get("smtp.app_url", "http://localhost:8080").rstrip("/")
    reset_url = f"{app_url}/auth/reset-password?token={token}"
    subject = "MeshDD-Bot - Passwort zuruecksetzen"
    html_body = f"""
Passwort zuruecksetzen
Klicke auf den folgenden Link, um dein Passwort zurueckzusetzen:
{reset_url}
Der Link ist 24 Stunden gueltig.
"""
    smtp_host = config.get("smtp.host", "")
    if not smtp_host:
        logger.info("SMTP not configured - reset link: %s", reset_url)
        await db.log_email(email, subject, "console", "SMTP not configured")
        return
    try:
        await _send_html_email(smtp_host, email, subject, html_body)
        await db.log_email(email, subject, "sent")
        logger.info("Reset email sent to %s", email)
    except Exception as e:
        # Best effort: a mail problem must not fail the calling request.
        logger.error("Failed to send reset email to %s: %s", email, e)
        await db.log_email(email, subject, "error", str(e))
# ── Auth route handlers ──────────────────────────────
def setup_auth_routes(app: web.Application, db):
    """Register the authentication and user-administration routes on ``app``.

    Routes added:
        POST   /auth/login, /auth/register, /auth/set-password,
               /auth/forgot-password, /auth/reset-password
        GET    /auth/logout, /auth/verify, /auth/reset-password, /api/auth/me
        GET    /api/admin/users
        POST   /api/admin/users/{id}/role, /api/admin/users/{id}/verify
        DELETE /api/admin/users/{id}

    Args:
        app: The aiohttp application to register the routes on.
        db: Database object providing the user/token coroutines used below
            (``get_user_by_email``, ``create_user``, ``create_token``,
            ``get_valid_token``, ``mark_token_used``, ``update_user``,
            ``get_all_users``, ``delete_user``).
    """
    import json
    import os
    import secrets

    token_lifetime = 86400  # seconds; the emails state a validity of 24 hours

    # ── Local helpers ────────────────────────────────
    def bad_request(message: str) -> web.HTTPBadRequest:
        """Build a 400 error carrying the same JSON shape as other errors."""
        return web.HTTPBadRequest(
            text=json.dumps({"error": message}),
            content_type="application/json",
        )

    async def read_json_object(request: web.Request) -> dict:
        """Return the request body as a dict, or raise 400 if it is not one."""
        try:
            payload = await request.json()
        except ValueError:
            # Covers malformed JSON and undecodable bodies, which would
            # otherwise surface as a 500.
            raise bad_request("Ungueltiger JSON-Body") from None
        if not isinstance(payload, dict):
            raise bad_request("Ungueltiger JSON-Body")
        return payload

    def text_field(data: dict, key: str) -> str:
        """Return ``data[key]`` when it is a string, otherwise ``""``."""
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    def user_id_from_path(request: web.Request) -> int:
        """Parse the ``{id}`` path segment, raising 400 when not numeric."""
        try:
            return int(request.match_info["id"])
        except ValueError:
            raise bad_request("Ungueltige Benutzer-ID") from None

    async def issue_token(user_id, purpose: str) -> str:
        """Create and store a fresh random token for ``user_id``."""
        # 32 hex characters, same format as before, from the secrets module.
        token = secrets.token_hex(16)
        expires_at = time.time() + token_lifetime
        await db.create_token(user_id, token, purpose, expires_at)
        return token

    async def token_page(request: web.Request, purpose: str) -> web.Response:
        """Serve ``static/login.html`` if the query token is valid."""
        token = request.query.get("token", "")
        if not token:
            return web.Response(text="Token fehlt", status=400)
        token_row = await db.get_valid_token(token, purpose)
        if not token_row:
            return web.Response(text="Token ungueltig oder abgelaufen", status=400)
        static_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "static")
        return web.FileResponse(os.path.join(static_dir, "login.html"))

    async def store_password(
        request: web.Request, purpose: str, success_message: str, **extra_fields
    ) -> web.Response:
        """Set a new password for the user a valid token belongs to.

        ``extra_fields`` are passed on to ``db.update_user`` together with
        the new password hash.
        """
        data = await read_json_object(request)
        token = text_field(data, "token")
        password = text_field(data, "password")
        if not token or not password:
            return web.json_response({"error": "Token und Passwort erforderlich"}, status=400)
        if len(password) < 8:
            return web.json_response(
                {"error": "Passwort muss mindestens 8 Zeichen lang sein"}, status=400
            )
        token_row = await db.get_valid_token(token, purpose)
        if not token_row:
            return web.json_response({"error": "Token ungueltig oder abgelaufen"}, status=400)
        hashed = hash_password(password)
        await db.update_user(token_row["user_id"], password=hashed, **extra_fields)
        await db.mark_token_used(token_row["id"])
        return web.json_response({"ok": True, "message": success_message})

    # ── Auth handlers ────────────────────────────────
    async def login_post(request: web.Request) -> web.Response:
        """Check credentials and start a fresh session for the user."""
        data = await read_json_object(request)
        email = text_field(data, "email").strip().lower()
        password = text_field(data, "password")
        if not email or not password:
            return web.json_response({"error": "E-Mail und Passwort erforderlich"}, status=400)
        user = await db.get_user_by_email(email)
        if not user or not check_password(password, user["password"]):
            return web.json_response({"error": "Ungueltige Anmeldedaten"}, status=401)
        if not user["is_verified"]:
            return web.json_response(
                {"error": "Konto nicht verifiziert. Bitte pruefe deine E-Mails."}, status=403
            )
        # A new session (rather than reusing the current one) is created on login.
        session = await aiohttp_session.new_session(request)
        profile = {
            "id": user["id"],
            "email": user["email"],
            "name": user["name"],
            "role": user["role"],
        }
        session["user_id"] = profile["id"]
        session["email"] = profile["email"]
        session["name"] = profile["name"]
        session["role"] = profile["role"]
        return web.json_response(profile)

    async def register_post(request: web.Request) -> web.Response:
        """Create an unverified user and mail a verification link."""
        data = await read_json_object(request)
        name = text_field(data, "name").strip()
        email = text_field(data, "email").strip().lower()
        if not name or not email:
            return web.json_response({"error": "Name und E-Mail erforderlich"}, status=400)
        existing = await db.get_user_by_email(email)
        if existing:
            return web.json_response({"error": "E-Mail bereits registriert"}, status=409)
        # Create user with a random placeholder password hash; the real
        # password is set during verification.
        temp_hash = hash_password(secrets.token_hex(16))
        user = await db.create_user(email=email, password=temp_hash, name=name)
        token = await issue_token(user["id"], "verify")
        await send_verification_email(db, email, token)
        return web.json_response(
            {"ok": True, "message": "Registrierung erfolgreich. Pruefe deine E-Mails."}
        )

    async def logout_get(request: web.Request) -> web.Response:
        """Invalidate the session and redirect to the start page."""
        session = await aiohttp_session.get_session(request)
        session.invalidate()
        raise web.HTTPFound("/")

    async def verify_get(request: web.Request) -> web.Response:
        """Serve the set-password form for a valid verification token."""
        return await token_page(request, "verify")

    async def set_password_post(request: web.Request) -> web.Response:
        """Set the initial password and mark the account as verified."""
        return await store_password(
            request,
            "verify",
            "Passwort gesetzt. Du kannst dich jetzt anmelden.",
            is_verified=1,
        )

    async def forgot_password_post(request: web.Request) -> web.Response:
        """Mail a reset link; the response is the same for unknown emails."""
        data = await read_json_object(request)
        email = text_field(data, "email").strip().lower()
        if not email:
            return web.json_response({"error": "E-Mail erforderlich"}, status=400)
        user = await db.get_user_by_email(email)
        if user:
            token = await issue_token(user["id"], "reset")
            await send_reset_email(db, email, token)
        # Don't reveal whether the email exists: identical answer either way.
        return web.json_response(
            {"ok": True, "message": "Falls die E-Mail registriert ist, wurde ein Link gesendet."}
        )

    async def reset_password_get(request: web.Request) -> web.Response:
        """Serve the reset form for a valid reset token."""
        return await token_page(request, "reset")

    async def reset_password_post(request: web.Request) -> web.Response:
        """Replace the password of the user a valid reset token belongs to."""
        return await store_password(
            request,
            "reset",
            "Passwort geaendert. Du kannst dich jetzt anmelden.",
        )

    async def me_get(request: web.Request) -> web.Response:
        """Return the current user as JSON, or ``null`` with status 401."""
        user = request["user"]
        if not user:
            return web.json_response(None, status=401)
        return web.json_response(user)

    # ── Admin API ────────────────────────────────────
    # NOTE(review): every admin endpoint returns db.get_all_users() verbatim;
    # confirm that it does not include password hashes.
    async def admin_users_get(request: web.Request) -> web.Response:
        """List all users (admin only)."""
        require_admin_api(request)
        return web.json_response(await db.get_all_users())

    async def admin_user_role(request: web.Request) -> web.Response:
        """Change a user's role to ``user`` or ``admin`` (admin only)."""
        require_admin_api(request)
        user_id = user_id_from_path(request)
        data = await read_json_object(request)
        role = data.get("role", "")
        if role not in ("user", "admin"):
            return web.json_response({"error": "Ungueltige Rolle"}, status=400)
        await db.update_user(user_id, role=role)
        return web.json_response(await db.get_all_users())

    async def admin_user_verify(request: web.Request) -> web.Response:
        """Mark a user as verified (admin only)."""
        require_admin_api(request)
        user_id = user_id_from_path(request)
        await db.update_user(user_id, is_verified=1)
        return web.json_response(await db.get_all_users())

    async def admin_user_delete(request: web.Request) -> web.Response:
        """Delete a user other than the caller (admin only)."""
        require_admin_api(request)
        user_id = user_id_from_path(request)
        # Prevent self-deletion
        if request["user"]["id"] == user_id:
            return web.json_response(
                {"error": "Eigenen Account kann man nicht loeschen"}, status=400
            )
        await db.delete_user(user_id)
        return web.json_response(await db.get_all_users())

    # Register routes
    router = app.router
    router.add_post("/auth/login", login_post)
    router.add_post("/auth/register", register_post)
    router.add_get("/auth/logout", logout_get)
    router.add_get("/auth/verify", verify_get)
    router.add_post("/auth/set-password", set_password_post)
    router.add_post("/auth/forgot-password", forgot_password_post)
    router.add_get("/auth/reset-password", reset_password_get)
    router.add_post("/auth/reset-password", reset_password_post)
    router.add_get("/api/auth/me", me_get)
    router.add_get("/api/admin/users", admin_users_get)
    router.add_post("/api/admin/users/{id}/role", admin_user_role)
    router.add_post("/api/admin/users/{id}/verify", admin_user_verify)
    router.add_delete("/api/admin/users/{id}", admin_user_delete)