from typing import Any, Dict, List, Optional from datetime import datetime, timedelta, timezone import asyncio import ipaddress import json import os import secrets import sqlite3 import string from urllib.parse import urlparse, urlunparse from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Request from fastapi.responses import StreamingResponse from ..auth import ( require_admin, get_current_user, require_admin_event_stream, normalize_user_auth_provider, resolve_user_auth_provider, ) from ..config import settings as env_settings from ..db import ( delete_setting, get_all_users, get_cached_requests, get_cached_requests_count, get_setting, get_request_cache_overview, get_request_cache_missing_titles, get_request_cache_stats, get_settings_overrides, get_user_by_id, get_user_by_username, get_user_request_stats, create_user_if_missing, set_user_jellyseerr_id, set_setting, set_user_blocked, delete_user_by_username, delete_user_activity_by_username, set_user_auto_search_enabled, set_auto_search_enabled_for_non_admin_users, set_user_invite_management_enabled, set_invite_management_enabled_for_non_admin_users, set_user_profile_id, set_user_expires_at, set_user_password, sync_jellyfin_password_state, set_user_role, run_integrity_check, vacuum_db, clear_requests_cache, clear_history, clear_user_objects_nuclear, cleanup_history, update_request_cache_title, repair_request_cache_titles, delete_non_admin_users, list_user_profiles, get_user_profile, create_user_profile, update_user_profile, delete_user_profile, list_signup_invites, get_signup_invite_by_id, create_signup_invite, update_signup_invite, delete_signup_invite, get_signup_invite_by_code, disable_signup_invites_by_creator, ) from ..runtime import get_runtime_settings from ..clients.sonarr import SonarrClient from ..clients.radarr import RadarrClient from ..clients.jellyfin import JellyfinClient from ..clients.jellyseerr import JellyseerrClient from ..services.jellyfin_sync import sync_jellyfin_users from ..services.user_cache import ( build_jellyseerr_candidate_map, get_cached_jellyfin_users, get_cached_jellyseerr_users, match_jellyseerr_user_id, save_jellyfin_users_cache, save_jellyseerr_users_cache, clear_user_import_caches, ) from ..services.invite_email import ( TEMPLATE_KEYS as INVITE_EMAIL_TEMPLATE_KEYS, get_invite_email_templates, reset_invite_email_template, save_invite_email_template, send_test_email, smtp_email_delivery_warning, send_templated_email, smtp_email_config_ready, ) from ..services.diagnostics import get_diagnostics_catalog, run_diagnostics import logging from ..logging_config import configure_logging from ..routers import requests as requests_router from ..routers.branding import save_branding_image router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)]) events_router = APIRouter(prefix="/admin/events", tags=["admin"]) logger = logging.getLogger(__name__) SELF_SERVICE_INVITE_MASTER_ID_KEY = "self_service_invite_master_id" SENSITIVE_KEYS = { "magent_ssl_certificate_pem", "magent_ssl_private_key_pem", "magent_notify_email_smtp_password", "magent_notify_discord_webhook_url", "magent_notify_telegram_bot_token", "magent_notify_push_token", "magent_notify_push_user_key", "magent_notify_webhook_url", "jellyseerr_api_key", "jellyfin_api_key", "sonarr_api_key", "radarr_api_key", "prowlarr_api_key", "qbittorrent_password", } URL_SETTING_KEYS = { "magent_application_url", "magent_api_url", "magent_proxy_base_url", "magent_notify_discord_webhook_url", "magent_notify_push_base_url", "jellyseerr_base_url", "jellyfin_base_url", "jellyfin_public_url", "sonarr_base_url", "radarr_base_url", "prowlarr_base_url", "qbittorrent_base_url", } SETTING_KEYS: List[str] = [ "magent_application_url", "magent_application_port", "magent_api_url", "magent_api_port", "magent_bind_host", "magent_proxy_enabled", "magent_proxy_base_url", "magent_proxy_trust_forwarded_headers", "magent_proxy_forwarded_prefix", "magent_ssl_bind_enabled", "magent_ssl_certificate_path", "magent_ssl_private_key_path", "magent_ssl_certificate_pem", "magent_ssl_private_key_pem", "magent_notify_enabled", "magent_notify_email_enabled", "magent_notify_email_smtp_host", "magent_notify_email_smtp_port", "magent_notify_email_smtp_username", "magent_notify_email_smtp_password", "magent_notify_email_from_address", "magent_notify_email_from_name", "magent_notify_email_use_tls", "magent_notify_email_use_ssl", "magent_notify_discord_enabled", "magent_notify_discord_webhook_url", "magent_notify_telegram_enabled", "magent_notify_telegram_bot_token", "magent_notify_telegram_chat_id", "magent_notify_push_enabled", "magent_notify_push_provider", "magent_notify_push_base_url", "magent_notify_push_topic", "magent_notify_push_token", "magent_notify_push_user_key", "magent_notify_push_device", "magent_notify_webhook_enabled", "magent_notify_webhook_url", "jellyseerr_base_url", "jellyseerr_api_key", "jellyfin_base_url", "jellyfin_api_key", "jellyfin_public_url", "jellyfin_sync_to_arr", "artwork_cache_mode", "sonarr_base_url", "sonarr_api_key", "sonarr_quality_profile_id", "sonarr_root_folder", "sonarr_qbittorrent_category", "radarr_base_url", "radarr_api_key", "radarr_quality_profile_id", "radarr_root_folder", "radarr_qbittorrent_category", "prowlarr_base_url", "prowlarr_api_key", "qbittorrent_base_url", "qbittorrent_username", "qbittorrent_password", "log_level", "log_file", "log_file_max_bytes", "log_file_backup_count", "log_http_client_level", "log_background_sync_level", "requests_sync_ttl_minutes", "requests_poll_interval_seconds", "requests_delta_sync_interval_minutes", "requests_full_sync_time", "requests_cleanup_time", "requests_cleanup_days", "requests_data_source", "site_banner_enabled", "site_banner_message", "site_banner_tone", ] def _http_error_detail(exc: Exception) -> str: try: import httpx # local import to avoid hard dependency in static analysis paths if isinstance(exc, httpx.HTTPStatusError): response = exc.response body = "" try: body = response.text.strip() except Exception: body = "" if body: return f"HTTP {response.status_code}: {body}" return f"HTTP {response.status_code}" except Exception: pass return str(exc) def _user_inviter_details(user: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: if not user: return None invite_code = user.get("invited_by_code") if not invite_code: return None invite = get_signup_invite_by_code(str(invite_code)) if not invite: return { "invite_code": invite_code, "invited_by": None, "invite": None, } return { "invite_code": invite.get("code"), "invited_by": invite.get("created_by"), "invite": { "id": invite.get("id"), "code": invite.get("code"), "label": invite.get("label"), "created_by": invite.get("created_by"), "created_at": invite.get("created_at"), "enabled": invite.get("enabled"), "is_usable": invite.get("is_usable"), "recipient_email": invite.get("recipient_email"), }, } def _resolve_user_invite(user: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: if not user: return None invite_code = user.get("invited_by_code") if not isinstance(invite_code, str) or not invite_code.strip(): return None return get_signup_invite_by_code(invite_code.strip()) def _build_invite_trace_payload() -> Dict[str, Any]: users = get_all_users() invites = list_signup_invites() usernames = {str(user.get("username") or "") for user in users} nodes: list[Dict[str, Any]] = [] edges: list[Dict[str, Any]] = [] for user in users: username = str(user.get("username") or "") inviter = _user_inviter_details(user) nodes.append( { "id": f"user:{username}", "type": "user", "username": username, "label": username, "role": user.get("role"), "auth_provider": user.get("auth_provider"), "created_at": user.get("created_at"), "invited_by_code": user.get("invited_by_code"), "invited_by": inviter.get("invited_by") if inviter else None, } ) invite_codes = set() for invite in invites: code = str(invite.get("code") or "") if not code: continue invite_codes.add(code) nodes.append( { "id": f"invite:{code}", "type": "invite", "code": code, "label": invite.get("label") or code, "created_by": invite.get("created_by"), "enabled": invite.get("enabled"), "use_count": invite.get("use_count"), "remaining_uses": invite.get("remaining_uses"), "created_at": invite.get("created_at"), } ) created_by = invite.get("created_by") if isinstance(created_by, str) and created_by.strip(): edges.append( { "id": f"user:{created_by}->invite:{code}", "from": f"user:{created_by}", "to": f"invite:{code}", "kind": "created", "label": "created", "from_missing": created_by not in usernames, } ) for user in users: username = str(user.get("username") or "") invited_by_code = user.get("invited_by_code") if not isinstance(invited_by_code, str) or not invited_by_code.strip(): continue code = invited_by_code.strip() edges.append( { "id": f"invite:{code}->user:{username}", "from": f"invite:{code}", "to": f"user:{username}", "kind": "invited", "label": code, "from_missing": code not in invite_codes, } ) return { "users": users, "invites": invites, "nodes": nodes, "edges": edges, "generated_at": datetime.now(timezone.utc).isoformat(), } def _admin_live_state_snapshot() -> Dict[str, Any]: return { "type": "admin_live_state", "requestsSync": requests_router.get_requests_sync_state(), "artworkPrefetch": requests_router.get_artwork_prefetch_state(), } def _sse_encode(data: Dict[str, Any]) -> str: payload = json.dumps(data, ensure_ascii=True, separators=(",", ":"), default=str) return f"data: {payload}\n\n" def _read_log_tail_lines(lines: int) -> List[str]: runtime = get_runtime_settings() log_file = runtime.log_file if not log_file: raise HTTPException(status_code=400, detail="Log file not configured") if not os.path.isabs(log_file): log_file = os.path.join(os.getcwd(), log_file) if not os.path.exists(log_file): raise HTTPException(status_code=404, detail="Log file not found") lines = max(1, min(lines, 1000)) from collections import deque with open(log_file, "r", encoding="utf-8", errors="replace") as handle: tail = deque(handle, maxlen=lines) return list(tail) def _normalize_username(value: str) -> str: normalized = value.strip().lower() if "@" in normalized: normalized = normalized.split("@", 1)[0] return normalized def _is_ip_host(host: str) -> bool: try: ipaddress.ip_address(host) return True except ValueError: return False def _normalize_service_url(value: str) -> str: raw = value.strip() if not raw: raise ValueError("URL cannot be empty.") candidate = raw if "://" not in candidate: authority = candidate.split("/", 1)[0].strip() if authority.startswith("["): closing = authority.find("]") host = authority[1:closing] if closing > 0 else authority.strip("[]") else: host = authority.split(":", 1)[0] host = host.strip().lower() default_scheme = "http" if host in {"localhost"} or _is_ip_host(host) or "." not in host else "https" candidate = f"{default_scheme}://{candidate}" parsed = urlparse(candidate) if parsed.scheme not in {"http", "https"}: raise ValueError("URL must use http:// or https://.") if not parsed.netloc: raise ValueError("URL must include a host.") if parsed.query or parsed.fragment: raise ValueError("URL must not include query params or fragments.") if not parsed.hostname: raise ValueError("URL must include a valid host.") normalized_path = parsed.path.rstrip("/") normalized = parsed._replace(path=normalized_path, params="", query="", fragment="") result = urlunparse(normalized).rstrip("/") if not result: raise ValueError("URL is invalid.") return result def _normalize_root_folders(folders: Any) -> List[Dict[str, Any]]: if not isinstance(folders, list): return [] results = [] for folder in folders: if not isinstance(folder, dict): continue folder_id = folder.get("id") path = folder.get("path") if folder_id is None or path is None: continue results.append({"id": folder_id, "path": path, "label": path}) return results async def _hydrate_cache_titles_from_jellyseerr(limit: int) -> int: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): return 0 missing = get_request_cache_missing_titles(limit) if not missing: return 0 hydrated = 0 for row in missing: tmdb_id = row.get("tmdb_id") media_type = row.get("media_type") request_id = row.get("request_id") if not tmdb_id or not media_type or not request_id: continue try: title, year = await requests_router._hydrate_title_from_tmdb( client, media_type, tmdb_id ) except Exception: logger.warning( "Requests cache title hydrate failed: request_id=%s tmdb_id=%s", request_id, tmdb_id, ) continue if title: update_request_cache_title(request_id, title, year) hydrated += 1 return hydrated def _normalize_quality_profiles(profiles: Any) -> List[Dict[str, Any]]: if not isinstance(profiles, list): return [] results = [] for profile in profiles: if not isinstance(profile, dict): continue profile_id = profile.get("id") name = profile.get("name") if profile_id is None or name is None: continue results.append({"id": profile_id, "name": name, "label": name}) return results def _normalize_optional_text(value: Any) -> Optional[str]: if value is None: return None if not isinstance(value, str): value = str(value) trimmed = value.strip() return trimmed if trimmed else None def _parse_optional_positive_int(value: Any, field_name: str) -> Optional[int]: if value is None or value == "": return None try: parsed = int(value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail=f"{field_name} must be a number") from exc if parsed <= 0: raise HTTPException(status_code=400, detail=f"{field_name} must be greater than 0") return parsed def _parse_optional_profile_id(value: Any) -> Optional[int]: if value is None or value == "": return None try: parsed = int(value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="profile_id must be a number") from exc if parsed <= 0: raise HTTPException(status_code=400, detail="profile_id must be greater than 0") profile = get_user_profile(parsed) if not profile: raise HTTPException(status_code=404, detail="Profile not found") return parsed def _parse_optional_expires_at(value: Any) -> Optional[str]: if value is None or value == "": return None if not isinstance(value, str): raise HTTPException(status_code=400, detail="expires_at must be an ISO datetime string") candidate = value.strip() if not candidate: return None try: parsed = datetime.fromisoformat(candidate.replace("Z", "+00:00")) except ValueError as exc: raise HTTPException(status_code=400, detail="expires_at must be a valid ISO datetime") from exc if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed.isoformat() def _normalize_invite_code(value: Optional[str]) -> str: raw = (value or "").strip().upper() filtered = "".join(ch for ch in raw if ch.isalnum()) if len(filtered) < 6: raise HTTPException(status_code=400, detail="Invite code must be at least 6 letters/numbers.") return filtered def _generate_invite_code(length: int = 12) -> str: alphabet = string.ascii_uppercase + string.digits return "".join(secrets.choice(alphabet) for _ in range(length)) def _normalize_role_or_none(value: Any) -> Optional[str]: if value is None: return None if not isinstance(value, str): value = str(value) role = value.strip().lower() if not role: return None if role not in {"user", "admin"}: raise HTTPException(status_code=400, detail="role must be 'user' or 'admin'") return role def _calculate_profile_expiry(profile: Dict[str, Any]) -> Optional[str]: expires_days = profile.get("account_expires_days") if isinstance(expires_days, int) and expires_days > 0: return (datetime.now(timezone.utc) + timedelta(days=expires_days)).isoformat() return None def _apply_profile_defaults_to_user(username: str, profile: Dict[str, Any]) -> Dict[str, Any]: set_user_profile_id(username, int(profile["id"])) role = profile.get("role") or "user" if role in {"user", "admin"}: set_user_role(username, role) set_user_auto_search_enabled(username, bool(profile.get("auto_search_enabled", True))) set_user_expires_at(username, _calculate_profile_expiry(profile)) refreshed = get_user_by_username(username) if not refreshed: raise HTTPException(status_code=404, detail="User not found") return refreshed @router.get("/settings") async def list_settings() -> Dict[str, Any]: overrides = get_settings_overrides() results = [] for key in SETTING_KEYS: override_present = key in overrides value = overrides.get(key) if override_present else getattr(env_settings, key) is_set = value is not None and str(value).strip() != "" sensitive = key in SENSITIVE_KEYS results.append( { "key": key, "value": None if sensitive else value, "isSet": is_set, "source": "db" if override_present else ("env" if is_set else "unset"), "sensitive": sensitive, } ) return {"settings": results} @router.put("/settings") async def update_settings(payload: Dict[str, Any]) -> Dict[str, Any]: updates = 0 touched_logging = False changed_keys: List[str] = [] for key, value in payload.items(): if key not in SETTING_KEYS: raise HTTPException(status_code=400, detail=f"Unknown setting: {key}") if value is None: continue if isinstance(value, str) and value.strip() == "": delete_setting(key) updates += 1 changed_keys.append(key) continue value_to_store = str(value).strip() if isinstance(value, str) else str(value) if key in URL_SETTING_KEYS and value_to_store: try: value_to_store = _normalize_service_url(value_to_store) except ValueError as exc: friendly_key = key.replace("_", " ") raise HTTPException(status_code=400, detail=f"{friendly_key}: {exc}") from exc set_setting(key, value_to_store) updates += 1 changed_keys.append(key) if key in {"log_level", "log_file", "log_file_max_bytes", "log_file_backup_count", "log_http_client_level", "log_background_sync_level"}: touched_logging = True if touched_logging: runtime = get_runtime_settings() configure_logging( runtime.log_level, runtime.log_file, log_file_max_bytes=runtime.log_file_max_bytes, log_file_backup_count=runtime.log_file_backup_count, log_http_client_level=runtime.log_http_client_level, log_background_sync_level=runtime.log_background_sync_level, ) logger.info("Admin updated settings: count=%s keys=%s", updates, changed_keys) return {"status": "ok", "updated": updates} @router.post("/settings/test/email") async def test_email_settings(request: Request) -> Dict[str, Any]: recipient_email = None content_type = (request.headers.get("content-type") or "").split(";", 1)[0].strip().lower() try: if content_type == "application/json": payload = await request.json() if isinstance(payload, dict) and isinstance(payload.get("recipient_email"), str): recipient_email = payload["recipient_email"] elif content_type in { "application/x-www-form-urlencoded", "multipart/form-data", }: form = await request.form() candidate = form.get("recipient_email") if isinstance(candidate, str): recipient_email = candidate except Exception: recipient_email = None try: result = await send_test_email(recipient_email=recipient_email) except RuntimeError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc logger.info("Admin triggered SMTP test: recipient=%s", result.get("recipient_email")) return {"status": "ok", **result} @router.get("/diagnostics") async def diagnostics_catalog() -> Dict[str, Any]: return {"status": "ok", **get_diagnostics_catalog()} @router.post("/diagnostics/run") async def diagnostics_run(payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: keys: Optional[List[str]] = None recipient_email: Optional[str] = None if payload is not None: raw_keys = payload.get("keys") if raw_keys is not None: if not isinstance(raw_keys, list): raise HTTPException(status_code=400, detail="keys must be an array of diagnostic keys") keys = [] for raw_key in raw_keys: if not isinstance(raw_key, str): raise HTTPException(status_code=400, detail="Each diagnostic key must be a string") normalized = raw_key.strip() if normalized: keys.append(normalized) raw_recipient_email = payload.get("recipient_email") if raw_recipient_email is not None: if not isinstance(raw_recipient_email, str): raise HTTPException(status_code=400, detail="recipient_email must be a string") recipient_email = raw_recipient_email.strip() or None return {"status": "ok", **(await run_diagnostics(keys, recipient_email=recipient_email))} @router.get("/sonarr/options") async def sonarr_options() -> Dict[str, Any]: runtime = get_runtime_settings() client = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Sonarr not configured") root_folders = await client.get_root_folders() profiles = await client.get_quality_profiles() return { "rootFolders": _normalize_root_folders(root_folders), "qualityProfiles": _normalize_quality_profiles(profiles), } @router.get("/radarr/options") async def radarr_options() -> Dict[str, Any]: runtime = get_runtime_settings() client = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Radarr not configured") root_folders = await client.get_root_folders() profiles = await client.get_quality_profiles() return { "rootFolders": _normalize_root_folders(root_folders), "qualityProfiles": _normalize_quality_profiles(profiles), } @router.get("/jellyfin/users") async def jellyfin_users() -> Dict[str, Any]: cached = get_cached_jellyfin_users() if cached is not None: return {"users": cached} runtime = get_runtime_settings() client = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyfin not configured") users = await client.get_users() if not isinstance(users, list): return {"users": []} results = save_jellyfin_users_cache(users) return {"users": results} @router.post("/jellyfin/users/sync") async def jellyfin_users_sync() -> Dict[str, Any]: imported = await sync_jellyfin_users() return {"status": "ok", "imported": imported} async def _fetch_all_jellyseerr_users( client: JellyseerrClient, use_cache: bool = True ) -> List[Dict[str, Any]]: if use_cache: cached = get_cached_jellyseerr_users() if cached is not None: return cached users: List[Dict[str, Any]] = [] take = 100 skip = 0 while True: payload = await client.get_users(take=take, skip=skip) if not payload: break if isinstance(payload, list): batch = payload elif isinstance(payload, dict): batch = payload.get("results") or payload.get("users") or payload.get("data") or payload.get("items") else: batch = None if not isinstance(batch, list) or not batch: break users.extend([user for user in batch if isinstance(user, dict)]) if len(batch) < take: break skip += take if users: return save_jellyseerr_users_cache(users) return users @router.post("/seerr/users/sync") @router.post("/jellyseerr/users/sync") async def jellyseerr_users_sync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Seerr not configured") jellyseerr_users = await _fetch_all_jellyseerr_users(client, use_cache=False) if not jellyseerr_users: return {"status": "ok", "matched": 0, "skipped": 0, "total": 0} candidate_to_id = build_jellyseerr_candidate_map(jellyseerr_users) updated = 0 skipped = 0 users = get_all_users() for user in users: if user.get("jellyseerr_user_id") is not None: skipped += 1 continue username = user.get("username") or "" matched_id = match_jellyseerr_user_id(username, candidate_to_id) if matched_id is not None: set_user_jellyseerr_id(username, matched_id) updated += 1 else: skipped += 1 return {"status": "ok", "matched": updated, "skipped": skipped, "total": len(users)} def _pick_jellyseerr_username(user: Dict[str, Any]) -> Optional[str]: for key in ("email", "username", "displayName", "name"): value = user.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None @router.post("/seerr/users/resync") @router.post("/jellyseerr/users/resync") async def jellyseerr_users_resync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Seerr not configured") jellyseerr_users = await _fetch_all_jellyseerr_users(client, use_cache=False) if not jellyseerr_users: return {"status": "ok", "imported": 0, "cleared": 0} cleared = delete_non_admin_users() imported = 0 for user in jellyseerr_users: user_id = user.get("id") or user.get("userId") or user.get("Id") try: user_id = int(user_id) except (TypeError, ValueError): continue username = _pick_jellyseerr_username(user) if not username: continue created = create_user_if_missing( username, "jellyseerr-user", role="user", auth_provider="jellyseerr", jellyseerr_user_id=user_id, ) if created: imported += 1 else: set_user_jellyseerr_id(username, user_id) return {"status": "ok", "imported": imported, "cleared": cleared} @router.post("/requests/sync") async def requests_sync() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Seerr not configured") state = await requests_router.start_requests_sync( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key ) logger.info("Admin triggered requests sync: status=%s", state.get("status")) return {"status": "ok", "sync": state} @router.post("/requests/sync/delta") async def requests_sync_delta() -> Dict[str, Any]: runtime = get_runtime_settings() client = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Seerr not configured") state = await requests_router.start_requests_delta_sync( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key ) logger.info("Admin triggered delta requests sync: status=%s", state.get("status")) return {"status": "ok", "sync": state} @router.post("/requests/artwork/prefetch") async def requests_artwork_prefetch(only_missing: bool = False) -> Dict[str, Any]: runtime = get_runtime_settings() state = await requests_router.start_artwork_prefetch( runtime.jellyseerr_base_url, runtime.jellyseerr_api_key, only_missing=only_missing, ) logger.info("Admin triggered artwork prefetch: status=%s", state.get("status")) return {"status": "ok", "prefetch": state} @router.get("/requests/artwork/status") async def requests_artwork_status() -> Dict[str, Any]: return {"status": "ok", "prefetch": requests_router.get_artwork_prefetch_state()} @router.get("/requests/artwork/summary") async def requests_artwork_summary() -> Dict[str, Any]: runtime = get_runtime_settings() cache_mode = (runtime.artwork_cache_mode or "remote").lower() stats = get_request_cache_stats() if cache_mode != "cache": stats["cache_bytes"] = 0 stats["cache_files"] = 0 stats["missing_artwork"] = 0 summary = { "cache_mode": cache_mode, "cache_bytes": stats.get("cache_bytes", 0), "cache_files": stats.get("cache_files", 0), "missing_artwork": stats.get("missing_artwork", 0), "total_requests": stats.get("total_requests", 0), "updated_at": stats.get("updated_at"), } return {"status": "ok", "summary": summary} @router.get("/requests/sync/status") async def requests_sync_status() -> Dict[str, Any]: return {"status": "ok", "sync": requests_router.get_requests_sync_state()} @events_router.get("/stream") async def admin_events_stream( request: Request, include_logs: bool = False, log_lines: int = 200, _: Dict[str, Any] = Depends(require_admin_event_stream), ) -> StreamingResponse: async def event_generator(): # Advise client reconnect timing once per stream. yield "retry: 2000\n\n" last_snapshot: Optional[str] = None heartbeat_counter = 0 log_refresh_counter = 5 if include_logs else 0 latest_logs_payload: Optional[Dict[str, Any]] = None while True: if await request.is_disconnected(): break snapshot_payload = _admin_live_state_snapshot() if include_logs: log_refresh_counter += 1 if log_refresh_counter >= 5: log_refresh_counter = 0 try: latest_logs_payload = { "lines": _read_log_tail_lines(log_lines), "count": max(1, min(int(log_lines or 200), 1000)), } except HTTPException as exc: latest_logs_payload = { "error": str(exc.detail) if exc.detail else "Could not read logs", } except Exception as exc: latest_logs_payload = {"error": str(exc)} snapshot_payload["logs"] = latest_logs_payload snapshot = _sse_encode(snapshot_payload) if snapshot != last_snapshot: last_snapshot = snapshot yield snapshot heartbeat_counter = 0 else: heartbeat_counter += 1 # Keep the stream alive through proxies even when state is unchanged. if heartbeat_counter >= 15: yield ": ping\n\n" heartbeat_counter = 0 await asyncio.sleep(1.0) headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers) @router.get("/logs") async def read_logs(lines: int = 200) -> Dict[str, Any]: return {"lines": _read_log_tail_lines(lines)} @router.get("/requests/cache") async def requests_cache(limit: int = 50) -> Dict[str, Any]: repaired = repair_request_cache_titles() if repaired: logger.info("Requests cache titles repaired via settings view: %s", repaired) hydrated = await _hydrate_cache_titles_from_jellyseerr(limit) if hydrated: logger.info("Requests cache titles hydrated via Seerr: %s", hydrated) rows = get_request_cache_overview(limit) return {"rows": rows} @router.get("/requests/all") async def requests_all( take: int = 50, skip: int = 0, days: Optional[int] = None, stage: str = "all", user: Dict[str, str] = Depends(get_current_user), ) -> Dict[str, Any]: if user.get("role") != "admin": raise HTTPException(status_code=403, detail="Forbidden") take = max(1, min(int(take or 50), 200)) skip = max(0, int(skip or 0)) since_iso = None if days is not None and int(days) > 0: since_iso = (datetime.now(timezone.utc) - timedelta(days=int(days))).isoformat() status_codes = requests_router.request_stage_filter_codes(stage) rows = get_cached_requests(limit=take, offset=skip, since_iso=since_iso, status_codes=status_codes) total = get_cached_requests_count(since_iso=since_iso, status_codes=status_codes) results = [] for row in rows: status = row.get("status") results.append( { "id": row.get("request_id"), "title": row.get("title"), "year": row.get("year"), "type": row.get("media_type"), "status": status, "statusLabel": requests_router._status_label(status), "requestedBy": row.get("requested_by"), "createdAt": row.get("created_at"), } ) return {"results": results, "total": total, "take": take, "skip": skip} @router.post("/branding/logo") async def upload_branding_logo(file: UploadFile = File(...)) -> Dict[str, Any]: return await save_branding_image(file) @router.post("/maintenance/repair") async def repair_database() -> Dict[str, Any]: result = run_integrity_check() vacuum_db() logger.info("Database repair executed: integrity_check=%s", result) return {"status": "ok", "integrity": result} @router.post("/maintenance/flush") async def flush_database() -> Dict[str, Any]: cleared = clear_requests_cache() history = clear_history() user_objects = clear_user_objects_nuclear() user_caches = clear_user_import_caches() delete_setting("requests_sync_last_at") logger.warning( "Database flush executed: requests_cache=%s history=%s user_objects=%s user_caches=%s", cleared, history, user_objects, user_caches, ) return { "status": "ok", "requestsCleared": cleared, "historyCleared": history, "userObjectsCleared": user_objects, "userCachesCleared": user_caches, } @router.post("/maintenance/cleanup") async def cleanup_database(days: int = 90) -> Dict[str, Any]: result = cleanup_history(days) logger.info("Database cleanup executed: days=%s result=%s", days, result) return {"status": "ok", "days": days, "cleared": result} @router.post("/maintenance/logs/clear") async def clear_logs() -> Dict[str, Any]: runtime = get_runtime_settings() log_file = runtime.log_file if not log_file: raise HTTPException(status_code=400, detail="Log file not configured") if not os.path.isabs(log_file): log_file = os.path.join(os.getcwd(), log_file) try: os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, "w", encoding="utf-8"): pass except OSError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc logger.info("Log file cleared") return {"status": "ok"} @router.get("/users") async def list_users() -> Dict[str, Any]: users = get_all_users() return {"users": users} @router.get("/users/summary") async def list_users_summary() -> Dict[str, Any]: users = get_all_users() results: list[Dict[str, Any]] = [] for user in users: username = user.get("username") or "" username_norm = _normalize_username(username) if username else "" stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) results.append({**user, "stats": stats}) return {"users": results} @router.get("/users/{username}") async def get_user_summary(username: str) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") username_norm = _normalize_username(user.get("username") or "") stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) return {"user": user, "stats": stats, "lineage": _user_inviter_details(user)} @router.get("/users/id/{user_id}") async def get_user_summary_by_id(user_id: int) -> Dict[str, Any]: user = get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") username_norm = _normalize_username(user.get("username") or "") stats = get_user_request_stats(username_norm, user.get("jellyseerr_user_id")) return {"user": user, "stats": stats, "lineage": _user_inviter_details(user)} @router.post("/users/{username}/block") async def block_user(username: str) -> Dict[str, Any]: set_user_blocked(username, True) logger.warning("Admin blocked user: username=%s", username) return {"status": "ok", "username": username, "blocked": True} @router.post("/users/{username}/unblock") async def unblock_user(username: str) -> Dict[str, Any]: set_user_blocked(username, False) logger.info("Admin unblocked user: username=%s", username) return {"status": "ok", "username": username, "blocked": False} @router.post("/users/{username}/system-action") async def user_system_action(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") action = str(payload.get("action") or "").strip().lower() if action not in {"ban", "unban", "remove"}: raise HTTPException(status_code=400, detail="action must be ban, unban, or remove") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if user.get("role") == "admin": raise HTTPException(status_code=400, detail="Cross-system actions are not allowed for admin users") runtime = get_runtime_settings() jellyfin = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key) jellyseerr = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key) result: Dict[str, Any] = { "status": "ok", "action": action, "username": user.get("username"), "local": {"status": "pending"}, "jellyfin": {"status": "skipped", "detail": "Jellyfin not configured"}, "jellyseerr": {"status": "skipped", "detail": "Seerr not configured or no linked user ID"}, "invites": {"status": "pending", "disabled": 0}, "email": {"status": "skipped", "detail": "No email action required"}, } if action == "ban": set_user_blocked(username, True) result["local"] = {"status": "ok", "blocked": True} elif action == "unban": set_user_blocked(username, False) result["local"] = {"status": "ok", "blocked": False} else: result["local"] = {"status": "pending-delete"} if action in {"ban", "remove"}: result["invites"] = {"status": "ok", "disabled": disable_signup_invites_by_creator(username)} else: result["invites"] = {"status": "ok", "disabled": 0} if action in {"ban", "remove"}: try: invite = _resolve_user_invite(user) email_result = await send_templated_email( "banned", invite=invite, user=user, reason="Account banned" if action == "ban" else "Account removed", ) result["email"] = {"status": "ok", **email_result} except Exception as exc: result["email"] = {"status": "error", "detail": str(exc)} if jellyfin.configured(): try: jellyfin_user = await jellyfin.find_user_by_name(username) if not jellyfin_user: result["jellyfin"] = {"status": "not_found"} else: jellyfin_user_id = jellyfin._extract_user_id(jellyfin_user) # type: ignore[attr-defined] if not jellyfin_user_id: raise RuntimeError("Could not determine Jellyfin user ID") if action == "ban": await jellyfin.set_user_disabled(jellyfin_user_id, True) result["jellyfin"] = {"status": "ok", "action": "disabled", "user_id": jellyfin_user_id} elif action == "unban": await jellyfin.set_user_disabled(jellyfin_user_id, False) result["jellyfin"] = {"status": "ok", "action": "enabled", "user_id": jellyfin_user_id} else: await jellyfin.delete_user(jellyfin_user_id) result["jellyfin"] = {"status": "ok", "action": "deleted", "user_id": jellyfin_user_id} except Exception as exc: result["jellyfin"] = {"status": "error", "detail": _http_error_detail(exc)} jellyseerr_user_id = user.get("jellyseerr_user_id") if jellyseerr.configured() and jellyseerr_user_id is not None: try: if action == "remove": await jellyseerr.delete_user(int(jellyseerr_user_id)) result["jellyseerr"] = {"status": "ok", "action": "deleted", "user_id": int(jellyseerr_user_id)} elif action == "ban": result["jellyseerr"] = {"status": "ok", "action": "delegated-to-jellyfin-disable", "user_id": int(jellyseerr_user_id)} else: result["jellyseerr"] = {"status": "ok", "action": "delegated-to-jellyfin-enable", "user_id": int(jellyseerr_user_id)} except Exception as exc: result["jellyseerr"] = {"status": "error", "detail": _http_error_detail(exc)} if action == "remove": deleted = delete_user_by_username(username) activity_deleted = delete_user_activity_by_username(username) result["local"] = { "status": "ok" if deleted else "not_found", "deleted": bool(deleted), "activity_deleted": activity_deleted, } if any( isinstance(system, dict) and system.get("status") == "error" for system in (result.get("jellyfin"), result.get("jellyseerr"), result.get("email")) ): result["status"] = "partial" logger.info( "Admin system action completed: username=%s action=%s overall=%s local=%s jellyfin=%s jellyseerr=%s invites=%s email=%s", username, action, result.get("status"), result.get("local", {}).get("status"), result.get("jellyfin", {}).get("status"), result.get("jellyseerr", {}).get("status"), result.get("invites", {}).get("status"), result.get("email", {}).get("status"), ) return result @router.post("/users/{username}/role") async def update_user_role(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: role = payload.get("role") if role not in {"admin", "user"}: raise HTTPException(status_code=400, detail="Invalid role") set_user_role(username, role) return {"status": "ok", "username": username, "role": role} @router.post("/users/{username}/auto-search") async def update_user_auto_search(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: enabled = payload.get("enabled") if isinstance(payload, dict) else None if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") set_user_auto_search_enabled(username, enabled) return {"status": "ok", "username": username, "auto_search_enabled": enabled} @router.post("/users/{username}/invite-access") async def update_user_invite_access(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: enabled = payload.get("enabled") if isinstance(payload, dict) else None if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") set_user_invite_management_enabled(username, enabled) refreshed = get_user_by_username(username) return { "status": "ok", "username": username, "invite_management_enabled": bool(refreshed.get("invite_management_enabled", enabled)) if refreshed else enabled, "user": refreshed, } @router.post("/users/{username}/profile") async def update_user_profile_assignment(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") profile_id = payload.get("profile_id") if profile_id in (None, ""): set_user_profile_id(username, None) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} try: parsed_profile_id = int(profile_id) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="profile_id must be a number") from exc profile = get_user_profile(parsed_profile_id) if not profile: raise HTTPException(status_code=404, detail="Profile not found") if not profile.get("is_active", True): raise HTTPException(status_code=400, detail="Profile is disabled") refreshed = _apply_profile_defaults_to_user(username, profile) return {"status": "ok", "user": refreshed, "applied_profile_id": parsed_profile_id} @router.post("/users/{username}/expiry") async def update_user_expiry(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") clear = payload.get("clear") if clear is True: set_user_expires_at(username, None) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} if "days" in payload and payload.get("days") not in (None, ""): days = _parse_optional_positive_int(payload.get("days"), "days") expires_at = None if days is not None: expires_at = (datetime.now(timezone.utc) + timedelta(days=days)).isoformat() set_user_expires_at(username, expires_at) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} expires_at = _parse_optional_expires_at(payload.get("expires_at")) set_user_expires_at(username, expires_at) refreshed = get_user_by_username(username) return {"status": "ok", "user": refreshed} @router.post("/users/auto-search/bulk") async def update_users_auto_search_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: enabled = payload.get("enabled") if isinstance(payload, dict) else None if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") updated = set_auto_search_enabled_for_non_admin_users(enabled) return { "status": "ok", "enabled": enabled, "updated": updated, "scope": "non-admin-users", } @router.post("/users/invite-access/bulk") async def update_users_invite_access_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: enabled = payload.get("enabled") if isinstance(payload, dict) else None if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") updated = set_invite_management_enabled_for_non_admin_users(enabled) return { "status": "ok", "enabled": enabled, "updated": updated, "scope": "non-admin-users", } @router.post("/users/profile/bulk") async def update_users_profile_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") scope = str(payload.get("scope") or "non-admin-users").strip().lower() if scope not in {"non-admin-users", "all-users"}: raise HTTPException(status_code=400, detail="Invalid scope") profile_id_value = payload.get("profile_id") if profile_id_value in (None, ""): users = get_all_users() updated = 0 for user in users: if scope == "non-admin-users" and user.get("role") == "admin": continue set_user_profile_id(user["username"], None) updated += 1 return {"status": "ok", "updated": updated, "scope": scope, "profile_id": None} try: profile_id = int(profile_id_value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="profile_id must be a number") from exc profile = get_user_profile(profile_id) if not profile: raise HTTPException(status_code=404, detail="Profile not found") if not profile.get("is_active", True): raise HTTPException(status_code=400, detail="Profile is disabled") users = get_all_users() updated = 0 for user in users: if scope == "non-admin-users" and user.get("role") == "admin": continue _apply_profile_defaults_to_user(user["username"], profile) updated += 1 return {"status": "ok", "updated": updated, "scope": scope, "profile_id": profile_id} @router.post("/users/expiry/bulk") async def update_users_expiry_bulk(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") scope = str(payload.get("scope") or "non-admin-users").strip().lower() if scope not in {"non-admin-users", "all-users"}: raise HTTPException(status_code=400, detail="Invalid scope") clear = payload.get("clear") expires_at: Optional[str] = None if clear is True: expires_at = None elif "days" in payload and payload.get("days") not in (None, ""): days = _parse_optional_positive_int(payload.get("days"), "days") expires_at = (datetime.now(timezone.utc) + timedelta(days=int(days or 0))).isoformat() if days else None else: expires_at = _parse_optional_expires_at(payload.get("expires_at")) users = get_all_users() updated = 0 for user in users: if scope == "non-admin-users" and user.get("role") == "admin": continue set_user_expires_at(user["username"], expires_at) updated += 1 return {"status": "ok", "updated": updated, "scope": scope, "expires_at": expires_at} @router.post("/users/{username}/password") async def update_user_password(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: new_password = payload.get("password") if isinstance(payload, dict) else None if not isinstance(new_password, str) or len(new_password.strip()) < 8: raise HTTPException(status_code=400, detail="Password must be at least 8 characters.") user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") new_password_clean = new_password.strip() user = normalize_user_auth_provider(user) auth_provider = resolve_user_auth_provider(user) if auth_provider == "local": set_user_password(username, new_password_clean) return {"status": "ok", "username": username, "provider": "local"} if auth_provider == "jellyfin": runtime = get_runtime_settings() client = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key) if not client.configured(): raise HTTPException(status_code=400, detail="Jellyfin not configured for password passthrough.") try: jf_user = await client.find_user_by_name(username) user_id = client._extract_user_id(jf_user) if not user_id: raise RuntimeError("Jellyfin user ID not found") await client.set_user_password(user_id, new_password_clean) except Exception as exc: raise HTTPException(status_code=502, detail=f"Jellyfin password update failed: {exc}") from exc sync_jellyfin_password_state(username, new_password_clean) return {"status": "ok", "username": username, "provider": "jellyfin"} raise HTTPException( status_code=400, detail="Password changes are not available for this sign-in provider.", ) @router.get("/profiles") async def get_profiles() -> Dict[str, Any]: profiles = list_user_profiles() users = get_all_users() invites = list_signup_invites() user_counts: Dict[int, int] = {} invite_counts: Dict[int, int] = {} for user in users: profile_id = user.get("profile_id") if isinstance(profile_id, int): user_counts[profile_id] = user_counts.get(profile_id, 0) + 1 for invite in invites: profile_id = invite.get("profile_id") if isinstance(profile_id, int): invite_counts[profile_id] = invite_counts.get(profile_id, 0) + 1 enriched = [] for profile in profiles: pid = int(profile["id"]) enriched.append( { **profile, "assigned_users": user_counts.get(pid, 0), "assigned_invites": invite_counts.get(pid, 0), } ) return {"profiles": enriched} @router.post("/profiles") async def create_profile(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") name = _normalize_optional_text(payload.get("name")) if not name: raise HTTPException(status_code=400, detail="Profile name is required") role = _normalize_role_or_none(payload.get("role")) or "user" auto_search_enabled = payload.get("auto_search_enabled") if auto_search_enabled is None: auto_search_enabled = True if not isinstance(auto_search_enabled, bool): raise HTTPException(status_code=400, detail="auto_search_enabled must be true or false") is_active = payload.get("is_active") if is_active is None: is_active = True if not isinstance(is_active, bool): raise HTTPException(status_code=400, detail="is_active must be true or false") account_expires_days = _parse_optional_positive_int( payload.get("account_expires_days"), "account_expires_days" ) try: profile = create_user_profile( name=name, description=_normalize_optional_text(payload.get("description")), role=role, auto_search_enabled=auto_search_enabled, account_expires_days=account_expires_days, is_active=is_active, ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="A profile with that name already exists") from exc logger.info( "Admin created profile: profile_id=%s name=%s role=%s active=%s auto_search=%s expires_days=%s", profile.get("id"), profile.get("name"), profile.get("role"), profile.get("is_active"), profile.get("auto_search_enabled"), profile.get("account_expires_days"), ) return {"status": "ok", "profile": profile} @router.put("/profiles/{profile_id}") async def edit_profile(profile_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") existing = get_user_profile(profile_id) if not existing: raise HTTPException(status_code=404, detail="Profile not found") name = _normalize_optional_text(payload.get("name")) if not name: raise HTTPException(status_code=400, detail="Profile name is required") role = _normalize_role_or_none(payload.get("role")) or "user" auto_search_enabled = payload.get("auto_search_enabled") if not isinstance(auto_search_enabled, bool): raise HTTPException(status_code=400, detail="auto_search_enabled must be true or false") is_active = payload.get("is_active") if not isinstance(is_active, bool): raise HTTPException(status_code=400, detail="is_active must be true or false") account_expires_days = _parse_optional_positive_int( payload.get("account_expires_days"), "account_expires_days" ) try: profile = update_user_profile( profile_id, name=name, description=_normalize_optional_text(payload.get("description")), role=role, auto_search_enabled=auto_search_enabled, account_expires_days=account_expires_days, is_active=is_active, ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="A profile with that name already exists") from exc if not profile: raise HTTPException(status_code=404, detail="Profile not found") logger.info( "Admin updated profile: profile_id=%s name=%s role=%s active=%s auto_search=%s expires_days=%s", profile.get("id"), profile.get("name"), profile.get("role"), profile.get("is_active"), profile.get("auto_search_enabled"), profile.get("account_expires_days"), ) return {"status": "ok", "profile": profile} @router.delete("/profiles/{profile_id}") async def remove_profile(profile_id: int) -> Dict[str, Any]: try: deleted = delete_user_profile(profile_id) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc if not deleted: raise HTTPException(status_code=404, detail="Profile not found") logger.warning("Admin deleted profile: profile_id=%s", profile_id) return {"status": "ok", "deleted": True, "profile_id": profile_id} @router.get("/invites") async def get_invites() -> Dict[str, Any]: invites = list_signup_invites() profiles = {profile["id"]: profile for profile in list_user_profiles()} results = [] for invite in invites: profile = profiles.get(invite.get("profile_id")) results.append( { **invite, "profile": ( { "id": profile.get("id"), "name": profile.get("name"), } if profile else None ), } ) return {"invites": results} @router.get("/invites/policy") async def get_invite_policy() -> Dict[str, Any]: users = get_all_users() non_admin_users = [user for user in users if user.get("role") != "admin"] invite_access_enabled_count = sum( 1 for user in non_admin_users if bool(user.get("invite_management_enabled", False)) ) raw_master_invite_id = get_setting(SELF_SERVICE_INVITE_MASTER_ID_KEY) master_invite_id: Optional[int] = None master_invite: Optional[Dict[str, Any]] = None if raw_master_invite_id not in (None, ""): try: candidate = int(str(raw_master_invite_id).strip()) if candidate > 0: master_invite_id = candidate master_invite = get_signup_invite_by_id(candidate) except (TypeError, ValueError): master_invite_id = None master_invite = None return { "status": "ok", "policy": { "master_invite_id": master_invite_id if master_invite is not None else None, "master_invite": master_invite, "non_admin_users": len(non_admin_users), "invite_access_enabled_users": invite_access_enabled_count, }, } @router.post("/invites/policy") async def update_invite_policy(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") master_invite_value = payload.get("master_invite_id") if master_invite_value in (None, "", 0, "0"): set_setting(SELF_SERVICE_INVITE_MASTER_ID_KEY, None) logger.info("Admin cleared invite policy master invite") return {"status": "ok", "policy": {"master_invite_id": None, "master_invite": None}} try: master_invite_id = int(master_invite_value) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="master_invite_id must be a number") from exc if master_invite_id <= 0: raise HTTPException(status_code=400, detail="master_invite_id must be a positive number") invite = get_signup_invite_by_id(master_invite_id) if not invite: raise HTTPException(status_code=404, detail="Master invite not found") set_setting(SELF_SERVICE_INVITE_MASTER_ID_KEY, str(master_invite_id)) logger.info("Admin updated invite policy: master_invite_id=%s", master_invite_id) return { "status": "ok", "policy": { "master_invite_id": master_invite_id, "master_invite": invite, }, } @router.get("/invites/email/templates") async def get_invite_email_template_settings() -> Dict[str, Any]: ready, detail = smtp_email_config_ready() warning = smtp_email_delivery_warning() return { "status": "ok", "email": { "configured": ready, "detail": warning or detail, }, "templates": list(get_invite_email_templates().values()), } @router.put("/invites/email/templates/{template_key}") async def update_invite_email_template_settings(template_key: str, payload: Dict[str, Any]) -> Dict[str, Any]: if template_key not in INVITE_EMAIL_TEMPLATE_KEYS: raise HTTPException(status_code=404, detail="Email template not found") if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") subject = _normalize_optional_text(payload.get("subject")) body_text = _normalize_optional_text(payload.get("body_text")) body_html = _normalize_optional_text(payload.get("body_html")) if not subject: raise HTTPException(status_code=400, detail="subject is required") if not body_text and not body_html: raise HTTPException(status_code=400, detail="At least one email body is required") template = save_invite_email_template( template_key, subject=subject, body_text=body_text or "", body_html=body_html or "", ) logger.info("Admin updated invite email template: template=%s", template_key) return {"status": "ok", "template": template} @router.delete("/invites/email/templates/{template_key}") async def reset_invite_email_template_settings(template_key: str) -> Dict[str, Any]: if template_key not in INVITE_EMAIL_TEMPLATE_KEYS: raise HTTPException(status_code=404, detail="Email template not found") template = reset_invite_email_template(template_key) logger.info("Admin reset invite email template: template=%s", template_key) return {"status": "ok", "template": template} @router.post("/invites/email/send") async def send_invite_email(payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") template_key = str(payload.get("template_key") or "").strip().lower() if template_key not in INVITE_EMAIL_TEMPLATE_KEYS: raise HTTPException(status_code=400, detail="template_key is invalid") invite: Optional[Dict[str, Any]] = None invite_id = payload.get("invite_id") if invite_id not in (None, ""): try: invite = get_signup_invite_by_id(int(invite_id)) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail="invite_id must be a number") from exc if not invite: raise HTTPException(status_code=404, detail="Invite not found") user: Optional[Dict[str, Any]] = None username = _normalize_optional_text(payload.get("username")) if username: user = get_user_by_username(username) if not user: raise HTTPException(status_code=404, detail="User not found") if invite is None: invite = _resolve_user_invite(user) recipient_email = _normalize_optional_text(payload.get("recipient_email")) message = _normalize_optional_text(payload.get("message")) reason = _normalize_optional_text(payload.get("reason")) try: result = await send_templated_email( template_key, invite=invite, user=user, recipient_email=recipient_email, message=message, reason=reason, ) except Exception as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc logger.info( "Admin sent invite email template: template=%s recipient=%s invite_id=%s username=%s", template_key, result.get("recipient_email"), invite.get("id") if invite else None, user.get("username") if user else None, ) return { "status": "ok", "template_key": template_key, **result, } @router.get("/invites/trace") async def get_invite_trace() -> Dict[str, Any]: return {"status": "ok", "trace": _build_invite_trace_payload()} @router.post("/invites") async def create_invite(payload: Dict[str, Any], current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") raw_code = _normalize_optional_text(payload.get("code")) code = _normalize_invite_code(raw_code) if raw_code else _generate_invite_code() profile_id = _parse_optional_profile_id(payload.get("profile_id")) enabled = payload.get("enabled") if enabled is None: enabled = True if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") role = _normalize_role_or_none(payload.get("role")) max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses") expires_at = _parse_optional_expires_at(payload.get("expires_at")) recipient_email = _normalize_optional_text(payload.get("recipient_email")) send_email = bool(payload.get("send_email")) delivery_message = _normalize_optional_text(payload.get("message")) try: invite = create_signup_invite( code=code, label=_normalize_optional_text(payload.get("label")), description=_normalize_optional_text(payload.get("description")), profile_id=profile_id, role=role, max_uses=max_uses, enabled=enabled, expires_at=expires_at, recipient_email=recipient_email, created_by=current_user.get("username"), ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="An invite with that code already exists") from exc email_result = None email_error = None if send_email: try: email_result = await send_templated_email( "invited", invite=invite, user=current_user, recipient_email=recipient_email, message=delivery_message, ) except Exception as exc: email_error = str(exc) logger.info( "Admin created invite: invite_id=%s code=%s label=%s profile_id=%s role=%s max_uses=%s enabled=%s recipient_email=%s send_email=%s", invite.get("id"), invite.get("code"), invite.get("label"), invite.get("profile_id"), invite.get("role"), invite.get("max_uses"), invite.get("enabled"), invite.get("recipient_email"), send_email, ) return { "status": "partial" if email_error else "ok", "invite": invite, "email": ( {"status": "ok", **email_result} if email_result else {"status": "error", "detail": email_error} if email_error else None ), } @router.put("/invites/{invite_id}") async def edit_invite(invite_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): raise HTTPException(status_code=400, detail="Invalid payload") existing = get_signup_invite_by_id(invite_id) if not existing: raise HTTPException(status_code=404, detail="Invite not found") code = _normalize_invite_code(_normalize_optional_text(payload.get("code")) or existing["code"]) profile_id = _parse_optional_profile_id(payload.get("profile_id")) enabled = payload.get("enabled") if not isinstance(enabled, bool): raise HTTPException(status_code=400, detail="enabled must be true or false") role = _normalize_role_or_none(payload.get("role")) max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses") expires_at = _parse_optional_expires_at(payload.get("expires_at")) recipient_email = _normalize_optional_text(payload.get("recipient_email")) send_email = bool(payload.get("send_email")) delivery_message = _normalize_optional_text(payload.get("message")) try: invite = update_signup_invite( invite_id, code=code, label=_normalize_optional_text(payload.get("label")), description=_normalize_optional_text(payload.get("description")), profile_id=profile_id, role=role, max_uses=max_uses, enabled=enabled, expires_at=expires_at, recipient_email=recipient_email, ) except sqlite3.IntegrityError as exc: raise HTTPException(status_code=409, detail="An invite with that code already exists") from exc if not invite: raise HTTPException(status_code=404, detail="Invite not found") email_result = None email_error = None if send_email: try: email_result = await send_templated_email( "invited", invite=invite, recipient_email=recipient_email, message=delivery_message, ) except Exception as exc: email_error = str(exc) logger.info( "Admin updated invite: invite_id=%s code=%s label=%s profile_id=%s role=%s max_uses=%s enabled=%s recipient_email=%s send_email=%s", invite.get("id"), invite.get("code"), invite.get("label"), invite.get("profile_id"), invite.get("role"), invite.get("max_uses"), invite.get("enabled"), invite.get("recipient_email"), send_email, ) return { "status": "partial" if email_error else "ok", "invite": invite, "email": ( {"status": "ok", **email_result} if email_result else {"status": "error", "detail": email_error} if email_error else None ), } @router.delete("/invites/{invite_id}") async def remove_invite(invite_id: int) -> Dict[str, Any]: deleted = delete_signup_invite(invite_id) if not deleted: raise HTTPException(status_code=404, detail="Invite not found") logger.warning("Admin deleted invite: invite_id=%s", invite_id) return {"status": "ok", "deleted": True, "invite_id": invite_id}