from typing import Any, Dict, List, Optional from datetime import datetime, timedelta, timezone import ipaddress import os from urllib.parse import urlparse, urlunparse from fastapi import APIRouter, HTTPException, Depends, UploadFile, File from ..auth import require_admin, get_current_user from ..config import settings as env_settings from ..db import ( delete_setting, get_all_users, get_cached_requests, get_cached_requests_count, get_request_cache_overview, get_request_cache_missing_titles, get_request_cache_stats, get_settings_overrides, get_user_by_id, get_user_by_username, get_user_request_stats, create_user_if_missing, set_user_jellyseerr_id, set_setting, set_user_blocked, set_user_password, set_user_role, run_integrity_check, vacuum_db, clear_requests_cache, clear_history, cleanup_history, update_request_cache_title, repair_request_cache_titles, delete_non_admin_users, ) from ..runtime import get_runtime_settings from ..clients.sonarr import SonarrClient from ..clients.radarr import RadarrClient from ..clients.jellyfin import JellyfinClient from ..clients.jellyseerr import JellyseerrClient from ..services.jellyfin_sync import sync_jellyfin_users from ..services.user_cache import ( build_jellyseerr_candidate_map, get_cached_jellyfin_users, get_cached_jellyseerr_users, match_jellyseerr_user_id, save_jellyfin_users_cache, save_jellyseerr_users_cache, ) import logging from ..logging_config import configure_logging from ..routers import requests as requests_router from ..routers.branding import save_branding_image router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)]) logger = logging.getLogger(__name__) SENSITIVE_KEYS = { "jellyseerr_api_key", "jellyfin_api_key", "sonarr_api_key", "radarr_api_key", "prowlarr_api_key", "qbittorrent_password", } URL_SETTING_KEYS = { "jellyseerr_base_url", "jellyfin_base_url", "jellyfin_public_url", "sonarr_base_url", "radarr_base_url", "prowlarr_base_url", "qbittorrent_base_url", } SETTING_KEYS: List[str] = [ "jellyseerr_base_url", "jellyseerr_api_key", "jellyfin_base_url", "jellyfin_api_key", "jellyfin_public_url", "jellyfin_sync_to_arr", "artwork_cache_mode", "sonarr_base_url", "sonarr_api_key", "sonarr_quality_profile_id", "sonarr_root_folder", "sonarr_qbittorrent_category", "radarr_base_url", "radarr_api_key", "radarr_quality_profile_id", "radarr_root_folder", "radarr_qbittorrent_category", "prowlarr_base_url", "prowlarr_api_key", "qbittorrent_base_url", "qbittorrent_username", "qbittorrent_password", "log_level", "log_file", "requests_sync_ttl_minutes", "requests_poll_interval_seconds", "requests_delta_sync_interval_minutes", "requests_full_sync_time", "requests_cleanup_time", "requests_cleanup_days", "requests_data_source", "site_banner_enabled", "site_banner_message", "site_banner_tone", ] def _normalize_username(value: str) -> str: normalized = value.strip().lower() if "@" in normalized: normalized = normalized.split("@", 1)[0] return normalized def _is_ip_host(host: str) -> bool: try: ipaddress.ip_address(host) return True except ValueError: return False def _normalize_service_url(value: str) -> str: raw = value.strip() if not raw: raise ValueError("URL cannot be empty.") candidate = raw if "://" not in candidate: authority = candidate.split("/", 1)[0].strip() if authority.startswith("["): closing = authority.find("]") host = authority[1:closing] if closing > 0 else authority.strip("[]") else: host = authority.split(":", 1)[0] host = host.strip().lower() default_scheme = "http" if host in {"localhost"} or _is_ip_host(host) or "." not in host else "https" candidate = f"{default_scheme}://{candidate}" parsed = urlparse(candidate) if parsed.scheme not in {"http", "https"}: raise ValueError("URL must use http:// or https://.") if not parsed.netloc: raise ValueError("URL must include a host.") if parsed.query or parsed.fragment: raise ValueError("URL must not include query params or fragments.") if not parsed.hostname: raise ValueError("URL must include a valid host.") normalized_path = parsed.path.rstrip("/") normalized = parsed._replace(path=normalized_path, params="", query="", fragment="") result = urlunparse(normalized).rstrip("/") if not result: raise ValueError("URL is invalid.") return result def _normalize_root_folders(folders: Any) -> List[Dict[str, Any]]: if not isinstance(folders, list): return [] results = [] for folder in folders: if not isinstance(folder, dict): continue folder_id = folder.get("id") path = folder.get("path") if folder_id is None or path is None: continue results.append({"id": folder_id, "path": path, "label": path}) return results async def _hydrate_cache_titles_from_jellyseerr(limit: int) -> int: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): return 0 missing = get_request_cache_missing_titles(limit) if not missing: return 0 hydrated = 0 for row in missing: tmdb_id = row.get("tmdb_id") media_type = row.get("media_type") request_id = row.get("request_id") if not tmdb_id or not media_type or not request_id: continue try: title, year = await requests_router._hydrate_title_from_tmdb( client, media_type, tmdb_id ) except Exception: logger.warning( "Requests cache title hydrate failed: request_id=%s tmdb_id=%s", request_id, tmdb_id, ) continue if title: update_request_cache_title(request_id, title, year) hydrated += 1 return hydrated def _normalize_quality_profiles(profiles: Any) -> List[Dict[str, Any]]: if not isinstance(profiles, list): return [] results = [] for profile in profiles: if not isinstance(profile, dict): continue profile_id = profile.get("id") name = profile.get("name") if profile_id is None or name is None: continue results.append({"id": profile_id, "name": name, "label": name}) return results @router.get("/settings") async def list_settings() -> Dict[str, Any]: overrides = get_settings_overrides() results = [] for key in SETTING_KEYS: override_present = key in overrides value = overrides.get(key) if override_present else getattr(env_settings, key) is_set = value is not None and str(value).strip() != "" sensitive = key in SENSITIVE_KEYS results.append( { "key": key, "value": None if sensitive else value, "isSet": is_set, "source": "db" if override_present else ("env" if is_set else "unset"), "sensitive": sensitive, } ) return {"settings": results} @router.put("/settings") async def update_settings(payload: Dict[str, Any]) -> Dict[str, Any]: updates = 0 touched_logging = False for key, value in payload.items(): if key not in SETTING_KEYS: raise HTTPException(status_code=400, detail=f"Unknown setting: {key}") if value is None: continue if isinstance(value, str) and value.strip() == "": delete_setting(key) updates += 1 continue value_to_store = str(value).strip() if isinstance(value, str) else str(value) if key in URL_SETTING_KEYS and value_to_store: try: value_to_store = _normalize_service_url(value_to_store) except ValueError as exc: friendly_key = key.replace("_", " ") raise HTTPException(status_code=400, detail=f"{friendly_key}: {exc}") from exc set_setting(key, value_to_store) updates += 1 if key in {"log_level", "log_file"}: touched_logging = True if touched_logging: runtime = get_runtime_settings() configure_logging(runtime.log_level, runtime.log_file) return {"status": "ok", "updated": updates} @router.get("/sonarr/options") async def sonarr_options() -> Dict[str, Any]: runtime = get_runtime_settings() client = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Sonarr not configured") root_folders = await client.get_root_folders() profiles = await client.get_quality_profiles() return { "rootFolders": _normalize_root_folders(root_folders), "qualityProfiles": _normalize_quality_profiles(profiles), } @router.get("/radarr/options") async def radarr_options() -> Dict[str, Any]: runtime = get_runtime_settings() client = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Radarr not configured") root_folders = await client.get_root_folders() profiles = await client.get_quality_profiles() return { "rootFolders": _normalize_root_folders(root_folders), "qualityProfiles": _normalize_quality_profiles(profiles), } @router.get("/jellyfin/users") async def jellyfin_users() -> Dict[str, Any]: cached = get_cached_jellyfin_users() if cached is not None: return {"users": cached} runtime = get_runtime_settings() client = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyfin not configured") users = await client.get_users() if not isinstance(users, list): return {"users": []} results = save_jellyfin_users_cache(users) return {"users": results} @router.post("/jellyfin/users/sync") async def jellyfin_users_sync() -> Dict[str, Any]: imported = await sync_jellyfin_users() return {"status": "ok", "imported": imported} async def _fetch_all_jellyseerr_users( client: JellyseerrClient, use_cache: bool = True ) -> List[Dict[str, Any]]: if use_cache: cached = get_cached_jellyseerr_users() if cached is not None: return cached users: List[Dict[str, Any]] = [] take = 100 skip = 0 while True: payload = await client.get_users(take=take, skip=skip) if not payload: break if isinstance(payload, list): batch = payload elif isinstance(payload, dict): batch = payload.get("results") or payload.get("users") or payload.get("data") or payload.get("items") else: batch = None if not isinstance(batch, list) or not batch: break users.extend([user for user in batch if isinstance(user, dict)]) if len(batch) < take: break skip += take if users: return save_jellyseerr_users_cache(users) return users @router.post("/jellyseerr/users/sync") async def jellyseerr_users_sync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") jellyseerr_users = await _fetch_all_jellyseerr_users(client, use_cache=False) if not jellyseerr_users: return {"status": "ok", "matched": 0, "skipped": 0, "total": 0} candidate_to_id = build_jellyseerr_candidate_map(jellyseerr_users) updated = 0 skipped = 0 users = get_all_users() for user in users: if user.get("jellyseerr_user_id") is not None: skipped += 1 continue username = user.get("username") or "" matched_id = match_jellyseerr_user_id(username, candidate_to_id) if matched_id is not None: set_user_jellyseerr_id(username, matched_id) updated += 1 else: skipped += 1 return {"status": "ok", "matched": updated, "skipped": skipped, "total": len(users)} def _pick_jellyseerr_username(user: Dict[str, Any]) -> Optional[str]: for key in ("email", "username", "displayName", "name"): value = user.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None @router.post("/jellyseerr/users/resync") async def jellyseerr_users_resync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") jellyseerr_users = await _fetch_all_jellyseerr_users(client, use_cache=False) if not jellyseerr_users: return {"status": "ok", "imported": 0, "cleared": 0} cleared = delete_non_admin_users() imported = 0 for user in jellyseerr_users: user_id = user.get("id") or user.get("userId") or user.get("Id") try: user_id = int(user_id) except (TypeError, ValueError): continue username = _pick_jellyseerr_username(user) if not username: continue created = create_user_if_missing( username, "jellyseerr-user", role="user", auth_provider="jellyseerr", jellyseerr_user_id=user_id, ) if created: imported += 1 else: set_user_jellyseerr_id(username, user_id) return {"status": "ok", "imported": imported, "cleared": cleared} @router.post("/requests/sync") async def requests_sync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") state = await requests_router.start_requests_sync( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key ) logger.info("Admin triggered requests sync: status=%s", state.get("status")) return {"status": "ok", "sync": state} @router.post("/requests/sync/delta") async def requests_sync_delta() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyseerr not configured") state = await requests_router.start_requests_delta_sync( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key ) logger.info("Admin triggered delta requests sync: status=%s", state.get("status")) return {"status": "ok", "sync": state} @router.post("/requests/artwork/prefetch") async def requests_artwork_prefetch(only_missing: bool = False) -> Dict[str, Any]: runtime = get_runtime_settings() state = await requests_router.start_artwork_prefetch( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key, only_missing=only_missing, ) logger.info("Admin triggered artwork prefetch: status=%s", state.get("status")) return {"status": "ok", "prefetch": state} @router.get("/requests/artwork/status") async def requests_artwork_status() -> Dict[str, Any]: return {"status": "ok", "prefetch": requests_router.get_artwork_prefetch_state()} @router.get("/requests/artwork/summary") async def requests_artwork_summary() -> Dict[str, Any]: runtime = get_runtime_settings() cache_mode = (runtime.artwork_cache_mode or "remote").lower() stats = get_request_cache_stats() if cache_mode != "cache": stats["cache_bytes"] = 0 stats["cache_files"] = 0 stats["missing_artwork"] = 0 summary = { "cache_mode": cache_mode, "cache_bytes": stats.get("cache_bytes", 0), "cache_files": stats.get("cache_files", 0), "missing_artwork": stats.get("missing_artwork", 0), "total_requests": stats.get("total_requests", 0), "updated_at": stats.get("updated_at"), } return {"status": "ok", "summary": summary} @router.get("/requests/sync/status") async def requests_sync_status() -> Dict[str, Any]: return {"status": "ok", "sync": requests_router.get_requests_sync_state()} @router.get("/logs") async def read_logs(lines: int = 200) -> Dict[str, Any]: runtime = get_runtime_settings() log_file = runtime.log_file if not log_file: raise HTTPException(status_code=400, detail="Log file not configured") if not os.path.isabs(log_file): log_file = os.path.join(os.getcwd(), log_file) if not os.path.exists(log_file): raise HTTPException(status_code=404, detail="Log file not found") lines = max(1, min(lines, 1000)) from collections import deque with open(log_file, "r", encoding="utf-8", errors="replace") as handle: tail = deque(handle, maxlen=lines) return {"lines": list(tail)} @router.get("/requests/cache") async def requests_cache(limit: int = 50) -> Dict[str, Any]: repaired = repair_request_cache_titles() if repaired: logger.info("Requests cache titles repaired via settings view: %s", repaired) hydrated = await _hydrate_cache_titles_from_jellyseerr(limit) if hydrated: logger.info("Requests cache titles hydrated via Jellyseerr: %s", hydrated) rows = get_request_cache_overview(limit) return {"rows": rows} @router.get("/requests/all") async def requests_all( take: int = 50, skip: int = 0, days: Optional[int] = None, user: Dict[str, str] = Depends(get_current_user), ) -> Dict[str, Any]: if user.get("role") != "admin": raise HTTPException(status_code=403, detail="Forbidden") take = max(1, min(int(take or 50), 200)) skip = max(0, int(skip or 0)) since_iso = None if days is not None and int(days) > 0: since_iso = (datetime.now(timezone.utc) - timedelta(days=int(days))).isoformat() rows = get_cached_requests(limit=take, offset=skip, since_iso=since_iso) total = get_cached_requests_count(since_iso=since_iso) results = [] for row in rows: status = row.get("status") results.append( { "id": row.get("request_id"), "title": row.get("title"), "year": row.get("year"), "type": row.get("media_type"), "status": status, "statusLabel": requests_router._status_label(status), "requestedBy": row.get("requested_by"), "createdAt": row.get("created_at"), } ) return {"results": results, "total": total, "take": take, "skip": skip} @router.post("/branding/logo") async def upload_branding_logo(file: UploadFile = File(...)) -> Dict[str, Any]: return await save_branding_image(file) @router.post("/maintenance/repair") async def repair_database() -> Dict[str, Any]: result = run_integrity_check() vacuum_db() logger.info("Database repair executed: integrity_check=%s", result) return {"status": "ok", "integrity": result} @router.post("/maintenance/flush") async def flush_database() -> Dict[str, Any]: cleared = clear_requests_cache() history = clear_history() delete_setting("requests_sync_last_at") logger.warning("Database flush executed: requests_cache=%s history=%s", cleared, history) return {"status": "ok", "requestsCleared": cleared, "historyCleared": history} @router.post("/maintenance/cleanup") async def cleanup_database(days: int = 90) -> Dict[str, Any]: result = cleanup_history(days) logger.info("Database cleanup executed: days=%s result=%s", days, result) return {"status": "ok", "days": days, "cleared": result} @router.post("/maintenance/logs/clear") async def clear_logs() -> Dict[str, Any]: runtime = get_runtime_settings() log_file = runtime.log_file if not log_file: raise HTTPException(status_code=400, detail="Log file not configured") if not os.path.isabs(log_file): log_file = os.path.join(os.getcwd(), log_file) try: os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, "w", encoding="utf-8"): pass except OSError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc logger.info("Log file cleared") return {"status": "ok"} @router.get("/users") async def list_users() -> Dict[str, Any]: users = [user for user in get_all_users() if user.get("role") == "admin" or user.get("auth_provider") == "jellyseerr"] return {"users": users} @router.get("/users/summary") async def list_users_summary() -> Dict[str, Any]: users = [user for user in get_all_users() if user.get("role") == "admin" or user.get("auth_provider") == "jellyseerr"] results: list[Dict[str, Any]] = [] for user in users: username = user.get("username") or "" username_norm = _normalize_username(username) if username else "" stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) results.append({**user, "stats": stats}) return {"users": results} @router.get("/users/{username}") async def get_user_summary(username: str) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") username_norm = _normalize_username(user.get("username") or "") stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) return {"user": user, "stats": stats} @router.get("/users/id/{user_id}") async def get_user_summary_by_id(user_id: int) -> Dict[str, Any]: user = get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") username_norm = _normalize_username(user.get("username") or "") stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) return {"user": user, "stats": stats} @router.post("/users/{username}/block") async def block_user(username: str) -> Dict[str, Any]: set_user_blocked(username, True) return {"status": "ok", "username": username, "blocked": True} @router.post("/users/{username}/unblock") async def unblock_user(username: str) -> Dict[str, Any]: set_user_blocked(username, False) return {"status": "ok", "username": username, "blocked": False} @router.post("/users/{username}/role") async def update_user_role(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: role = payload.get("role") if role not in {"admin", "user"}: raise HTTPException(status_code=400, detail="Invalid role") set_user_role(username, role) return {"status": "ok", "username": username, "role": role} @router.post("/users/{username}/password") async def update_user_password(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: new_password = payload.get("password") if isinstance(payload, dict) else None if not isinstance(new_password, str) or len(new_password.strip()) < 8: raise HTTPException(status_code=400, detail="Password must be at least 8 characters.") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if user.get("auth_provider") != "local": raise HTTPException( status_code=400, detail="Password changes are only available for local users." ) set_user_password(username, new_password.strip()) return {"status": "ok", "username": username}