"""Admin-only API routes.

Every route is mounted under ``/admin`` and guarded by the ``require_admin``
dependency declared on the router. The routes cover settings overrides,
Sonarr/Radarr/Jellyfin/Jellyseerr helpers, the requests cache, log access,
database maintenance and user management.
"""

import logging
import os
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, HTTPException, Depends, UploadFile, File

# NOTE: project-local imports are kept in their original relative order so
# that import-time behaviour of the package is not disturbed.
from ..auth import require_admin, get_current_user
from ..config import settings as env_settings
from ..db import (
    delete_setting,
    get_all_users,
    get_cached_requests,
    get_cached_requests_count,
    get_request_cache_overview,
    get_request_cache_missing_titles,
    get_request_cache_stats,
    get_settings_overrides,
    get_user_by_id,
    get_user_by_username,
    get_user_request_stats,
    create_user_if_missing,
    set_user_jellyseerr_id,
    set_setting,
    set_user_blocked,
    set_user_password,
    set_user_role,
    run_integrity_check,
    vacuum_db,
    clear_requests_cache,
    clear_history,
    cleanup_history,
    update_request_cache_title,
    repair_request_cache_titles,
    delete_non_admin_users,
)
from ..runtime import get_runtime_settings
from ..clients.sonarr import SonarrClient
from ..clients.radarr import RadarrClient
from ..clients.jellyfin import JellyfinClient
from ..clients.jellyseerr import JellyseerrClient
from ..services.jellyfin_sync import sync_jellyfin_users
from ..logging_config import configure_logging
from ..routers import requests as requests_router
from ..routers.branding import save_branding_image

router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)])
logger = logging.getLogger(__name__)

# Settings whose values are never echoed back by ``GET /admin/settings``.
SENSITIVE_KEYS = {
    "jellyseerr_api_key",
    "jellyfin_api_key",
    "sonarr_api_key",
    "radarr_api_key",
    "prowlarr_api_key",
    "qbittorrent_password",
}

# Every setting key that may be listed or updated through the admin API.
SETTING_KEYS: List[str] = [
    "jellyseerr_base_url",
    "jellyseerr_api_key",
    "jellyfin_base_url",
    "jellyfin_api_key",
    "jellyfin_public_url",
    "jellyfin_sync_to_arr",
    "artwork_cache_mode",
    "sonarr_base_url",
    "sonarr_api_key",
    "sonarr_quality_profile_id",
    "sonarr_root_folder",
    "sonarr_qbittorrent_category",
    "radarr_base_url",
    "radarr_api_key",
    "radarr_quality_profile_id",
    "radarr_root_folder",
    "radarr_qbittorrent_category",
    "prowlarr_base_url",
    "prowlarr_api_key",
    "qbittorrent_base_url",
    "qbittorrent_username",
    "qbittorrent_password",
    "log_level",
    "log_file",
    "requests_sync_ttl_minutes",
    "requests_poll_interval_seconds",
    "requests_delta_sync_interval_minutes",
    "requests_full_sync_time",
    "requests_cleanup_time",
    "requests_cleanup_days",
    "requests_data_source",
    "site_banner_enabled",
    "site_banner_message",
    "site_banner_tone",
    "site_changelog",
]

# Settings whose change (set or cleared) requires logging to be reconfigured.
_LOGGING_KEYS = frozenset({"log_level", "log_file"})


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _normalize_username(value: str) -> str:
    """Return *value* stripped and lower-cased, without any ``@domain`` suffix."""
    normalized = value.strip().lower()
    if "@" in normalized:
        normalized = normalized.split("@", 1)[0]
    return normalized


