from typing import Any, Dict, List, Optional from datetime import datetime, timedelta, timezone import asyncio import ipaddress import json import os import secrets import sqlite3 import string from urllib.parse import urlparse, urlunparse from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Request from fastapi.responses import StreamingResponse from ..auth import require_admin, get_current_user, require_admin_event_stream from ..config import settings as env_settings from ..db import ( delete_setting, get_all_users, get_cached_requests, get_cached_requests_count, get_request_cache_overview, get_request_cache_missing_titles, get_request_cache_stats, get_settings_overrides, get_user_by_id, get_user_by_username, get_user_request_stats, create_user_if_missing, set_user_jellyseerr_id, set_setting, set_user_blocked, delete_user_by_username, delete_user_activity_by_username, set_user_auto_search_enabled, set_auto_search_enabled_for_non_admin_users, set_user_profile_id, set_user_expires_at, set_user_password, set_user_role, run_integrity_check, vacuum_db, clear_requests_cache, clear_history, clear_user_objects_nuclear, cleanup_history, update_request_cache_title, repair_request_cache_titles, delete_non_admin_users, list_user_profiles, get_user_profile, create_user_profile, update_user_profile, delete_user_profile, list_signup_invites, get_signup_invite_by_id, create_signup_invite, update_signup_invite, delete_signup_invite, get_signup_invite_by_code, disable_signup_invites_by_creator, ) from ..runtime import get_runtime_settings from ..clients.sonarr import SonarrClient from ..clients.radarr import RadarrClient from ..clients.jellyfin import JellyfinClient from ..clients.jellyseerr import JellyseerrClient from ..services.jellyfin_sync import sync_jellyfin_users from ..services.user_cache import ( build_jellyseerr_candidate_map, get_cached_jellyfin_users, get_cached_jellyseerr_users, match_jellyseerr_user_id, save_jellyfin_users_cache, save_jellyseerr_users_cache, clear_user_import_caches, ) import logging from ..logging_config import configure_logging from ..routers import requests as requests_router from ..routers.branding import save_branding_image router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)]) events_router = APIRouter(prefix="/admin/events", tags=["admin"]) logger = logging.getLogger(__name__) SENSITIVE_KEYS = { "jellyseerr_api_key", "jellyfin_api_key", "sonarr_api_key", "radarr_api_key", "prowlarr_api_key", "qbittorrent_password", } URL_SETTING_KEYS = { "jellyseerr_base_url", "jellyfin_base_url", "jellyfin_public_url", "sonarr_base_url", "radarr_base_url", "prowlarr_base_url", "qbittorrent_base_url", } SETTING_KEYS: List[str] = [ "jellyseerr_base_url", "jellyseerr_api_key", "jellyfin_base_url", "jellyfin_api_key", "jellyfin_public_url", "jellyfin_sync_to_arr", "artwork_cache_mode", "sonarr_base_url", "sonarr_api_key", "sonarr_quality_profile_id", "sonarr_root_folder", "sonarr_qbittorrent_category", "radarr_base_url", "radarr_api_key", "radarr_quality_profile_id", "radarr_root_folder", "radarr_qbittorrent_category", "prowlarr_base_url", "prowlarr_api_key", "qbittorrent_base_url", "qbittorrent_username", "qbittorrent_password", "log_level", "log_file", "requests_sync_ttl_minutes", "requests_poll_interval_seconds", "requests_delta_sync_interval_minutes", "requests_full_sync_time", "requests_cleanup_time", "requests_cleanup_days", "requests_data_source", "site_banner_enabled", "site_banner_message", "site_banner_tone", ] def _http_error_detail(exc: Exception) -> str: try: import httpx # local import to avoid hard dependency in static analysis paths if isinstance(exc, httpx.HTTPStatusError): response = exc.response body = "" try: body = response.text.strip() except Exception: body = "" if body: return f"HTTP {response.status_code}: {body}" return f"HTTP {response.status_code}" except Exception: pass return str(exc) def _user_inviter_details(user: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: if not user: return None invite_code = user.get("invited_by_code") if not invite_code: return None invite = get_signup_invite_by_code(str(invite_code)) if not invite: return { "invite_code": invite_code, "invited_by": None, "invite": None, } return { "invite_code": invite.get("code"), "invited_by": invite.get("created_by"), "invite": { "id": invite.get("id"), "code": invite.get("code"), "label": invite.get("label"), "created_by": invite.get("created_by"), "created_at": invite.get("created_at"), "enabled": invite.get("enabled"), "is_usable": invite.get("is_usable"), }, } def _build_invite_trace_payload() -> Dict[str, Any]: users = get_all_users() invites = list_signup_invites() usernames = {str(user.get("username") or "") for user in users} nodes: list[Dict[str, Any]] = [] edges: list[Dict[str, Any]] = [] for user in users: username = str(user.get("username") or "") inviter = _user_inviter_details(user) nodes.append( { "id": f"user:{username}", "type": "user", "username": username, "label": username, "role": user.get("role"), "auth_provider": user.get("auth_provider"), "created_at": user.get("created_at"), "invited_by_code": user.get("invited_by_code"), "invited_by": inviter.get("invited_by") if inviter else None, } ) invite_codes = set() for invite in invites: code = str(invite.get("code") or "") if not code: continue invite_codes.add(code) nodes.append( { "id": f"invite:{code}", "type": "invite", "code": code, "label": invite.get("label") or code, "created_by": invite.get("created_by"), "enabled": invite.get("enabled"), "use_count": invite.get("use_count"), "remaining_uses": invite.get("remaining_uses"), "created_at": invite.get("created_at"), } ) created_by = invite.get("created_by") if isinstance(created_by, str) and created_by.strip(): edges.append( { "id": f"user:{created_by}->invite:{code}", "from": f"user:{created_by}", "to": f"invite:{code}", "kind": "created", "label": "created", "from_missing": created_by not in usernames, } ) for user in users: username = str(user.get("username") or "") invited_by_code = user.get("invited_by_code") if not isinstance(invited_by_code, str) or not invited_by_code.strip(): continue code = invited_by_code.strip() edges.append( { "id": f"invite:{code}->user:{username}", "from": f"invite:{code}", "to": f"user:{username}", "kind": "invited", "label": code, "from_missing": code not in invite_codes, } ) return { "users": users, "invites": invites, "nodes": nodes, "edges": edges, "generated_at": datetime.now(timezone.utc).isoformat(), } def _admin_live_state_snapshot() -> Dict[str, Any]: return { "type": "admin_live_state", "requestsSync": requests_router.get_requests_sync_state(), "artworkPrefetch": requests_router.get_artwork_prefetch_state(), } def _sse_encode(data: Dict[str, Any]) -> str: payload = json.dumps(data, ensure_ascii=True, separators=(",", ":"), default=str) return f"data: {payload}\n\n" def _read_log_tail_lines(lines: int) -> List[str]: runtime = get_runtime_settings() log_file = runtime.log_file if not log_file: raise HTTPException(status_code=400, detail="Log file not configured") if not os.path.isabs(log_file): log_file = os.path.join(os.getcwd(), log_file) if not os.path.exists(log_file): raise HTTPException(status_code=404, detail="Log file not found") lines = max(1, min(lines, 1000)) from collections import deque with open(log_file, "r", encoding="utf-8", errors="replace") as handle: tail = deque(handle, maxlen=lines) return list(tail) def _normalize_username(value: str) -> str: normalized = value.strip().lower() if "@" in normalized: normalized = normalized.split("@", 1)[0] return normalized def _is_ip_host(host: str) -> bool: try: ipaddress.ip_address(host) return True except ValueError: return False def _normalize_service_url(value: str) -> str: raw = value.strip() if not raw: raise ValueError("URL cannot be empty.") candidate = raw if "://" not in candidate: authority = candidate.split("/", 1)[0].strip() if authority.startswith("["): closing = authority.find("]") host = authority[1:closing] if closing > 0 else authority.strip("[]") else: host = authority.split(":", 1)[0] host = host.strip().lower() default_scheme = "http" if host in {"localhost"} or _is_ip_host(host) or "." not in host else "https" candidate = f"{default_scheme}://{candidate}" parsed = urlparse(candidate) if parsed.scheme not in {"http", "https"}: raise ValueError("URL must use http:// or https://.") if not parsed.netloc: raise ValueError("URL must include a host.") if parsed.query or parsed.fragment: raise ValueError("URL must not include query params or fragments.") if not parsed.hostname: raise ValueError("URL must include a valid host.") normalized_path = parsed.path.rstrip("/") normalized = parsed._replace(path=normalized_path, params="", query="", fragment="") result = urlunparse(normalized).rstrip("/") if not result: raise ValueError("URL is invalid.") return result def _normalize_root_folders(folders: Any) -> List[Dict[str, Any]]: if not isinstance(folders, list): return [] results = [] for folder in folders: if not isinstance(folder, dict): continue folder_id = folder.get("id") path = folder.get("path") if folder_id is None or path is None: continue results.append({"id": folder_id, "path": path, "label": path}) return results async def _hydrate_cache_titles_from_jellyseerr(limit: int) -> int: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): return 0 missing = get_request_cache_missing_titles(limit) if not missing: return 0 hydrated = 0 for row in missing: tmdb_id = row.get("tmdb_id") media_type = row.get("media_type") request_id = row.get("request_id") if not tmdb_id or not media_type or not request_id: continue try: title, year = await requests_router._hydrate_title_from_tmdb( client, media_type, tmdb_id ) except Exception: logger.warning( "Requests cache title hydrate failed: request_id=%s tmdb_id=%s", request_id, tmdb_id, ) continue if title: update_request_cache_title(request_id, title, year) hydrated += 1 return hydrated def _normalize_quality_profiles(profiles: Any) -> List[Dict[str, Any]]: if not isinstance(profiles, list): return [] results = [] for profile in profiles: if not isinstance(profile, dict): continue profile_id = profile.get("id") name = profile.get("name") if profile_id is None or name is None: continue results.append({"id": profile_id, "name": name, "label": name}) return results def _normalize_optional_text(value: Any) -> Optional[str]: if value is None: return None if not isinstance(value, str): value = str(value) trimmed = value.strip() return trimmed if trimmed else None def _parse_optional_positive_int(value: Any, field_name: str) -> Optional[int]: if value is None or value == "": return None try: parsed = int(value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail=f"{field_name} must be a number") from exc if parsed <= 0: raise HTTPException(status_code=400, detail=f"{field_name} must be greater than 0") return parsed def _parse_optional_profile_id(value: Any) -> Optional[int]: if value is None or value == "": return None try: parsed = int(value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="profile_id must be a number") from exc if parsed <= 0: raise HTTPException(status_code=400, detail="profile_id must be greater than 0") profile = get_user_profile(parsed) if not profile: raise HTTPException(status_code=404, detail="Profile not found") return parsed def _parse_optional_expires_at(value: Any) -> Optional[str]: if value is None or value == "": return None if not isinstance(value, str): raise HTTPException(status_code=400, detail="expires_at must be an ISO datetime string") candidate = value.strip() if not candidate: return None try: parsed = datetime.fromisoformat(candidate.replace("Z", "+00:00")) except ValueError as exc: raise HTTPException(status_code=400, detail="expires_at must be a valid ISO datetime") from exc if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed.isoformat() def _normalize_invite_code(value: Optional[str]) -> str: raw = (value or "").strip().upper() filtered = "".join(ch for ch in raw if ch.isalnum()) if len(filtered) < 6: raise HTTPException(status_code=400, detail="Invite code must be at least 6 letters/numbers.") return filtered def _generate_invite_code(length: int = 12) -> str: alphabet = string.ascii_uppercase + string.digits return "".join(secrets.choice(alphabet) for _ in range(length)) def _normalize_role_or_none(value: Any) -> Optional[str]: if value is None: return None if not isinstance(value, str): value = str(value) role = value.strip().lower() if not role: return None if role not in {"user", "admin"}: raise HTTPException(status_code=400, detail="role must be 'user' or 'admin'") return role def _calculate_profile_expiry(profile: Dict[str, Any]) -> Optional[str]: expires_days = profile.get("account_expires_days") if isinstance(expires_days, int) and expires_days > 0: return (datetime.now(timezone.utc) + timedelta(days=expires_days)).isoformat() return None def _apply_profile_defaults_to_user(username: str, profile: Dict[str, Any]) -> Dict[str, Any]: set_user_profile_id(username, int(profile["id"])) role = profile.get("role") or "user" if role in {"user", "admin"}: set_user_role(username, role) set_user_auto_search_enabled(username, bool(profile.get("auto_search_enabled", True))) set_user_expires_at(username, _calculate_profile_expiry(profile)) refreshed = get_user_by_username(username) if not refreshed: raise HTTPException(status_code=404, detail="User not found") return refreshed @router.get("/settings") async def list_settings() -> Dict[str, Any]: overrides = get_settings_overrides() results = [] for key in SETTING_KEYS: override_present = key in overrides value = overrides.get(key) if override_present else getattr(env_settings, key) is_set = value is not None and str(value).strip() != "" sensitive = key in SENSITIVE_KEYS results.append( { "key": key, "value": None if sensitive else value, "isSet": is_set, "source": "db" if override_present else ("env" if is_set else "unset"), "sensitive": sensitive, } ) return {"settings": results} @router.put("/settings") async def update_settings(payload: Dict[str, Any]) -> Dict[str, Any]: updates = 0 touched_logging = False for key, value in payload.items(): if key not in SETTING_KEYS: raise HTTPException(status_code=400, detail=f"Unknown setting: {key}") if value is None: continue if isinstance(value, str) and value.strip() == "": delete_setting(key) updates += 1 continue value_to_store = str(value).strip() if isinstance(value, str) else str(value) if key in URL_SETTING_KEYS and value_to_store: try: value_to_store = _normalize_service_url(value_to_store) except ValueError as exc: friendly_key = key.replace("_", " ") raise HTTPException(status_code=400, detail=f"{friendly_key}: {exc}") from exc set_setting(key, value_to_store) updates += 1 if key in {"log_level", "log_file"}: touched_logging = True if touched_logging: runtime = get_runtime_settings() configure_logging(runtime.log_level, runtime.log_file) return {"status": "ok", "updated": updates} @router.get("/sonarr/options") async def sonarr_options() -> Dict[str, Any]: runtime = get_runtime_settings() client = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Sonarr not configured") root_folders = await client.get_root_folders() profiles = await client.get_quality_profiles() return { "rootFolders": _normalize_root_folders(root_folders), "qualityProfiles": _normalize_quality_profiles(profiles), } @router.get("/radarr/options") async def radarr_options() -> Dict[str, Any]: runtime = get_runtime_settings() client = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Radarr not configured") root_folders = await client.get_root_folders() profiles = await client.get_quality_profiles() return { "rootFolders": _normalize_root_folders(root_folders), "qualityProfiles": _normalize_quality_profiles(profiles), } @router.get("/jellyfin/users") async def jellyfin_users() -> Dict[str, Any]: cached = get_cached_jellyfin_users() if cached is not None: return {"users": cached} runtime = get_runtime_settings() client = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyfin not configured") users = await client.get_users() if not isinstance(users, list): return {"users": []} results = save_jellyfin_users_cache(users) return {"users": results} @router.post("/jellyfin/users/sync") async def jellyfin_users_sync() -> Dict[str, Any]: imported = await sync_jellyfin_users() return {"status": "ok", "imported": imported} async def _fetch_all_jellyseerr_users( client: JellyseerrClient, use_cache: bool = True ) -> List[Dict[str, Any]]: if use_cache: cached = get_cached_jellyseerr_users() if cached is not None: return cached users: List[Dict[str, Any]] = [] take = 100 skip = 0 while True: payload = await client.get_users(take=take, skip=skip) if not payload: break if isinstance(payload, list): batch = payload elif isinstance(payload, dict): batch = payload.get("results") or payload.get("users") or payload.get("data") or payload.get("items") else: batch = None if not isinstance(batch, list) or not batch: break users.extend([user for user in batch if isinstance(user, dict)]) if len(batch) < take: break skip += take if users: return save_jellyseerr_users_cache(users) return users @router.post("/jellyseerr/users/sync") async def jellyseerr_users_sync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") jellyseerr_users = await _fetch_all_jellyseerr_users(client, use_cache=False) if not jellyseerr_users: return {"status": "ok", "matched": 0, "skipped": 0, "total": 0} candidate_to_id = build_jellyseerr_candidate_map(jellyseerr_users) updated = 0 skipped = 0 users = get_all_users() for user in users: if user.get("jellyseerr_user_id") is not None: skipped += 1 continue username = user.get("username") or "" matched_id = match_jellyseerr_user_id(username, candidate_to_id) if matched_id is not None: set_user_jellyseerr_id(username, matched_id) updated += 1 else: skipped += 1 return {"status": "ok", "matched": updated, "skipped": skipped, "total": len(users)} def _pick_jellyseerr_username(user: Dict[str, Any]) -> Optional[str]: for key in ("email", "username", "displayName", "name"): value = user.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None @router.post("/jellyseerr/users/resync") async def jellyseerr_users_resync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") jellyseerr_users = await _fetch_all_jellyseerr_users(client, use_cache=False) if not jellyseerr_users: return {"status": "ok", "imported": 0, "cleared": 0} cleared = delete_non_admin_users() imported = 0 for user in jellyseerr_users: user_id = user.get("id") or user.get("userId") or user.get("Id") try: user_id = int(user_id) except (TypeError, ValueError): continue username = _pick_jellyseerr_username(user) if not username: continue created = create_user_if_missing( username, "jellyseerr-user", role="user", auth_provider="jellyseerr", jellyseerr_user_id=user_id, ) if created: imported += 1 else: set_user_jellyseerr_id(username, user_id) return {"status": "ok", "imported": imported, "cleared": cleared} @router.post("/requests/sync") async def requests_sync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") state = await requests_router.start_requests_sync( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key ) logger.info("Admin triggered requests sync: status=%s", state.get("status")) return {"status": "ok", "sync": state} @router.post("/requests/sync/delta") async def requests_sync_delta() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") state = await requests_router.start_requests_delta_sync( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key ) logger.info("Admin triggered delta requests sync: status=%s", state.get("status")) return {"status": "ok", "sync": state} @router.post("/requests/artwork/prefetch") async def requests_artwork_prefetch(only_missing: bool = False) -> Dict[str, Any]: runtime = get_runtime_settings() state = await requests_router.start_artwork_prefetch( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key, only_missing=only_missing, ) logger.info("Admin triggered artwork prefetch: status=%s", state.get("status")) return {"status": "ok", "prefetch": state} @router.get("/requests/artwork/status") async def requests_artwork_status() -> Dict[str, Any]: return {"status": "ok", "prefetch": requests_router.get_artwork_prefetch_state()} @router.get("/requests/artwork/summary") async def requests_artwork_summary() -> Dict[str, Any]: runtime = get_runtime_settings() cache_mode = (runtime.artwork_cache_mode or "remote").lower() stats = get_request_cache_stats() if cache_mode != "cache": stats["cache_bytes"] = 0 stats["cache_files"] = 0 stats["missing_artwork"] = 0 summary = { "cache_mode": cache_mode, "cache_bytes": stats.get("cache_bytes", 0), "cache_files": stats.get("cache_files", 0), "missing_artwork": stats.get("missing_artwork", 0), "total_requests": stats.get("total_requests", 0), "updated_at": stats.get("updated_at"), } return {"status": "ok", "summary": summary} @router.get("/requests/sync/status") async def requests_sync_status() -> Dict[str, Any]: return {"status": "ok", "sync": requests_router.get_requests_sync_state()} @events_router.get("/stream") async def admin_events_stream( request: Request, include_logs: bool = False, log_lines: int = 200, _: Dict[str, Any] = Depends(require_admin_event_stream), ) -> StreamingResponse: async def event_generator(): # Advise client reconnect timing once per stream. yield "retry: 2000\n\n" last_snapshot: Optional[str] = None heartbeat_counter = 0 log_refresh_counter = 5 if include_logs else 0 latest_logs_payload: Optional[Dict[str, Any]] = None while True: if await request.is_disconnected(): break snapshot_payload = _admin_live_state_snapshot() if include_logs: log_refresh_counter += 1 if log_refresh_counter >= 5: log_refresh_counter = 0 try: latest_logs_payload = { "lines": _read_log_tail_lines(log_lines), "count": max(1, min(int(log_lines or 200), 1000)), } except HTTPException as exc: latest_logs_payload = { "error": str(exc.detail) if exc.detail else "Could not read logs", } except Exception as exc: latest_logs_payload = {"error": str(exc)} snapshot_payload["logs"] = latest_logs_payload snapshot = _sse_encode(snapshot_payload) if snapshot != last_snapshot: last_snapshot = snapshot yield snapshot heartbeat_counter = 0 else: heartbeat_counter += 1 # Keep the stream alive through proxies even when state is unchanged. if heartbeat_counter >= 15: yield ": ping\n\n" heartbeat_counter = 0 await asyncio.sleep(1.0) headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers) @router.get("/logs") async def read_logs(lines: int = 200) -> Dict[str, Any]: return {"lines": _read_log_tail_lines(lines)} @router.get("/requests/cache") async def requests_cache(limit: int = 50) -> Dict[str, Any]: repaired = repair_request_cache_titles() if repaired: logger.info("Requests cache titles repaired via settings view: %s", repaired) hydrated = await _hydrate_cache_titles_from_jellyseerr(limit) if hydrated: logger.info("Requests cache titles hydrated via Jellyseerr: %s", hydrated) rows = get_request_cache_overview(limit) return {"rows": rows} @router.get("/requests/all") async def requests_all( take: int = 50, skip: int = 0, days: Optional[int] = None, user: Dict[str, str] = Depends(get_current_user), ) -> Dict[str, Any]: if user.get("role") != "admin": raise HTTPException(status_code=403, detail="Forbidden") take = max(1, min(int(take or 50), 200)) skip = max(0, int(skip or 0)) since_iso = None if days is not None and int(days) > 0: since_iso = (datetime.now(timezone.utc) - timedelta(days=int(days))).isoformat() rows = get_cached_requests(limit=take, offset=skip, since_iso=since_iso) total = get_cached_requests_count(since_iso=since_iso) results = [] for row in rows: status = row.get("status") results.append( { "id": row.get("request_id"), "title": row.get("title"), "year": row.get("year"), "type": row.get("media_type"), "status": status, "statusLabel": requests_router._status_label(status), "requestedBy": row.get("requested_by"), "createdAt": row.get("created_at"), } ) return {"results": results, "total": total, "take": take, "skip": skip} @router.post("/branding/logo") async def upload_branding_logo(file: UploadFile = File(...)) -> Dict[str, Any]: return await save_branding_image(file) @router.post("/maintenance/repair") async def repair_database() -> Dict[str, Any]: result = run_integrity_check() vacuum_db() logger.info("Database repair executed: integrity_check=%s", result) return {"status": "ok", "integrity": result} @router.post("/maintenance/flush") async def flush_database() -> Dict[str, Any]: cleared = clear_requests_cache() history = clear_history() user_objects = clear_user_objects_nuclear() user_caches = clear_user_import_caches() delete_setting("requests_sync_last_at") logger.warning( "Database flush executed: requests_cache=%s history=%s user_objects=%s user_caches=%s", cleared, history, user_objects, user_caches, ) return { "status": "ok", "requestsCleared": cleared, "historyCleared": history, "userObjectsCleared": user_objects, "userCachesCleared": user_caches, } @router.post("/maintenance/cleanup") async def cleanup_database(days: int = 90) -> Dict[str, Any]: result = cleanup_history(days) logger.info("Database cleanup executed: days=%s result=%s", days, result) return {"status": "ok", "days": days, "cleared": result} @router.post("/maintenance/logs/clear") async def clear_logs() -> Dict[str, Any]: runtime = get_runtime_settings() log_file = runtime.log_file if not log_file: raise HTTPException(status_code=400, detail="Log file not configured") if not os.path.isabs(log_file): log_file = os.path.join(os.getcwd(), log_file) try: os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, "w", encoding="utf-8"): pass except OSError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc logger.info("Log file cleared") return {"status": "ok"} @router.get("/users") async def list_users() -> Dict[str, Any]: users = get_all_users() return {"users": users} @router.get("/users/summary") async def list_users_summary() -> Dict[str, Any]: users = get_all_users() results: list[Dict[str, Any]] = [] for user in users: username = user.get("username") or "" username_norm = _normalize_username(username) if username else "" stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) results.append({**user, "stats": stats}) return {"users": results} @router.get("/users/{username}") async def get_user_summary(username: str) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") username_norm = _normalize_username(user.get("username") or "") stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) return {"user": user, "stats": stats, "lineage": _user_inviter_details(user)} @router.get("/users/id/{user_id}") async def get_user_summary_by_id(user_id: int) -> Dict[str, Any]: user = get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") username_norm = _normalize_username(user.get("username") or "") stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) return {"user": user, "stats": stats, "lineage": _user_inviter_details(user)} @router.post("/users/{username}/block") async def block_user(username: str) -> Dict[str, Any]: set_user_blocked(username, True) return {"status": "ok", "username": username, "blocked": True} @router.post("/users/{username}/unblock") async def unblock_user(username: str) -> Dict[str, Any]: set_user_blocked(username, False) return {"status": "ok", "username": username, "blocked": False} @router.post("/users/{username}/system-action") async def user_system_action(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") action = str(payload.get("action") or "").strip().lower() if action not in {"ban", "unban", "remove"}: raise HTTPException(status_code=400, detail="action must be ban, unban, or remove") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if user.get("role") == "admin": raise HTTPException(status_code=400, detail="Cross-system actions are not allowed for admin users") runtime = get_runtime_settings() jellyfin = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key) jellyseerr = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) result: Dict[str, Any] = { "status": "ok", "action": action, "username": user.get("username"), "local": {"status": "pending"}, "jellyfin": {"status": "skipped", "detail": "Jellyfin not configured"}, "jellyseerr": {"status": "skipped", "detail": "Jellyseerr not configured or no linked user ID"}, "invites": {"status": "pending", "disabled": 0}, } if action == "ban": set_user_blocked(username, True) result["local"] = {"status": "ok", "blocked": True} elif action == "unban": set_user_blocked(username, False) result["local"] = {"status": "ok", "blocked": False} else: result["local"] = {"status": "pending-delete"} if action in {"ban", "remove"}: result["invites"] = {"status": "ok", "disabled": disable_signup_invites_by_creator(username)} else: result["invites"] = {"status": "ok", "disabled": 0} if jellyfin.configured(): try: jellyfin_user = await jellyfin.find_user_by_name(username) if not jellyfin_user: result["jellyfin"] = {"status": "not_found"} else: jellyfin_user_id = jellyfin._extract_user_id(jellyfin_user) # type: ignore[attr-defined] if not jellyfin_user_id: raise RuntimeError("Could not determine Jellyfin user ID") if action == "ban": await jellyfin.set_user_disabled(jellyfin_user_id, True) result["jellyfin"] = {"status": "ok", "action": "disabled", "user_id": jellyfin_user_id} elif action == "unban": await jellyfin.set_user_disabled(jellyfin_user_id, False) result["jellyfin"] = {"status": "ok", "action": "enabled", "user_id": jellyfin_user_id} else: await jellyfin.delete_user(jellyfin_user_id) result["jellyfin"] = {"status": "ok", "action": "deleted", "user_id": jellyfin_user_id} except Exception as exc: result["jellyfin"] = {"status": "error", "detail": _http_error_detail(exc)} jellyseerr_user_id = user.get("jellyseerr_user_id") if jellyseerr.configured() and jellyseerr_user_id is not None: try: if action == "remove": await jellyseerr.delete_user(int(jellyseerr_user_id)) result["jellyseerr"] = {"status": "ok", "action": "deleted", "user_id": int(jellyseerr_user_id)} elif action == "ban": result["jellyseerr"] = {"status": "ok", "action": "delegated-to-jellyfin-disable", "user_id": int(jellyseerr_user_id)} else: result["jellyseerr"] = {"status": "ok", "action": "delegated-to-jellyfin-enable", "user_id": int(jellyseerr_user_id)} except Exception as exc: result["jellyseerr"] = {"status": "error", "detail": _http_error_detail(exc)} if action == "remove": deleted = delete_user_by_username(username) activity_deleted = delete_user_activity_by_username(username) result["local"] = { "status": "ok" if deleted else "not_found", "deleted": bool(deleted), "activity_deleted": activity_deleted, } if any( isinstance(system, dict) and system.get("status") == "error" for system in (result.get("jellyfin"), result.get("jellyseerr")) ): result["status"] = "partial" return result @router.post("/users/{username}/role") async def update_user_role(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: role = payload.get("role") if role not in {"admin", "user"}: raise HTTPException(status_code=400, detail="Invalid role") set_user_role(username, role) return {"status": "ok", "username": username, "role": role} @router.post("/users/{username}/auto-search") async def update_user_auto_search(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: enabled = payload.get("enabled") if isinstance(payload, dict) else None if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") set_user_auto_search_enabled(username, enabled) return {"status": "ok", "username": username, "auto_search_enabled": enabled} @router.post("/users/{username}/profile") async def update_user_profile_assignment(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") profile_id = payload.get("profile_id") if profile_id in (None, ""): set_user_profile_id(username, None) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} try: parsed_profile_id = int(profile_id) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="profile_id must be a number") from exc profile = get_user_profile(parsed_profile_id) if not profile: raise HTTPException(status_code=404, detail="Profile not found") if not profile.get("is_active", True): raise HTTPException(status_code=400, detail="Profile is disabled") refreshed = _apply_profile_defaults_to_user(username, profile) return {"status": "ok", "user": refreshed, "applied_profile_id": parsed_profile_id} @router.post("/users/{username}/expiry") async def update_user_expiry(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") clear = payload.get("clear") if clear is True: set_user_expires_at(username, None) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} if "days" in payload and payload.get("days") not in (None, ""): days = _parse_optional_positive_int(payload.get("days"), "days") expires_at = None if days is not None: expires_at = (datetime.now(timezone.utc) + timedelta(days=days)).isoformat() set_user_expires_at(username, expires_at) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} expires_at = _parse_optional_expires_at(payload.get("expires_at")) set_user_expires_at(username, expires_at) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} @router.post("/users/auto-search/bulk") async def update_users_auto_search_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: enabled = payload.get("enabled") if isinstance(payload, dict) else None if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") updated = set_auto_search_enabled_for_non_admin_users(enabled) return { "status": "ok", "enabled": enabled, "updated": updated, "scope": "non-admin-users", } @router.post("/users/profile/bulk") async def update_users_profile_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") scope = str(payload.get("scope") or "non-admin-users").strip().lower() if scope not in {"non-admin-users", "all-users"}: raise HTTPException(status_code=400, detail="Invalid scope") profile_id_value = payload.get("profile_id") if profile_id_value in (None, ""): users = get_all_users() updated = 0 for user in users: if scope == "non-admin-users" and user.get("role") == "admin": continue set_user_profile_id(user["username"], None) updated += 1 return {"status": "ok", "updated": updated, "scope": scope, "profile_id": None} try: profile_id = int(profile_id_value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="profile_id must be a number") from exc profile = get_user_profile(profile_id) if not profile: raise HTTPException(status_code=404, detail="Profile not found") if not profile.get("is_active", True): raise HTTPException(status_code=400, detail="Profile is disabled") users = get_all_users() updated = 0 for user in users: if scope == "non-admin-users" and user.get("role") == "admin": continue _apply_profile_defaults_to_user(user["username"], profile) updated += 1 return {"status": "ok", "updated": updated, "scope": scope, "profile_id": profile_id} @router.post("/users/expiry/bulk") async def update_users_expiry_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") scope = str(payload.get("scope") or "non-admin-users").strip().lower() if scope not in {"non-admin-users", "all-users"}: raise HTTPException(status_code=400, detail="Invalid scope") clear = payload.get("clear") expires_at: Optional[str] = None if clear is True: expires_at = None elif "days" in payload and payload.get("days") not in (None, ""): days = _parse_optional_positive_int(payload.get("days"), "days") expires_at = (datetime.now(timezone.utc) + timedelta(days=int(days or 0))).isoformat() if days else None else: expires_at = _parse_optional_expires_at(payload.get("expires_at")) users = get_all_users() updated = 0 for user in users: if scope == "non-admin-users" and user.get("role") == "admin": continue set_user_expires_at(user["username"], expires_at) updated += 1 return {"status": "ok", "updated": updated, "scope": scope, "expires_at": expires_at} @router.post("/users/{username}/password") async def update_user_password(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: new_password = payload.get("password") if isinstance(payload, dict) else None if not isinstance(new_password, str) or len(new_password.strip()) < 8: raise HTTPException(status_code=400, detail="Password must be at least 8 characters.") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if user.get("auth_provider") != "local": raise HTTPException( status_code=400, detail="Password changes are only available for local users." ) set_user_password(username, new_password.strip()) return {"status": "ok", "username": username} @router.get("/profiles") async def get_profiles() -> Dict[str, Any]: profiles = list_user_profiles() users = get_all_users() invites = list_signup_invites() user_counts: Dict[int, int] = {} invite_counts: Dict[int, int] = {} for user in users: profile_id = user.get("profile_id") if isinstance(profile_id, int): user_counts[profile_id] = user_counts.get(profile_id, 0) + 1 for invite in invites: profile_id = invite.get("profile_id") if isinstance(profile_id, int): invite_counts[profile_id] = invite_counts.get(profile_id, 0) + 1 enriched = [] for profile in profiles: pid = int(profile["id"]) enriched.append( { **profile, "assigned_users": user_counts.get(pid, 0), "assigned_invites": invite_counts.get(pid, 0), } ) return {"profiles": enriched} @router.post("/profiles") async def create_profile(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") name = _normalize_optional_text(payload.get("name")) if not name: raise HTTPException(status_code=400, detail="Profile name is required") role = _normalize_role_or_none(payload.get("role")) or "user" auto_search_enabled = payload.get("auto_search_enabled") if auto_search_enabled is None: auto_search_enabled = True if not isinstance(auto_search_enabled, bool): raise HTTPException(status_code=400, detail="auto_search_enabled must be true or false") is_active = payload.get("is_active") if is_active is None: is_active = True if not isinstance(is_active, bool): raise HTTPException(status_code=400, detail="is_active must be true or false") account_expires_days = _parse_optional_positive_int( payload.get("account_expires_days"), "account_expires_days" ) try: profile = create_user_profile( name=name, description=_normalize_optional_text(payload.get("description")), role=role, auto_search_enabled=auto_search_enabled, account_expires_days=account_expires_days, is_active=is_active, ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="A profile with that name already exists") from exc return {"status": "ok", "profile": profile} @router.put("/profiles/{profile_id}") async def edit_profile(profile_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") existing = get_user_profile(profile_id) if not existing: raise HTTPException(status_code=404, detail="Profile not found") name = _normalize_optional_text(payload.get("name")) if not name: raise HTTPException(status_code=400, detail="Profile name is required") role = _normalize_role_or_none(payload.get("role")) or "user" auto_search_enabled = payload.get("auto_search_enabled") if not isinstance(auto_search_enabled, bool): raise HTTPException(status_code=400, detail="auto_search_enabled must be true or false") is_active = payload.get("is_active") if not isinstance(is_active, bool): raise HTTPException(status_code=400, detail="is_active must be true or false") account_expires_days = _parse_optional_positive_int( payload.get("account_expires_days"), "account_expires_days" ) try: profile = update_user_profile( profile_id, name=name, description=_normalize_optional_text(payload.get("description")), role=role, auto_search_enabled=auto_search_enabled, account_expires_days=account_expires_days, is_active=is_active, ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="A profile with that name already exists") from exc if not profile: raise HTTPException(status_code=404, detail="Profile not found") return {"status": "ok", "profile": profile} @router.delete("/profiles/{profile_id}") async def remove_profile(profile_id: int) -> Dict[str, Any]: try: deleted = delete_user_profile(profile_id) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc if not deleted: raise HTTPException(status_code=404, detail="Profile not found") return {"status": "ok", "deleted": True, "profile_id": profile_id} @router.get("/invites") async def get_invites() -> Dict[str, Any]: invites = list_signup_invites() profiles = {profile["id"]: profile for profile in list_user_profiles()} results = [] for invite in invites: profile = profiles.get(invite.get("profile_id")) results.append( { **invite, "profile": ( { "id": profile.get("id"), "name": profile.get("name"), } if profile else None ), } ) return {"invites": results} @router.get("/invites/trace") async def get_invite_trace() -> Dict[str, Any]: return {"status": "ok", "trace": _build_invite_trace_payload()} @router.post("/invites") async def create_invite(payload: Dict[str, Any], current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") raw_code = _normalize_optional_text(payload.get("code")) code = _normalize_invite_code(raw_code) if raw_code else _generate_invite_code() profile_id = _parse_optional_profile_id(payload.get("profile_id")) enabled = payload.get("enabled") if enabled is None: enabled = True if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") role = _normalize_role_or_none(payload.get("role")) max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses") expires_at = _parse_optional_expires_at(payload.get("expires_at")) try: invite = create_signup_invite( code=code, label=_normalize_optional_text(payload.get("label")), description=_normalize_optional_text(payload.get("description")), profile_id=profile_id, role=role, max_uses=max_uses, enabled=enabled, expires_at=expires_at, created_by=current_user.get("username"), ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="An invite with that code already exists") from exc return {"status": "ok", "invite": invite} @router.put("/invites/{invite_id}") async def edit_invite(invite_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") existing = get_signup_invite_by_id(invite_id) if not existing: raise HTTPException(status_code=404, detail="Invite not found") code = _normalize_invite_code(_normalize_optional_text(payload.get("code")) or existing["code"]) profile_id = _parse_optional_profile_id(payload.get("profile_id")) enabled = payload.get("enabled") if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") role = _normalize_role_or_none(payload.get("role")) max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses") expires_at = _parse_optional_expires_at(payload.get("expires_at")) try: invite = update_signup_invite( invite_id, code=code, label=_normalize_optional_text(payload.get("label")), description=_normalize_optional_text(payload.get("description")), profile_id=profile_id, role=role, max_uses=max_uses, enabled=enabled, expires_at=expires_at, ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="An invite with that code already exists") from exc if not invite: raise HTTPException(status_code=404, detail="Invite not found") return {"status": "ok", "invite": invite} @router.delete("/invites/{invite_id}") async def remove_invite(invite_id: int) -> Dict[str, Any]: deleted = delete_signup_invite(invite_id) if not deleted: raise HTTPException(status_code=404, detail="Invite not found") return {"status": "ok", "deleted": True, "invite_id": invite_id}