Files
Magent/backend/app/routers/auth.py

149 lines
6.5 KiB
Python

import logging

from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm

from ..db import (
    verify_user_password,
    create_user_if_missing,
    set_last_login,
    get_user_by_username,
    set_user_password,
    get_user_activity,
    get_user_activity_summary,
    get_user_request_stats,
    get_global_request_leader,
    get_global_request_total,
)
from ..runtime import get_runtime_settings
from ..clients.jellyfin import JellyfinClient
from ..clients.jellyseerr import JellyseerrClient
from ..security import create_access_token
from ..auth import get_current_user
# Router for the authentication endpoints; routes registered on it use the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["auth"])
def _normalize_username(value: str) -> str:
return value.strip().lower()
@router.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict:
    """Check the submitted credentials with verify_user_password and issue a bearer token.

    Returns:
        A dict with "access_token", "token_type" ("bearer") and a "user"
        entry holding the account's username and role.

    Raises:
        HTTPException: 401 when verify_user_password returns a falsy value;
            403 when the returned record has a truthy "is_blocked" entry.
    """
    account = verify_user_password(form_data.username, form_data.password)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )
    if account.get("is_blocked"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User is blocked",
        )

    username = account["username"]
    role = account["role"]
    access_token = create_access_token(username, role)
    set_last_login(username)
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": {"username": username, "role": role},
    }
@router.post("/jellyfin/login")
async def jellyfin_login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict:
    """Authenticate against the configured Jellyfin server and issue a bearer token.

    The credentials are checked with JellyfinClient.authenticate_by_name. On
    success a user entry is ensured via create_user_if_missing, the blocked
    flag is checked, and the names returned by client.get_users() are passed
    to create_user_if_missing as a best-effort sync.

    Returns:
        A dict with "access_token", "token_type" ("bearer") and a "user"
        entry holding the submitted username and the role "user".

    Raises:
        HTTPException: 400 when the Jellyfin client is not configured;
            502 when the authentication call raises; 401 when the response
            is not a dict or has no truthy "User" entry; 403 when the stored
            user record has a truthy "is_blocked" entry.
    """
    runtime = get_runtime_settings()
    client = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key)
    if not client.configured():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Jellyfin not configured",
        )
    try:
        response = await client.authenticate_by_name(form_data.username, form_data.password)
    except Exception as exc:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
    if not isinstance(response, dict) or not response.get("User"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Jellyfin credentials",
        )

    create_user_if_missing(form_data.username, "jellyfin-user", role="user", auth_provider="jellyfin")
    user = get_user_by_username(form_data.username)
    if user and user.get("is_blocked"):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is blocked")

    # Best-effort sync of the other Jellyfin accounts: a failure here must not
    # fail the login, but it is logged instead of being silently discarded.
    try:
        jellyfin_users = await client.get_users()
        if isinstance(jellyfin_users, list):
            # Distinct loop variable so the authenticated `user` record is not rebound.
            for entry in jellyfin_users:
                if not isinstance(entry, dict):
                    continue
                name = entry.get("Name")
                if isinstance(name, str) and name:
                    create_user_if_missing(name, "jellyfin-user", role="user", auth_provider="jellyfin")
    except Exception:
        logging.getLogger(__name__).warning(
            "Jellyfin user sync failed during login", exc_info=True
        )

    token = create_access_token(form_data.username, "user")
    set_last_login(form_data.username)
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": {"username": form_data.username, "role": "user"},
    }
@router.post("/jellyseerr/login")
async def jellyseerr_login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict:
    """Authenticate against the configured Jellyseerr server and issue a bearer token.

    The submitted username is sent as the "email" field of the Jellyseerr
    login request. On success a user entry is ensured via
    create_user_if_missing and the blocked flag is checked.

    Returns:
        A dict with "access_token", "token_type" ("bearer") and a "user"
        entry holding the submitted username and the role "user".

    Raises:
        HTTPException: 400 when the Jellyseerr client is not configured;
            502 when the login request raises; 401 when the response is not
            a dict; 403 when the stored user record has a truthy
            "is_blocked" entry.
    """
    settings = get_runtime_settings()
    seerr = JellyseerrClient(settings.jellyseerr_base_url, settings.jellyseerr_api_key)
    if not seerr.configured():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Jellyseerr not configured",
        )

    username = form_data.username
    credentials = {"email": username, "password": form_data.password}
    try:
        response = await seerr.post("/api/v1/auth/login", payload=credentials)
    except Exception as exc:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
    if not isinstance(response, dict):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Jellyseerr credentials",
        )

    create_user_if_missing(username, "jellyseerr-user", role="user", auth_provider="jellyseerr")
    account = get_user_by_username(username)
    if account and account.get("is_blocked"):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is blocked")

    access_token = create_access_token(username, "user")
    set_last_login(username)
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": {"username": username, "role": "user"},
    }
@router.get("/me")
async def me(current_user: dict = Depends(get_current_user)) -> dict:
    """Return the user record supplied by the get_current_user dependency, unchanged."""
    return current_user
@router.get("/profile")
async def profile(current_user: dict = Depends(get_current_user)) -> dict:
    """Build the profile payload for the current user.

    Returns:
        A dict with three sections:
        - "user": the record supplied by get_current_user;
        - "stats": the user's request stats plus "share" (the user's "total"
          divided by the global total, or 0 when the global total is falsy),
          "global_total" and "most_active_user";
        - "activity": the activity summary plus "recent" (up to 5 entries
          requested from get_user_activity).
    """
    username = current_user.get("username") or ""
    # Request stats are looked up by the normalized name; activity uses the raw one.
    if username:
        normalized = _normalize_username(username)
    else:
        normalized = ""

    stats = get_user_request_stats(normalized)
    global_total = get_global_request_total()
    leader = get_global_request_leader()

    if global_total:
        share = stats.get("total", 0) / global_total
    else:
        share = 0

    if username:
        activity_summary = get_user_activity_summary(username)
        activity_recent = get_user_activity(username, limit=5)
    else:
        activity_summary = {}
        activity_recent = []

    stats_section = {
        **stats,
        "share": share,
        "global_total": global_total,
        "most_active_user": leader,
    }
    activity_section = {
        **activity_summary,
        "recent": activity_recent,
    }
    return {
        "user": current_user,
        "stats": stats_section,
        "activity": activity_section,
    }
@router.post("/password")
async def change_password(payload: dict, current_user: dict = Depends(get_current_user)) -> dict:
    """Change the current user's password after re-verifying the existing one.

    The payload must carry string values under "current_password" and
    "new_password". The new password is stripped of surrounding whitespace
    before both the length check and storage.

    Returns:
        {"status": "ok"} on success.

    Raises:
        HTTPException: 400 when the user's "auth_provider" is not "local",
            when either password field is missing or not a string, or when
            the stripped new password is shorter than 8 characters;
            401 when verify_user_password rejects the current password.
    """
    if current_user.get("auth_provider") != "local":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password changes are only available for local users.",
        )

    if isinstance(payload, dict):
        current_password = payload.get("current_password")
        new_password = payload.get("new_password")
    else:
        current_password = new_password = None

    if not (isinstance(current_password, str) and isinstance(new_password, str)):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid payload")

    candidate = new_password.strip()
    if len(candidate) < 8:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password must be at least 8 characters.",
        )

    username = current_user["username"]
    if not verify_user_password(username, current_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Current password is incorrect",
        )

    set_user_password(username, candidate)
    return {"status": "ok"}