def _normalize_option_list(items: Any, label_key: str) -> List[Dict[str, Any]]:
    """Reduce a list of dicts to ``{"id", label_key, "label"}`` entries.

    Non-list input yields an empty list. Entries that are not dicts, or that
    lack an ``id`` or a ``label_key`` value, are skipped.
    """
    if not isinstance(items, list):
        return []
    results: List[Dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            continue
        item_id = item.get("id")
        label = item.get(label_key)
        if item_id is None or label is None:
            continue
        results.append({"id": item_id, label_key: label, "label": label})
    return results


def _normalize_root_folders(folders: Any) -> List[Dict[str, Any]]:
    """Return ``{"id", "path", "label"}`` dicts for each usable root folder."""
    return _normalize_option_list(folders, "path")


def _normalize_quality_profiles(profiles: Any) -> List[Dict[str, Any]]:
    """Return ``{"id", "name", "label"}`` dicts for each usable quality profile."""
    return _normalize_option_list(profiles, "name")


async def _hydrate_cache_titles_from_jellyseerr(limit: int) -> int:
    """Fill in missing titles for up to *limit* cached requests.

    Returns the number of rows whose title was updated. Returns 0 when the
    Jellyseerr client is not configured or no rows are missing a title.
    A failed lookup for one row is logged and skipped (best effort).
    """
    runtime = get_runtime_settings()
    client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key)
    if not client.configured():
        return 0
    missing = get_request_cache_missing_titles(limit)
    if not missing:
        return 0

    hydrated = 0
    for row in missing:
        tmdb_id = row.get("tmdb_id")
        media_type = row.get("media_type")
        request_id = row.get("request_id")
        if not tmdb_id or not media_type or not request_id:
            continue
        try:
            title, year = await requests_router._hydrate_title_from_tmdb(
                client, media_type, tmdb_id
            )
        except Exception:
            # Deliberately broad: one bad row must not abort the whole pass.
            # exc_info keeps the underlying cause in the log.
            logger.warning(
                "Requests cache title hydrate failed: request_id=%s tmdb_id=%s",
                request_id,
                tmdb_id,
                exc_info=True,
            )
            continue
        if title:
            update_request_cache_title(request_id, title, year)
            hydrated += 1
    return hydrated


def _require_jellyseerr_client(runtime: Any) -> JellyseerrClient:
    """Build a Jellyseerr client from *runtime*; raise HTTP 400 if unconfigured."""
    client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key)
    if not client.configured():
        raise HTTPException(status_code=400, detail="Jellyseerr not configured")
    return client


async def _fetch_arr_options(client: Any, service_name: str) -> Dict[str, Any]:
    """Return normalized root folders and quality profiles from *client*.

    *client* is a Sonarr or Radarr client; *service_name* is used only in the
    HTTP 400 detail raised when the client is not configured.
    """
    if not client.configured():
        raise HTTPException(status_code=400, detail=f"{service_name} not configured")
    root_folders = await client.get_root_folders()
    profiles = await client.get_quality_profiles()
    return {
        "rootFolders": _normalize_root_folders(root_folders),
        "qualityProfiles": _normalize_quality_profiles(profiles),
    }


def _resolve_log_file() -> str:
    """Return the absolute path of the configured log file.

    Relative paths are resolved against the current working directory.
    Raises HTTP 400 when no log file is configured.
    """
    log_file = get_runtime_settings().log_file
    if not log_file:
        raise HTTPException(status_code=400, detail="Log file not configured")
    if not os.path.isabs(log_file):
        log_file = os.path.join(os.getcwd(), log_file)
    return log_file


def _normalized_handles(value: Any) -> List[str]:
    """Return lower-cased lookup handles for *value*.

    The result is the stripped, lower-cased string, followed by its part
    before ``@`` when it contains one. Non-string or blank input yields ``[]``.
    """
    if not isinstance(value, str):
        return []
    normalized = value.strip().lower()
    if not normalized:
        return []
    handles = [normalized]
    if "@" in normalized:
        handles.append(normalized.split("@", 1)[0])
    return handles


def _extract_user_candidates(user: Dict[str, Any]) -> List[str]:
    """Return the de-duplicated handles from a user's identifying fields."""
    candidates: List[str] = []
    for key in ("username", "email", "displayName", "name"):
        candidates.extend(_normalized_handles(user.get(key)))
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(candidates))


def _extract_jellyseerr_user_id(user: Dict[str, Any]) -> Optional[int]:
    """Return the user's id as an int, or ``None`` when absent or non-numeric."""
    raw_id = user.get("id") or user.get("userId") or user.get("Id")
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        return None


async def _fetch_all_jellyseerr_users(client: JellyseerrClient) -> List[Dict[str, Any]]:
    """Page through ``client.get_users`` and return every dict entry.

    Accepts either a bare list payload or a dict that wraps the list under
    ``results``, ``users``, ``data`` or ``items``. Paging stops on an empty or
    unrecognised payload, or on a batch shorter than the page size.
    """
    users: List[Dict[str, Any]] = []
    take = 100
    skip = 0
    while True:
        payload = await client.get_users(take=take, skip=skip)
        if not payload:
            break
        if isinstance(payload, list):
            batch = payload
        elif isinstance(payload, dict):
            batch = (
                payload.get("results")
                or payload.get("users")
                or payload.get("data")
                or payload.get("items")
            )
        else:
            batch = None
        if not isinstance(batch, list) or not batch:
            break
        users.extend([user for user in batch if isinstance(user, dict)])
        if len(batch) < take:
            break
        skip += take
    return users


def _pick_jellyseerr_username(user: Dict[str, Any]) -> Optional[str]:
    """Return the first non-blank of email, username, displayName, name."""
    for key in ("email", "username", "displayName", "name"):
        value = user.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


def _admin_visible_users() -> List[Dict[str, Any]]:
    """Return users that are admins or whose auth provider is Jellyseerr."""
    return [
        user
        for user in get_all_users()
        if user.get("role") == "admin" or user.get("auth_provider") == "jellyseerr"
    ]


def _request_stats_for(user: Dict[str, Any]) -> Any:
    """Return request stats for *user*, keyed by normalized username and Jellyseerr id."""
    username_norm = _normalize_username(user.get("username") or "")
    return get_user_request_stats(username_norm, user.get("jellyseerr_user_id"))


# ---------------------------------------------------------------------------
# Settings
# ---------------------------------------------------------------------------


@router.get("/settings")
async def list_settings() -> Dict[str, Any]:
    """List every known setting with its value, origin and sensitivity.

    A database override wins over the environment value. Values of keys in
    ``SENSITIVE_KEYS`` are returned as ``None``; ``isSet`` still reports
    whether a value exists.
    """
    overrides = get_settings_overrides()
    results = []
    for key in SETTING_KEYS:
        override_present = key in overrides
        # Default to None so a key with no env attribute reports as "unset"
        # instead of failing the whole listing with an AttributeError.
        value = overrides.get(key) if override_present else getattr(env_settings, key, None)
        is_set = value is not None and str(value).strip() != ""
        sensitive = key in SENSITIVE_KEYS
        results.append(
            {
                "key": key,
                "value": None if sensitive else value,
                "isSet": is_set,
                "source": "db" if override_present else ("env" if is_set else "unset"),
                "sensitive": sensitive,
            }
        )
    return {"settings": results}


@router.put("/settings")
async def update_settings(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Apply setting overrides from *payload*.

    For each key: ``None`` is ignored, a blank string deletes the override,
    and any other value is stored as ``str(value)``.

    Raises:
        HTTPException: 400 if *payload* contains a key not in ``SETTING_KEYS``.
            All keys are validated before anything is written, so a bad key
            never leaves a partially applied update.
    """
    unknown = next((key for key in payload if key not in SETTING_KEYS), None)
    if unknown is not None:
        raise HTTPException(status_code=400, detail=f"Unknown setting: {unknown}")

    updates = 0
    touched_logging = False
    for key, value in payload.items():
        if value is None:
            continue
        if isinstance(value, str) and value.strip() == "":
            delete_setting(key)
        else:
            set_setting(key, str(value))
        updates += 1
        # Clearing a logging override changes the effective value as well,
        # so it must trigger reconfiguration just like setting one.
        if key in _LOGGING_KEYS:
            touched_logging = True

    if touched_logging:
        runtime = get_runtime_settings()
        configure_logging(runtime.log_level, runtime.log_file)
    return {"status": "ok", "updated": updates}


# ---------------------------------------------------------------------------
# Sonarr / Radarr / Jellyfin / Jellyseerr
# ---------------------------------------------------------------------------


@router.get("/sonarr/options")
async def sonarr_options() -> Dict[str, Any]:
    """Return Sonarr root folders and quality profiles; 400 if not configured."""
    runtime = get_runtime_settings()
    client = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key)
    return await _fetch_arr_options(client, "Sonarr")


@router.get("/radarr/options")
async def radarr_options() -> Dict[str, Any]:
    """Return Radarr root folders and quality profiles; 400 if not configured."""
    runtime = get_runtime_settings()
    client = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key)
    return await _fetch_arr_options(client, "Radarr")


@router.get("/jellyfin/users")
async def jellyfin_users() -> Dict[str, Any]:
    """Return id, name, password flag and last login for each Jellyfin user.

    Raises:
        HTTPException: 400 if Jellyfin is not configured.
    """
    runtime = get_runtime_settings()
    client = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key)
    if not client.configured():
        raise HTTPException(status_code=400, detail="Jellyfin not configured")
    users = await client.get_users()
    if not isinstance(users, list):
        return {"users": []}
    results = [
        {
            "id": user.get("Id"),
            "name": user.get("Name"),
            "hasPassword": user.get("HasPassword"),
            "lastLoginDate": user.get("LastLoginDate"),
        }
        for user in users
        if isinstance(user, dict)
    ]
    return {"users": results}


@router.post("/jellyfin/users/sync")
async def jellyfin_users_sync() -> Dict[str, Any]:
    """Run ``sync_jellyfin_users`` and return its result as ``imported``."""
    imported = await sync_jellyfin_users()
    return {"status": "ok", "imported": imported}


@router.post("/jellyseerr/users/sync")
async def jellyseerr_users_sync() -> Dict[str, Any]:
    """Link local users without a Jellyseerr id to a matching Jellyseerr user.

    Matching is done on normalized handles of the local username against the
    Jellyseerr user's username, email, display name and name. When several
    Jellyseerr users share a handle, the first one fetched wins.

    Raises:
        HTTPException: 400 if Jellyseerr is not configured.
    """
    runtime = get_runtime_settings()
    client = _require_jellyseerr_client(runtime)
    jellyseerr_users = await _fetch_all_jellyseerr_users(client)
    if not jellyseerr_users:
        return {"status": "ok", "matched": 0, "skipped": 0, "total": 0}

    candidate_to_id: Dict[str, int] = {}
    for remote_user in jellyseerr_users:
        remote_id = _extract_jellyseerr_user_id(remote_user)
        if remote_id is None:
            continue
        for candidate in _extract_user_candidates(remote_user):
            candidate_to_id.setdefault(candidate, remote_id)

    updated = 0
    skipped = 0
    users = get_all_users()
    for user in users:
        if user.get("jellyseerr_user_id") is not None:
            # Already linked: leave the existing mapping untouched.
            skipped += 1
            continue
        username = user.get("username") or ""
        matched_id = None
        for handle in _normalized_handles(username):
            matched_id = candidate_to_id.get(handle)
            if matched_id is not None:
                break
        if matched_id is not None:
            set_user_jellyseerr_id(username, matched_id)
            updated += 1
        else:
            skipped += 1
    return {"status": "ok", "matched": updated, "skipped": skipped, "total": len(users)}


@router.post("/jellyseerr/users/resync")
async def jellyseerr_users_resync() -> Dict[str, Any]:
    """Call ``delete_non_admin_users`` and re-import users from Jellyseerr.

    Nothing is deleted when Jellyseerr returns no users. A user that already
    exists after the purge only has its Jellyseerr id set.

    Raises:
        HTTPException: 400 if Jellyseerr is not configured.
    """
    runtime = get_runtime_settings()
    client = _require_jellyseerr_client(runtime)
    jellyseerr_users = await _fetch_all_jellyseerr_users(client)
    if not jellyseerr_users:
        return {"status": "ok", "imported": 0, "cleared": 0}

    cleared = delete_non_admin_users()
    imported = 0
    for remote_user in jellyseerr_users:
        remote_id = _extract_jellyseerr_user_id(remote_user)
        if remote_id is None:
            continue
        username = _pick_jellyseerr_username(remote_user)
        if not username:
            continue
        # NOTE(review): the second positional argument looks like a
        # placeholder password for externally authenticated users - confirm
        # against create_user_if_missing.
        created = create_user_if_missing(
            username,
            "jellyseerr-user",
            role="user",
            auth_provider="jellyseerr",
            jellyseerr_user_id=remote_id,
        )
        if created:
            imported += 1
        else:
            set_user_jellyseerr_id(username, remote_id)
    return {"status": "ok", "imported": imported, "cleared": cleared}


# ---------------------------------------------------------------------------
# Requests sync, artwork and cache
# ---------------------------------------------------------------------------


@router.post("/requests/sync")
async def requests_sync() -> Dict[str, Any]:
    """Start a requests sync and return the state reported by the requests router.

    Raises:
        HTTPException: 400 if Jellyseerr is not configured.
    """
    runtime = get_runtime_settings()
    _require_jellyseerr_client(runtime)
    state = await requests_router.start_requests_sync(
        runtime.jellyseerr_base_url, runtime.jellyseerr_api_key
    )
    logger.info("Admin triggered requests sync: status=%s", state.get("status"))
    return {"status": "ok", "sync": state}


@router.post("/requests/sync/delta")
async def requests_sync_delta() -> Dict[str, Any]:
    """Start a delta requests sync and return the state reported for it.

    Raises:
        HTTPException: 400 if Jellyseerr is not configured.
    """
    runtime = get_runtime_settings()
    _require_jellyseerr_client(runtime)
    state = await requests_router.start_requests_delta_sync(
        runtime.jellyseerr_base_url, runtime.jellyseerr_api_key
    )
    logger.info("Admin triggered delta requests sync: status=%s", state.get("status"))
    return {"status": "ok", "sync": state}


@router.post("/requests/artwork/prefetch")
async def requests_artwork_prefetch(only_missing: bool = False) -> Dict[str, Any]:
    """Start an artwork prefetch and return the state reported for it."""
    runtime = get_runtime_settings()
    state = await requests_router.start_artwork_prefetch(
        runtime.jellyseerr_base_url,
        runtime.jellyseerr_api_key,
        only_missing=only_missing,
    )
    logger.info("Admin triggered artwork prefetch: status=%s", state.get("status"))
    return {"status": "ok", "prefetch": state}


@router.get("/requests/artwork/status")
async def requests_artwork_status() -> Dict[str, Any]:
    """Return the current artwork prefetch state."""
    return {"status": "ok", "prefetch": requests_router.get_artwork_prefetch_state()}


@router.get("/requests/artwork/summary")
async def requests_artwork_summary() -> Dict[str, Any]:
    """Summarise artwork cache statistics.

    The cache byte/file counts and the missing-artwork count are reported as
    zero unless the artwork cache mode is ``"cache"``.
    """
    runtime = get_runtime_settings()
    cache_mode = (runtime.artwork_cache_mode or "remote").lower()
    stats = get_request_cache_stats()
    if cache_mode != "cache":
        stats["cache_bytes"] = 0
        stats["cache_files"] = 0
        stats["missing_artwork"] = 0
    summary = {
        "cache_mode": cache_mode,
        "cache_bytes": stats.get("cache_bytes", 0),
        "cache_files": stats.get("cache_files", 0),
        "missing_artwork": stats.get("missing_artwork", 0),
        "total_requests": stats.get("total_requests", 0),
        "updated_at": stats.get("updated_at"),
    }
    return {"status": "ok", "summary": summary}


@router.get("/requests/sync/status")
async def requests_sync_status() -> Dict[str, Any]:
    """Return the current requests sync state."""
    return {"status": "ok", "sync": requests_router.get_requests_sync_state()}


@router.get("/logs")
async def read_logs(lines: int = 200) -> Dict[str, Any]:
    """Return the last *lines* lines of the log file (clamped to 1..1000).

    Raises:
        HTTPException: 400 if no log file is configured, 404 if it does not
            exist, 500 if it cannot be read.
    """
    log_file = _resolve_log_file()
    if not os.path.exists(log_file):
        raise HTTPException(status_code=404, detail="Log file not found")
    lines = max(1, min(lines, 1000))
    try:
        with open(log_file, "r", encoding="utf-8", errors="replace") as handle:
            # A bounded deque keeps only the tail while streaming the file.
            tail = deque(handle, maxlen=lines)
    except OSError as exc:
        # Mirrors clear_logs: report filesystem failures with a clear detail.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"lines": list(tail)}


@router.get("/requests/cache")
async def requests_cache(limit: int = 50) -> Dict[str, Any]:
    """Return up to *limit* request cache rows.

    Before the rows are read, ``repair_request_cache_titles`` is run and
    missing titles are hydrated from Jellyseerr (best effort).
    """
    repaired = repair_request_cache_titles()
    if repaired:
        logger.info("Requests cache titles repaired via settings view: %s", repaired)
    hydrated = await _hydrate_cache_titles_from_jellyseerr(limit)
    if hydrated:
        logger.info("Requests cache titles hydrated via Jellyseerr: %s", hydrated)
    rows = get_request_cache_overview(limit)
    return {"rows": rows}


@router.get("/requests/all")
async def requests_all(
    take: int = 50,
    skip: int = 0,
    days: Optional[int] = None,
    user: Dict[str, str] = Depends(get_current_user),
) -> Dict[str, Any]:
    """Return a page of cached requests, optionally limited to the last *days* days.

    *take* is clamped to 1..200 and *skip* to >= 0. A missing or non-positive
    *days* disables the time filter.

    Raises:
        HTTPException: 403 if the current user's role is not ``admin``.
    """
    if user.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Forbidden")
    take = max(1, min(int(take or 50), 200))
    skip = max(0, int(skip or 0))
    since_iso = None
    if days is not None and int(days) > 0:
        since_iso = (datetime.now(timezone.utc) - timedelta(days=int(days))).isoformat()
    rows = get_cached_requests(limit=take, offset=skip, since_iso=since_iso)
    total = get_cached_requests_count(since_iso=since_iso)
    results = []
    for row in rows:
        status = row.get("status")
        results.append(
            {
                "id": row.get("request_id"),
                "title": row.get("title"),
                "year": row.get("year"),
                "type": row.get("media_type"),
                "status": status,
                "statusLabel": requests_router._status_label(status),
                "requestedBy": row.get("requested_by"),
                "createdAt": row.get("created_at"),
            }
        )
    return {"results": results, "total": total, "take": take, "skip": skip}


@router.post("/branding/logo")
async def upload_branding_logo(file: UploadFile = File(...)) -> Dict[str, Any]:
    """Pass the uploaded file to ``save_branding_image`` and return its result."""
    return await save_branding_image(file)


# ---------------------------------------------------------------------------
# Maintenance
# ---------------------------------------------------------------------------


@router.post("/maintenance/repair")
async def repair_database() -> Dict[str, Any]:
    """Run the integrity check, then ``vacuum_db``; return the check result."""
    result = run_integrity_check()
    vacuum_db()
    logger.info("Database repair executed: integrity_check=%s", result)
    return {"status": "ok", "integrity": result}


@router.post("/maintenance/flush")
async def flush_database() -> Dict[str, Any]:
    """Clear the requests cache and history and drop the last-sync marker."""
    cleared = clear_requests_cache()
    history = clear_history()
    delete_setting("requests_sync_last_at")
    logger.warning("Database flush executed: requests_cache=%s history=%s", cleared, history)
    return {"status": "ok", "requestsCleared": cleared, "historyCleared": history}


@router.post("/maintenance/cleanup")
async def cleanup_database(days: int = 90) -> Dict[str, Any]:
    """Run ``cleanup_history(days)`` and return its result as ``cleared``."""
    result = cleanup_history(days)
    logger.info("Database cleanup executed: days=%s result=%s", days, result)
    return {"status": "ok", "days": days, "cleared": result}


@router.post("/maintenance/logs/clear")
async def clear_logs() -> Dict[str, Any]:
    """Truncate the configured log file, creating its directory if needed.

    Raises:
        HTTPException: 400 if no log file is configured, 500 on a filesystem
            error.
    """
    log_file = _resolve_log_file()
    try:
        os.makedirs(os.path.dirname(log_file), exist_ok=True)
        # Opening in "w" mode truncates; nothing needs to be written.
        with open(log_file, "w", encoding="utf-8"):
            pass
    except OSError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    logger.info("Log file cleared")
    return {"status": "ok"}


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------


@router.get("/users")
async def list_users() -> Dict[str, Any]:
    """List admin users and users whose auth provider is Jellyseerr."""
    return {"users": _admin_visible_users()}


@router.get("/users/summary")
async def list_users_summary() -> Dict[str, Any]:
    """List the same users as ``/users``, each extended with request stats."""
    results: List[Dict[str, Any]] = [
        {**user, "stats": _request_stats_for(user)} for user in _admin_visible_users()
    ]
    return {"users": results}


@router.get("/users/{username}")
async def get_user_summary(username: str) -> Dict[str, Any]:
    """Return one user, looked up by username, with request stats.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    user = get_user_by_username(username)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return {"user": user, "stats": _request_stats_for(user)}


@router.get("/users/id/{user_id}")
async def get_user_summary_by_id(user_id: int) -> Dict[str, Any]:
    """Return one user, looked up by numeric id, with request stats.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    user = get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return {"user": user, "stats": _request_stats_for(user)}


@router.post("/users/{username}/block")
async def block_user(username: str) -> Dict[str, Any]:
    """Mark *username* as blocked."""
    set_user_blocked(username, True)
    return {"status": "ok", "username": username, "blocked": True}


@router.post("/users/{username}/unblock")
async def unblock_user(username: str) -> Dict[str, Any]:
    """Clear the blocked flag for *username*."""
    set_user_blocked(username, False)
    return {"status": "ok", "username": username, "blocked": False}


@router.post("/users/{username}/role")
async def update_user_role(username: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Set the role of *username* to ``payload["role"]``.

    Raises:
        HTTPException: 400 if the role is not ``"admin"`` or ``"user"``.
    """
    role = payload.get("role")
    if role not in {"admin", "user"}:
        raise HTTPException(status_code=400, detail="Invalid role")
    set_user_role(username, role)
    return {"status": "ok", "username": username, "role": role}


@router.post("/users/{username}/password")
async def update_user_password(username: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Set a new password for a local user.

    The password is stripped of surrounding whitespace before the length
    check and before it is stored.

    Raises:
        HTTPException: 400 if the password is missing or shorter than 8
            characters, or if the user's auth provider is not ``local``;
            404 if the user does not exist.
    """
    new_password = payload.get("password") if isinstance(payload, dict) else None
    if not isinstance(new_password, str) or len(new_password.strip()) < 8:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters.")
    user = get_user_by_username(username)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if user.get("auth_provider") != "local":
        raise HTTPException(
            status_code=400, detail="Password changes are only available for local users."
        )
    set_user_password(username, new_password.strip())
    return {"status": "ok", "username": username}