import json import os import sqlite3 import logging from hashlib import sha256 from datetime import datetime, timezone, timedelta from time import perf_counter from typing import Any, Dict, Optional from .config import settings from .models import Snapshot from .security import hash_password, verify_password logger = logging.getLogger(__name__) SEERR_MEDIA_FAILURE_SHORT_SUPPRESS_HOURS = 6 SEERR_MEDIA_FAILURE_RETRY_SUPPRESS_HOURS = 24 SEERR_MEDIA_FAILURE_PERSISTENT_SUPPRESS_DAYS = 30 SEERR_MEDIA_FAILURE_PERSISTENT_THRESHOLD = 3 SQLITE_BUSY_TIMEOUT_MS = 5_000 SQLITE_CACHE_SIZE_KIB = 32_768 SQLITE_MMAP_SIZE_BYTES = 256 * 1024 * 1024 def _db_path() -> str: path = settings.sqlite_path or "data/magent.db" if not os.path.isabs(path): app_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) path = os.path.join(app_root, path) os.makedirs(os.path.dirname(path), exist_ok=True) return path def _apply_connection_pragmas(conn: sqlite3.Connection) -> None: pragmas = ( ("journal_mode", "WAL"), ("synchronous", "NORMAL"), ("temp_store", "MEMORY"), ("cache_size", -SQLITE_CACHE_SIZE_KIB), ("mmap_size", SQLITE_MMAP_SIZE_BYTES), ("busy_timeout", SQLITE_BUSY_TIMEOUT_MS), ) for pragma, value in pragmas: try: conn.execute(f"PRAGMA {pragma} = {value}") except sqlite3.DatabaseError: logger.debug("sqlite pragma skipped: %s=%s", pragma, value, exc_info=True) def _connect() -> sqlite3.Connection: conn = sqlite3.connect( _db_path(), timeout=SQLITE_BUSY_TIMEOUT_MS / 1000, cached_statements=512, ) _apply_connection_pragmas(conn) return conn def _parse_datetime_value(value: Optional[str]) -> Optional[datetime]: if not isinstance(value, str) or not value.strip(): return None candidate = value.strip() if candidate.endswith("Z"): candidate = candidate[:-1] + "+00:00" try: parsed = datetime.fromisoformat(candidate) except ValueError: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed def _is_datetime_in_past(value: Optional[str]) -> bool: parsed = _parse_datetime_value(value) if parsed is None: return False return parsed <= datetime.now(timezone.utc) def _normalize_title_value(title: Optional[str]) -> Optional[str]: if not isinstance(title, str): return None trimmed = title.strip() return trimmed if trimmed else None def _normalize_year_value(year: Optional[Any]) -> Optional[int]: if isinstance(year, int): return year if isinstance(year, str): trimmed = year.strip() if trimmed.isdigit(): return int(trimmed) return None def _is_placeholder_title(title: Optional[str], request_id: Optional[int]) -> bool: if not isinstance(title, str): return True normalized = title.strip().lower() if not normalized: return True if normalized == "untitled": return True if request_id and normalized == f"request {request_id}": return True return False def _extract_title_year_from_payload(payload_json: Optional[str]) -> tuple[Optional[str], Optional[int]]: if not payload_json: return None, None try: payload = json.loads(payload_json) except json.JSONDecodeError: return None, None if not isinstance(payload, dict): return None, None media = payload.get("media") or {} title = None year = None if isinstance(media, dict): title = media.get("title") or media.get("name") year = media.get("year") if not title: title = payload.get("title") or payload.get("name") if year is None: year = payload.get("year") return _normalize_title_value(title), _normalize_year_value(year) def _extract_tmdb_from_payload(payload_json: Optional[str]) -> tuple[Optional[int], Optional[str]]: if not payload_json: return None, None try: payload = json.loads(payload_json) except (TypeError, json.JSONDecodeError): return None, None if not isinstance(payload, dict): return None, None media = payload.get("media") or {} if not isinstance(media, dict): media = {} tmdb_id = ( media.get("tmdbId") or payload.get("tmdbId") or payload.get("tmdb_id") or media.get("externalServiceId") or payload.get("externalServiceId") ) media_type = ( media.get("mediaType") or payload.get("mediaType") or payload.get("media_type") or payload.get("type") ) try: tmdb_id = int(tmdb_id) if tmdb_id is not None else None except (TypeError, ValueError): tmdb_id = None if isinstance(media_type, str): media_type = media_type.strip().lower() or None return tmdb_id, media_type def init_db() -> None: with _connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, request_id TEXT NOT NULL, state TEXT NOT NULL, state_reason TEXT, created_at TEXT NOT NULL, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS actions ( id INTEGER PRIMARY KEY AUTOINCREMENT, request_id TEXT NOT NULL, action_id TEXT NOT NULL, label TEXT NOT NULL, status TEXT NOT NULL, message TEXT, created_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, role TEXT NOT NULL, auth_provider TEXT NOT NULL DEFAULT 'local', jellyseerr_user_id INTEGER, created_at TEXT NOT NULL, last_login_at TEXT, is_blocked INTEGER NOT NULL DEFAULT 0, auto_search_enabled INTEGER NOT NULL DEFAULT 1, invite_management_enabled INTEGER NOT NULL DEFAULT 0, profile_id INTEGER, expires_at TEXT, invited_by_code TEXT, invited_at TEXT, jellyfin_password_hash TEXT, last_jellyfin_auth_at TEXT ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS user_profiles ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, description TEXT, role TEXT NOT NULL DEFAULT 'user', auto_search_enabled INTEGER NOT NULL DEFAULT 1, account_expires_days INTEGER, is_active INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS signup_invites ( id INTEGER PRIMARY KEY AUTOINCREMENT, code TEXT NOT NULL UNIQUE, label TEXT, description TEXT, profile_id INTEGER, role TEXT, max_uses INTEGER, use_count INTEGER NOT NULL DEFAULT 0, enabled INTEGER NOT NULL DEFAULT 1, expires_at TEXT, recipient_email TEXT, created_by TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_signup_invites_enabled ON signup_invites (enabled) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_signup_invites_expires_at ON signup_invites (expires_at) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS requests_cache ( request_id INTEGER PRIMARY KEY, media_id INTEGER, media_type TEXT, status INTEGER, title TEXT, year INTEGER, requested_by TEXT, requested_by_norm TEXT, requested_by_id INTEGER, created_at TEXT, updated_at TEXT, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS artwork_cache_status ( request_id INTEGER PRIMARY KEY, tmdb_id INTEGER, media_type TEXT, poster_path TEXT, backdrop_path TEXT, has_tmdb INTEGER NOT NULL DEFAULT 0, poster_cached INTEGER NOT NULL DEFAULT 0, backdrop_cached INTEGER NOT NULL DEFAULT 0, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS seerr_media_failures ( media_type TEXT NOT NULL, tmdb_id INTEGER NOT NULL, status_code INTEGER, error_message TEXT, failure_count INTEGER NOT NULL DEFAULT 1, first_failed_at TEXT NOT NULL, last_failed_at TEXT NOT NULL, suppress_until TEXT NOT NULL, is_persistent INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (media_type, tmdb_id) ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS password_reset_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, token_hash TEXT NOT NULL UNIQUE, username TEXT NOT NULL, recipient_email TEXT NOT NULL, auth_provider TEXT NOT NULL, created_at TEXT NOT NULL, expires_at TEXT NOT NULL, used_at TEXT, requested_by_ip TEXT, requested_user_agent TEXT ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_created_at ON requests_cache (created_at) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_norm ON requests_cache (requested_by_norm) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_updated_at ON requests_cache (updated_at DESC, request_id DESC) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_id_created_at ON requests_cache (requested_by_id, created_at DESC, request_id DESC) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_norm_created_at ON requests_cache (requested_by_norm, created_at DESC, request_id DESC) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_status_created_at ON requests_cache (status, created_at DESC, request_id DESC) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_artwork_cache_status_updated_at ON artwork_cache_status (updated_at) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_seerr_media_failures_suppress_until ON seerr_media_failures (suppress_until) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_password_reset_tokens_username ON password_reset_tokens (username) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_password_reset_tokens_expires_at ON password_reset_tokens (expires_at) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS user_activity ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, ip TEXT NOT NULL, user_agent TEXT NOT NULL, first_seen_at TEXT NOT NULL, last_seen_at TEXT NOT NULL, hit_count INTEGER NOT NULL DEFAULT 1, UNIQUE(username, ip, user_agent) ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_user_activity_username ON user_activity (username) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_user_activity_last_seen ON user_activity (last_seen_at) """ ) try: conn.execute("ALTER TABLE users ADD COLUMN last_login_at TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN is_blocked INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN auth_provider TEXT NOT NULL DEFAULT 'local'") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN jellyfin_password_hash TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN last_jellyfin_auth_at TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN jellyseerr_user_id INTEGER") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN auto_search_enabled INTEGER NOT NULL DEFAULT 1") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN invite_management_enabled INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN profile_id INTEGER") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN expires_at TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN invited_by_code TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN invited_at TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE signup_invites ADD COLUMN recipient_email TEXT") except sqlite3.OperationalError: pass try: conn.execute( """ CREATE INDEX IF NOT EXISTS idx_users_profile_id ON users (profile_id) """ ) except sqlite3.OperationalError: pass try: conn.execute( """ CREATE INDEX IF NOT EXISTS idx_users_expires_at ON users (expires_at) """ ) except sqlite3.OperationalError: pass try: conn.execute( """ CREATE INDEX IF NOT EXISTS idx_users_username_nocase ON users (username COLLATE NOCASE) """ ) except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE requests_cache ADD COLUMN requested_by_id INTEGER") except sqlite3.OperationalError: pass try: conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_id ON requests_cache (requested_by_id) """ ) except sqlite3.OperationalError: pass try: conn.execute("PRAGMA optimize") except sqlite3.OperationalError: pass _backfill_auth_providers() ensure_admin_user() def save_snapshot(snapshot: Snapshot) -> None: payload = json.dumps(snapshot.model_dump(), ensure_ascii=True) created_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO snapshots (request_id, state, state_reason, created_at, payload_json) VALUES (?, ?, ?, ?, ?) """, ( snapshot.request_id, snapshot.state.value, snapshot.state_reason, created_at, payload, ), ) def save_action( request_id: str, action_id: str, label: str, status: str, message: Optional[str] = None, ) -> None: created_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO actions (request_id, action_id, label, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (request_id, action_id, label, status, message, created_at), ) def get_recent_snapshots(request_id: str, limit: int = 10) -> list[dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT request_id, state, state_reason, created_at, payload_json FROM snapshots WHERE request_id = ? ORDER BY id DESC LIMIT ? """, (request_id, limit), ).fetchall() results = [] for row in rows: results.append( { "request_id": row[0], "state": row[1], "state_reason": row[2], "created_at": row[3], "payload": json.loads(row[4]), } ) return results def get_recent_actions(request_id: str, limit: int = 10) -> list[dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT request_id, action_id, label, status, message, created_at FROM actions WHERE request_id = ? ORDER BY id DESC LIMIT ? """, (request_id, limit), ).fetchall() results = [] for row in rows: results.append( { "request_id": row[0], "action_id": row[1], "label": row[2], "status": row[3], "message": row[4], "created_at": row[5], } ) return results def ensure_admin_user() -> None: if not settings.admin_username or not settings.admin_password: return existing = get_user_by_username(settings.admin_username) if existing: return create_user(settings.admin_username, settings.admin_password, role="admin") def create_user( username: str, password: str, role: str = "user", auth_provider: str = "local", jellyseerr_user_id: Optional[int] = None, auto_search_enabled: bool = True, invite_management_enabled: bool = False, profile_id: Optional[int] = None, expires_at: Optional[str] = None, invited_by_code: Optional[str] = None, ) -> None: created_at = datetime.now(timezone.utc).isoformat() password_hash = hash_password(password) with _connect() as conn: conn.execute( """ INSERT INTO users ( username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, 1 if auto_search_enabled else 0, 1 if invite_management_enabled else 0, profile_id, expires_at, invited_by_code, created_at if invited_by_code else None, ), ) def create_user_if_missing( username: str, password: str, role: str = "user", auth_provider: str = "local", jellyseerr_user_id: Optional[int] = None, auto_search_enabled: bool = True, invite_management_enabled: bool = False, profile_id: Optional[int] = None, expires_at: Optional[str] = None, invited_by_code: Optional[str] = None, ) -> bool: created_at = datetime.now(timezone.utc).isoformat() password_hash = hash_password(password) with _connect() as conn: cursor = conn.execute( """ INSERT OR IGNORE INTO users ( username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, 1 if auto_search_enabled else 0, 1 if invite_management_enabled else 0, profile_id, expires_at, invited_by_code, created_at if invited_by_code else None, ), ) created = cursor.rowcount > 0 if created: logger.info( "user created-if-missing username=%s role=%s auth_provider=%s jellyseerr_user_id=%s profile_id=%s expires_at=%s", username, role, auth_provider, jellyseerr_user_id, profile_id, expires_at, ) else: logger.debug("user create-if-missing skipped existing username=%s", username) return created def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, jellyfin_password_hash, last_jellyfin_auth_at FROM users WHERE username = ? COLLATE NOCASE """, (username,), ).fetchone() if not row: return None return { "id": row[0], "username": row[1], "password_hash": row[2], "role": row[3], "auth_provider": row[4], "jellyseerr_user_id": row[5], "created_at": row[6], "last_login_at": row[7], "is_blocked": bool(row[8]), "auto_search_enabled": bool(row[9]), "invite_management_enabled": bool(row[10]), "profile_id": row[11], "expires_at": row[12], "invited_by_code": row[13], "invited_at": row[14], "is_expired": _is_datetime_in_past(row[12]), "jellyfin_password_hash": row[15], "last_jellyfin_auth_at": row[16], } def get_user_by_jellyseerr_id(jellyseerr_user_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, jellyfin_password_hash, last_jellyfin_auth_at FROM users WHERE jellyseerr_user_id = ? ORDER BY id ASC LIMIT 1 """, (jellyseerr_user_id,), ).fetchone() if not row: return None return { "id": row[0], "username": row[1], "password_hash": row[2], "role": row[3], "auth_provider": row[4], "jellyseerr_user_id": row[5], "created_at": row[6], "last_login_at": row[7], "is_blocked": bool(row[8]), "auto_search_enabled": bool(row[9]), "invite_management_enabled": bool(row[10]), "profile_id": row[11], "expires_at": row[12], "invited_by_code": row[13], "invited_at": row[14], "is_expired": _is_datetime_in_past(row[12]), "jellyfin_password_hash": row[15], "last_jellyfin_auth_at": row[16], } def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, jellyfin_password_hash, last_jellyfin_auth_at FROM users WHERE id = ? """, (user_id,), ).fetchone() if not row: return None return { "id": row[0], "username": row[1], "password_hash": row[2], "role": row[3], "auth_provider": row[4], "jellyseerr_user_id": row[5], "created_at": row[6], "last_login_at": row[7], "is_blocked": bool(row[8]), "auto_search_enabled": bool(row[9]), "invite_management_enabled": bool(row[10]), "profile_id": row[11], "expires_at": row[12], "invited_by_code": row[13], "invited_at": row[14], "is_expired": _is_datetime_in_past(row[12]), "jellyfin_password_hash": row[15], "last_jellyfin_auth_at": row[16], } def get_all_users() -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT id, username, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at FROM users ORDER BY username COLLATE NOCASE """ ).fetchall() all_rows: list[Dict[str, Any]] = [] for row in rows: all_rows.append( { "id": row[0], "username": row[1], "role": row[2], "auth_provider": row[3], "jellyseerr_user_id": row[4], "created_at": row[5], "last_login_at": row[6], "is_blocked": bool(row[7]), "auto_search_enabled": bool(row[8]), "invite_management_enabled": bool(row[9]), "profile_id": row[10], "expires_at": row[11], "invited_by_code": row[12], "invited_at": row[13], "is_expired": _is_datetime_in_past(row[11]), } ) # Admin user management uses Jellyfin as the source of truth for non-admin # user objects. Seerr rows are treated as enrichment-only and hidden # from admin/user-management views to avoid duplicate accounts in the UI. def _provider_rank(user: Dict[str, Any]) -> int: provider = str(user.get("auth_provider") or "local").strip().lower() if provider == "jellyfin": return 0 if provider == "local": return 1 if provider == "jellyseerr": return 2 return 2 visible_candidates = [ user for user in all_rows if not ( str(user.get("auth_provider") or "local").strip().lower() == "jellyseerr" and str(user.get("role") or "user").strip().lower() != "admin" ) ] visible_candidates.sort( key=lambda user: ( 0 if str(user.get("role") or "user").strip().lower() == "admin" else 1, 0 if isinstance(user.get("jellyseerr_user_id"), int) else 1, _provider_rank(user), 0 if user.get("last_login_at") else 1, int(user.get("id") or 0), ) ) seen_usernames: set[str] = set() seen_jellyseerr_ids: set[int] = set() results: list[Dict[str, Any]] = [] for user in visible_candidates: username = str(user.get("username") or "").strip() if not username: continue username_key = username.lower() jellyseerr_user_id = user.get("jellyseerr_user_id") if isinstance(jellyseerr_user_id, int) and jellyseerr_user_id in seen_jellyseerr_ids: continue if username_key in seen_usernames: continue results.append(user) seen_usernames.add(username_key) if isinstance(jellyseerr_user_id, int): seen_jellyseerr_ids.add(jellyseerr_user_id) results.sort(key=lambda user: str(user.get("username") or "").lower()) return results def delete_non_admin_users() -> int: with _connect() as conn: cursor = conn.execute( """ DELETE FROM users WHERE role != 'admin' """ ) return cursor.rowcount def set_user_jellyseerr_id(username: str, jellyseerr_user_id: Optional[int]) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET jellyseerr_user_id = ? WHERE username = ? """, (jellyseerr_user_id, username), ) def set_user_auth_provider(username: str, auth_provider: str) -> None: provider = (auth_provider or "local").strip().lower() or "local" with _connect() as conn: conn.execute( """ UPDATE users SET auth_provider = ? WHERE username = ? """, (provider, username), ) def set_last_login(username: str) -> None: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE users SET last_login_at = ? WHERE username = ? """, (timestamp, username), ) def set_user_blocked(username: str, blocked: bool) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET is_blocked = ? WHERE username = ? """, (1 if blocked else 0, username), ) logger.info("user blocked state updated username=%s blocked=%s", username, blocked) def delete_user_by_username(username: str) -> bool: with _connect() as conn: cursor = conn.execute( """ DELETE FROM users WHERE username = ? COLLATE NOCASE """, (username,), ) deleted = cursor.rowcount > 0 logger.warning("user delete username=%s deleted=%s", username, deleted) return deleted def delete_user_activity_by_username(username: str) -> int: with _connect() as conn: cursor = conn.execute( """ DELETE FROM user_activity WHERE username = ? COLLATE NOCASE """, (username,), ) return cursor.rowcount def disable_signup_invites_by_creator(username: str) -> int: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: cursor = conn.execute( """ UPDATE signup_invites SET enabled = 0, updated_at = ? WHERE created_by = ? COLLATE NOCASE AND enabled != 0 """, (timestamp, username), ) return cursor.rowcount def set_user_role(username: str, role: str) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET role = ? WHERE username = ? """, (role, username), ) logger.info("user role updated username=%s role=%s", username, role) def set_user_auto_search_enabled(username: str, enabled: bool) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET auto_search_enabled = ? WHERE username = ? """, (1 if enabled else 0, username), ) logger.info("user auto-search updated username=%s enabled=%s", username, enabled) def set_user_invite_management_enabled(username: str, enabled: bool) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET invite_management_enabled = ? WHERE username = ? COLLATE NOCASE """, (1 if enabled else 0, username), ) logger.info("user invite-management updated username=%s enabled=%s", username, enabled) def set_auto_search_enabled_for_non_admin_users(enabled: bool) -> int: with _connect() as conn: cursor = conn.execute( """ UPDATE users SET auto_search_enabled = ? WHERE role != 'admin' """, (1 if enabled else 0,), ) return cursor.rowcount def set_invite_management_enabled_for_non_admin_users(enabled: bool) -> int: with _connect() as conn: cursor = conn.execute( """ UPDATE users SET invite_management_enabled = ? WHERE role != 'admin' """, (1 if enabled else 0,), ) logger.info( "bulk invite-management updated non_admin_users=%s enabled=%s", cursor.rowcount, enabled, ) return cursor.rowcount def set_user_profile_id(username: str, profile_id: Optional[int]) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET profile_id = ? WHERE username = ? COLLATE NOCASE """, (profile_id, username), ) logger.info("user profile assignment updated username=%s profile_id=%s", username, profile_id) def set_user_expires_at(username: str, expires_at: Optional[str]) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET expires_at = ? WHERE username = ? COLLATE NOCASE """, (expires_at, username), ) logger.info("user expiry updated username=%s expires_at=%s", username, expires_at) def _row_to_user_profile(row: Any) -> Dict[str, Any]: return { "id": row[0], "name": row[1], "description": row[2], "role": row[3], "auto_search_enabled": bool(row[4]), "account_expires_days": row[5], "is_active": bool(row[6]), "created_at": row[7], "updated_at": row[8], } def list_user_profiles() -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT id, name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at FROM user_profiles ORDER BY name COLLATE NOCASE """ ).fetchall() return [_row_to_user_profile(row) for row in rows] def get_user_profile(profile_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at FROM user_profiles WHERE id = ? """, (profile_id,), ).fetchone() if not row: return None return _row_to_user_profile(row) def create_user_profile( name: str, description: Optional[str] = None, role: str = "user", auto_search_enabled: bool = True, account_expires_days: Optional[int] = None, is_active: bool = True, ) -> Dict[str, Any]: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: cursor = conn.execute( """ INSERT INTO user_profiles ( name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( name, description, role, 1 if auto_search_enabled else 0, account_expires_days, 1 if is_active else 0, timestamp, timestamp, ), ) profile_id = int(cursor.lastrowid) profile = get_user_profile(profile_id) if not profile: raise RuntimeError("Profile creation failed") return profile def update_user_profile( profile_id: int, *, name: str, description: Optional[str], role: str, auto_search_enabled: bool, account_expires_days: Optional[int], is_active: bool, ) -> Optional[Dict[str, Any]]: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: cursor = conn.execute( """ UPDATE user_profiles SET name = ?, description = ?, role = ?, auto_search_enabled = ?, account_expires_days = ?, is_active = ?, updated_at = ? WHERE id = ? """, ( name, description, role, 1 if auto_search_enabled else 0, account_expires_days, 1 if is_active else 0, timestamp, profile_id, ), ) if cursor.rowcount <= 0: return None return get_user_profile(profile_id) def delete_user_profile(profile_id: int) -> bool: with _connect() as conn: users_count = conn.execute( "SELECT COUNT(*) FROM users WHERE profile_id = ?", (profile_id,), ).fetchone() invites_count = conn.execute( "SELECT COUNT(*) FROM signup_invites WHERE profile_id = ?", (profile_id,), ).fetchone() if int((users_count or [0])[0] or 0) > 0: raise ValueError("Profile is assigned to existing users.") if int((invites_count or [0])[0] or 0) > 0: raise ValueError("Profile is assigned to existing invites.") cursor = conn.execute( "DELETE FROM user_profiles WHERE id = ?", (profile_id,), ) return cursor.rowcount > 0 def _row_to_signup_invite(row: Any) -> Dict[str, Any]: max_uses = row[6] use_count = int(row[7] or 0) expires_at = row[9] is_expired = _is_datetime_in_past(expires_at) remaining_uses = None if max_uses is None else max(int(max_uses) - use_count, 0) return { "id": row[0], "code": row[1], "label": row[2], "description": row[3], "profile_id": row[4], "role": row[5], "max_uses": max_uses, "use_count": use_count, "enabled": bool(row[8]), "expires_at": expires_at, "recipient_email": row[10], "created_by": row[11], "created_at": row[12], "updated_at": row[13], "is_expired": is_expired, "remaining_uses": remaining_uses, "is_usable": bool(row[8]) and not is_expired and (remaining_uses is None or remaining_uses > 0), } def list_signup_invites() -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled, expires_at, recipient_email, created_by, created_at, updated_at FROM signup_invites ORDER BY created_at DESC, id DESC """ ).fetchall() return [_row_to_signup_invite(row) for row in rows] def get_signup_invite_by_id(invite_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled, expires_at, recipient_email, created_by, created_at, updated_at FROM signup_invites WHERE id = ? """, (invite_id,), ).fetchone() if not row: return None return _row_to_signup_invite(row) def get_signup_invite_by_code(code: str) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled, expires_at, recipient_email, created_by, created_at, updated_at FROM signup_invites WHERE code = ? COLLATE NOCASE """, (code,), ).fetchone() if not row: return None return _row_to_signup_invite(row) def create_signup_invite( *, code: str, label: Optional[str] = None, description: Optional[str] = None, profile_id: Optional[int] = None, role: Optional[str] = None, max_uses: Optional[int] = None, enabled: bool = True, expires_at: Optional[str] = None, recipient_email: Optional[str] = None, created_by: Optional[str] = None, ) -> Dict[str, Any]: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: cursor = conn.execute( """ INSERT INTO signup_invites ( code, label, description, profile_id, role, max_uses, use_count, enabled, expires_at, recipient_email, created_by, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?) """, ( code, label, description, profile_id, role, max_uses, 1 if enabled else 0, expires_at, recipient_email, created_by, timestamp, timestamp, ), ) invite_id = int(cursor.lastrowid) logger.info( "signup invite created invite_id=%s code=%s role=%s profile_id=%s max_uses=%s enabled=%s expires_at=%s recipient_email=%s created_by=%s", invite_id, code, role, profile_id, max_uses, enabled, expires_at, recipient_email, created_by, ) invite = get_signup_invite_by_id(invite_id) if not invite: raise RuntimeError("Invite creation failed") return invite def update_signup_invite( invite_id: int, *, code: str, label: Optional[str], description: Optional[str], profile_id: Optional[int], role: Optional[str], max_uses: Optional[int], enabled: bool, expires_at: Optional[str], recipient_email: Optional[str], ) -> Optional[Dict[str, Any]]: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: cursor = conn.execute( """ UPDATE signup_invites SET code = ?, label = ?, description = ?, profile_id = ?, role = ?, max_uses = ?, enabled = ?, expires_at = ?, recipient_email = ?, updated_at = ? WHERE id = ? """, ( code, label, description, profile_id, role, max_uses, 1 if enabled else 0, expires_at, recipient_email, timestamp, invite_id, ), ) if cursor.rowcount <= 0: return None return get_signup_invite_by_id(invite_id) def delete_signup_invite(invite_id: int) -> bool: with _connect() as conn: cursor = conn.execute( "DELETE FROM signup_invites WHERE id = ?", (invite_id,), ) return cursor.rowcount > 0 def increment_signup_invite_use(invite_id: int) -> None: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE signup_invites SET use_count = use_count + 1, updated_at = ? WHERE id = ? """, (timestamp, invite_id), ) def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]: # Resolve case-insensitive duplicates safely by only considering local-provider rows. with _connect() as conn: rows = conn.execute( """ SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, jellyfin_password_hash, last_jellyfin_auth_at FROM users WHERE username = ? COLLATE NOCASE ORDER BY CASE WHEN username = ? THEN 0 ELSE 1 END, id ASC """, (username, username), ).fetchall() if not rows: return None for row in rows: provider = str(row[4] or "local").lower() if provider != "local": continue if not verify_password(password, row[2]): continue return { "id": row[0], "username": row[1], "password_hash": row[2], "role": row[3], "auth_provider": row[4], "jellyseerr_user_id": row[5], "created_at": row[6], "last_login_at": row[7], "is_blocked": bool(row[8]), "auto_search_enabled": bool(row[9]), "invite_management_enabled": bool(row[10]), "profile_id": row[11], "expires_at": row[12], "invited_by_code": row[13], "invited_at": row[14], "is_expired": _is_datetime_in_past(row[12]), "jellyfin_password_hash": row[15], "last_jellyfin_auth_at": row[16], } return None def get_users_by_username_ci(username: str) -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, jellyfin_password_hash, last_jellyfin_auth_at FROM users WHERE username = ? COLLATE NOCASE ORDER BY CASE WHEN username = ? THEN 0 ELSE 1 END, id ASC """, (username, username), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: results.append( { "id": row[0], "username": row[1], "password_hash": row[2], "role": row[3], "auth_provider": row[4], "jellyseerr_user_id": row[5], "created_at": row[6], "last_login_at": row[7], "is_blocked": bool(row[8]), "auto_search_enabled": bool(row[9]), "invite_management_enabled": bool(row[10]), "profile_id": row[11], "expires_at": row[12], "invited_by_code": row[13], "invited_at": row[14], "is_expired": _is_datetime_in_past(row[12]), "jellyfin_password_hash": row[15], "last_jellyfin_auth_at": row[16], } ) return results def set_user_password(username: str, password: str) -> None: password_hash = hash_password(password) with _connect() as conn: conn.execute( """ UPDATE users SET password_hash = ? WHERE username = ? """, (password_hash, username), ) def sync_jellyfin_password_state(username: str, password: str) -> None: if not username or not password: return password_hash = hash_password(password) timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE users SET password_hash = ?, jellyfin_password_hash = ?, last_jellyfin_auth_at = ? WHERE username = ? COLLATE NOCASE """, (password_hash, password_hash, timestamp, username), ) def set_jellyfin_auth_cache(username: str, password: str) -> None: if not username or not password: return password_hash = hash_password(password) timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE users SET jellyfin_password_hash = ?, last_jellyfin_auth_at = ? WHERE username = ? COLLATE NOCASE """, (password_hash, timestamp, username), ) def _backfill_auth_providers() -> None: with _connect() as conn: rows = conn.execute( """ SELECT username, password_hash, auth_provider FROM users """ ).fetchall() updates: list[tuple[str, str]] = [] for row in rows: username, password_hash, auth_provider = row provider = auth_provider or "local" if provider == "local": if verify_password("jellyfin-user", password_hash): provider = "jellyfin" elif verify_password("jellyseerr-user", password_hash): provider = "jellyseerr" if provider != auth_provider: updates.append((provider, username)) if not updates: return with _connect() as conn: conn.executemany( """ UPDATE users SET auth_provider = ? WHERE username = ? """, updates, ) def upsert_user_activity(username: str, ip: str, user_agent: str) -> None: if not username: return ip_value = ip.strip() if isinstance(ip, str) and ip.strip() else "unknown" agent_value = ( user_agent.strip() if isinstance(user_agent, str) and user_agent.strip() else "unknown" ) timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO user_activity (username, ip, user_agent, first_seen_at, last_seen_at, hit_count) VALUES (?, ?, ?, ?, ?, 1) ON CONFLICT(username, ip, user_agent) DO UPDATE SET last_seen_at = excluded.last_seen_at, hit_count = hit_count + 1 """, (username, ip_value, agent_value, timestamp, timestamp), ) def get_user_activity(username: str, limit: int = 5) -> list[Dict[str, Any]]: limit = max(1, min(limit, 20)) with _connect() as conn: rows = conn.execute( """ SELECT ip, user_agent, first_seen_at, last_seen_at, hit_count FROM user_activity WHERE username = ? ORDER BY last_seen_at DESC LIMIT ? """, (username, limit), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: results.append( { "ip": row[0], "user_agent": row[1], "first_seen_at": row[2], "last_seen_at": row[3], "hit_count": row[4], } ) return results def get_user_activity_summary(username: str) -> Dict[str, Any]: with _connect() as conn: last_row = conn.execute( """ SELECT ip, user_agent, last_seen_at FROM user_activity WHERE username = ? ORDER BY last_seen_at DESC LIMIT 1 """, (username,), ).fetchone() count_row = conn.execute( """ SELECT COUNT(*) FROM user_activity WHERE username = ? """, (username,), ).fetchone() return { "last_ip": last_row[0] if last_row else None, "last_user_agent": last_row[1] if last_row else None, "last_seen_at": last_row[2] if last_row else None, "device_count": int(count_row[0] or 0) if count_row else 0, } def get_user_request_stats(username_norm: str, requested_by_id: Optional[int] = None) -> Dict[str, Any]: if requested_by_id is None: return { "total": 0, "ready": 0, "pending": 0, "approved": 0, "working": 0, "partial": 0, "declined": 0, "in_progress": 0, "last_request_at": None, } with _connect() as conn: row = conn.execute( """ SELECT COUNT(*) AS total, SUM(CASE WHEN status = 4 THEN 1 ELSE 0 END) AS ready, SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS pending, SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS approved, SUM(CASE WHEN status = 5 THEN 1 ELSE 0 END) AS working, SUM(CASE WHEN status = 6 THEN 1 ELSE 0 END) AS partial, SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS declined, MAX(created_at) AS last_request_at FROM requests_cache WHERE requested_by_id = ? """, (requested_by_id,), ).fetchone() if not row: return { "total": 0, "ready": 0, "pending": 0, "approved": 0, "working": 0, "partial": 0, "declined": 0, "in_progress": 0, "last_request_at": None, } total = int(row[0] or 0) ready = int(row[1] or 0) pending = int(row[2] or 0) approved = int(row[3] or 0) working = int(row[4] or 0) partial = int(row[5] or 0) declined = int(row[6] or 0) in_progress = approved + working + partial return { "total": total, "ready": ready, "pending": pending, "approved": approved, "working": working, "partial": partial, "declined": declined, "in_progress": in_progress, "last_request_at": row[7], } def get_global_request_leader() -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT requested_by_norm, MAX(requested_by) as display_name, COUNT(*) as total FROM requests_cache WHERE requested_by_norm IS NOT NULL AND requested_by_norm != '' GROUP BY requested_by_norm ORDER BY total DESC LIMIT 1 """ ).fetchone() if not row: return None return {"username": row[1] or row[0], "total": int(row[2] or 0)} def get_global_request_total() -> int: with _connect() as conn: row = conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone() return int(row[0] or 0) _REQUESTS_CACHE_UPSERT_SQL = """ INSERT INTO requests_cache ( request_id, media_id, media_type, status, title, year, requested_by, requested_by_norm, requested_by_id, created_at, updated_at, payload_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(request_id) DO UPDATE SET media_id = excluded.media_id, media_type = excluded.media_type, status = excluded.status, title = excluded.title, year = excluded.year, requested_by = excluded.requested_by, requested_by_norm = excluded.requested_by_norm, requested_by_id = excluded.requested_by_id, created_at = excluded.created_at, updated_at = excluded.updated_at, payload_json = excluded.payload_json """ def get_request_cache_lookup(request_ids: list[int]) -> Dict[int, Dict[str, Any]]: normalized_ids = sorted({int(request_id) for request_id in request_ids if isinstance(request_id, int)}) if not normalized_ids: return {} placeholders = ", ".join("?" for _ in normalized_ids) query = f""" SELECT request_id, updated_at, title, year FROM requests_cache WHERE request_id IN ({placeholders}) """ with _connect() as conn: rows = conn.execute(query, tuple(normalized_ids)).fetchall() return { int(row[0]): { "request_id": int(row[0]), "updated_at": row[1], "title": row[2], "year": row[3], } for row in rows } def _prepare_requests_cache_upsert_rows( records: list[Dict[str, Any]], conn: sqlite3.Connection ) -> list[tuple[Any, ...]]: if not records: return [] existing_rows: Dict[int, tuple[Optional[str], Optional[int]]] = {} ids_needing_existing = [ int(record["request_id"]) for record in records if isinstance(record.get("request_id"), int) and ( not _normalize_title_value(record.get("title")) or _normalize_year_value(record.get("year")) is None ) ] if ids_needing_existing: placeholders = ", ".join("?" for _ in sorted(set(ids_needing_existing))) query = f""" SELECT request_id, title, year FROM requests_cache WHERE request_id IN ({placeholders}) """ for row in conn.execute(query, tuple(sorted(set(ids_needing_existing)))).fetchall(): existing_rows[int(row[0])] = (row[1], row[2]) prepared: list[tuple[Any, ...]] = [] for record in records: request_id = int(record["request_id"]) media_id = record.get("media_id") media_type = record.get("media_type") status = record.get("status") requested_by = record.get("requested_by") requested_by_norm = record.get("requested_by_norm") requested_by_id = record.get("requested_by_id") created_at = record.get("created_at") updated_at = record.get("updated_at") payload_json = str(record.get("payload_json") or "") normalized_title = _normalize_title_value(record.get("title")) normalized_year = _normalize_year_value(record.get("year")) derived_title = None derived_year = None if not normalized_title or normalized_year is None: derived_title, derived_year = _extract_title_year_from_payload(payload_json) if _is_placeholder_title(normalized_title, request_id): normalized_title = None if derived_title and not normalized_title: normalized_title = derived_title if normalized_year is None and derived_year is not None: normalized_year = derived_year existing_title = None existing_year = None if normalized_title is None or normalized_year is None: existing = existing_rows.get(request_id) if existing: existing_title, existing_year = existing if _is_placeholder_title(existing_title, request_id): existing_title = None if normalized_title is None and existing_title: normalized_title = existing_title if normalized_year is None and existing_year is not None: normalized_year = existing_year prepared.append( ( request_id, media_id, media_type, status, normalized_title, normalized_year, requested_by, requested_by_norm, requested_by_id, created_at, updated_at, payload_json, ) ) return prepared def upsert_request_cache( request_id: int, media_id: Optional[int], media_type: Optional[str], status: Optional[int], title: Optional[str], year: Optional[int], requested_by: Optional[str], requested_by_norm: Optional[str], requested_by_id: Optional[int], created_at: Optional[str], updated_at: Optional[str], payload_json: str, ) -> None: with _connect() as conn: rows = _prepare_requests_cache_upsert_rows( [ { "request_id": request_id, "media_id": media_id, "media_type": media_type, "status": status, "title": title, "year": year, "requested_by": requested_by, "requested_by_norm": requested_by_norm, "requested_by_id": requested_by_id, "created_at": created_at, "updated_at": updated_at, "payload_json": payload_json, } ], conn, ) if rows: conn.execute(_REQUESTS_CACHE_UPSERT_SQL, rows[0]) logger.debug( "requests_cache upsert: request_id=%s media_id=%s status=%s updated_at=%s", request_id, media_id, status, updated_at, ) def upsert_request_cache_many(records: list[Dict[str, Any]]) -> int: if not records: return 0 with _connect() as conn: rows = _prepare_requests_cache_upsert_rows(records, conn) if rows: conn.executemany(_REQUESTS_CACHE_UPSERT_SQL, rows) logger.debug("requests_cache bulk upsert: rows=%s", len(records)) return len(records) def get_request_cache_last_updated() -> Optional[str]: with _connect() as conn: row = conn.execute( """ SELECT MAX(updated_at) FROM requests_cache """ ).fetchone() if not row: return None return row[0] def get_request_cache_by_id(request_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT request_id, updated_at, title FROM requests_cache WHERE request_id = ? """, (request_id,), ).fetchone() if not row: logger.debug("requests_cache miss: request_id=%s", request_id) return None logger.debug("requests_cache hit: request_id=%s updated_at=%s", row[0], row[1]) return {"request_id": row[0], "updated_at": row[1], "title": row[2]} def get_request_cache_payload(request_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT payload_json FROM requests_cache WHERE request_id = ? """, (request_id,), ).fetchone() if not row or not row[0]: logger.debug("requests_cache payload miss: request_id=%s", request_id) return None try: payload = json.loads(row[0]) logger.debug("requests_cache payload hit: request_id=%s", request_id) return payload except json.JSONDecodeError: logger.warning("requests_cache payload invalid json: request_id=%s", request_id) return None def get_cached_requests( limit: int, offset: int, requested_by_norm: Optional[str] = None, requested_by_id: Optional[int] = None, since_iso: Optional[str] = None, status_codes: Optional[list[int]] = None, ) -> list[Dict[str, Any]]: query = """ SELECT request_id, media_id, media_type, status, title, year, requested_by, requested_by_norm, requested_by_id, created_at, payload_json FROM requests_cache """ params: list[Any] = [] conditions = [] if requested_by_id is not None: conditions.append("requested_by_id = ?") params.append(requested_by_id) elif requested_by_norm: conditions.append("requested_by_norm = ?") params.append(requested_by_norm) if since_iso: conditions.append("created_at >= ?") params.append(since_iso) if status_codes: placeholders = ", ".join("?" for _ in status_codes) conditions.append(f"status IN ({placeholders})") params.extend(status_codes) if conditions: query += " WHERE " + " AND ".join(conditions) query += " ORDER BY created_at DESC, request_id DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) with _connect() as conn: rows = conn.execute(query, tuple(params)).fetchall() logger.debug( "requests_cache list: count=%s requested_by_norm=%s requested_by_id=%s since_iso=%s status_codes=%s", len(rows), requested_by_norm, requested_by_id, since_iso, status_codes, ) results: list[Dict[str, Any]] = [] for row in rows: title = row[4] year = row[5] payload_json = row[10] if (not title or not year) and payload_json: derived_title, derived_year = _extract_title_year_from_payload(payload_json) if not title: title = derived_title if not year: year = derived_year results.append( { "request_id": row[0], "media_id": row[1], "media_type": row[2], "status": row[3], "title": title, "year": year, "requested_by": row[6], "requested_by_norm": row[7], "requested_by_id": row[8], "created_at": row[9], } ) return results def get_cached_requests_count( requested_by_norm: Optional[str] = None, requested_by_id: Optional[int] = None, since_iso: Optional[str] = None, status_codes: Optional[list[int]] = None, ) -> int: query = "SELECT COUNT(*) FROM requests_cache" params: list[Any] = [] conditions = [] if requested_by_id is not None: conditions.append("requested_by_id = ?") params.append(requested_by_id) elif requested_by_norm: conditions.append("requested_by_norm = ?") params.append(requested_by_norm) if since_iso: conditions.append("created_at >= ?") params.append(since_iso) if status_codes: placeholders = ", ".join("?" for _ in status_codes) conditions.append(f"status IN ({placeholders})") params.extend(status_codes) if conditions: query += " WHERE " + " AND ".join(conditions) with _connect() as conn: row = conn.execute(query, tuple(params)).fetchone() if not row: return 0 return int(row[0]) def get_request_cache_overview(limit: int = 50) -> list[Dict[str, Any]]: limit = max(1, min(limit, 200)) with _connect() as conn: rows = conn.execute( """ SELECT request_id, media_id, media_type, status, title, year, requested_by, requested_by_norm, requested_by_id, created_at, updated_at, payload_json FROM requests_cache ORDER BY updated_at DESC, request_id DESC LIMIT ? """, (limit,), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: title = row[4] if not title and row[11]: derived_title, _ = _extract_title_year_from_payload(row[11]) title = derived_title or row[4] results.append( { "request_id": row[0], "media_id": row[1], "media_type": row[2], "status": row[3], "title": title, "year": row[5], "requested_by": row[6], "requested_by_norm": row[7], "requested_by_id": row[8], "created_at": row[9], "updated_at": row[10], } ) return results def get_request_cache_missing_titles(limit: int = 200) -> list[Dict[str, Any]]: limit = max(1, min(limit, 500)) with _connect() as conn: rows = conn.execute( """ SELECT request_id, payload_json FROM requests_cache WHERE title IS NULL OR TRIM(title) = '' OR LOWER(title) = 'untitled' ORDER BY updated_at DESC, request_id DESC LIMIT ? """, (limit,), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: payload_json = row[1] tmdb_id, media_type = _extract_tmdb_from_payload(payload_json) results.append( { "request_id": row[0], "payload_json": payload_json, "tmdb_id": tmdb_id, "media_type": media_type, } ) return results def get_request_cache_count() -> int: with _connect() as conn: row = conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone() return int(row[0] or 0) def upsert_artwork_cache_status( request_id: int, tmdb_id: Optional[int], media_type: Optional[str], poster_path: Optional[str], backdrop_path: Optional[str], has_tmdb: bool, poster_cached: bool, backdrop_cached: bool, ) -> None: upsert_artwork_cache_status_many( [ { "request_id": request_id, "tmdb_id": tmdb_id, "media_type": media_type, "poster_path": poster_path, "backdrop_path": backdrop_path, "has_tmdb": has_tmdb, "poster_cached": poster_cached, "backdrop_cached": backdrop_cached, } ] ) def upsert_artwork_cache_status_many(records: list[Dict[str, Any]]) -> int: if not records: return 0 updated_at = datetime.now(timezone.utc).isoformat() params = [ ( record["request_id"], record.get("tmdb_id"), record.get("media_type"), record.get("poster_path"), record.get("backdrop_path"), 1 if record.get("has_tmdb") else 0, 1 if record.get("poster_cached") else 0, 1 if record.get("backdrop_cached") else 0, updated_at, ) for record in records if isinstance(record.get("request_id"), int) ] if not params: return 0 with _connect() as conn: conn.executemany( """ INSERT INTO artwork_cache_status ( request_id, tmdb_id, media_type, poster_path, backdrop_path, has_tmdb, poster_cached, backdrop_cached, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(request_id) DO UPDATE SET tmdb_id = excluded.tmdb_id, media_type = excluded.media_type, poster_path = excluded.poster_path, backdrop_path = excluded.backdrop_path, has_tmdb = excluded.has_tmdb, poster_cached = excluded.poster_cached, backdrop_cached = excluded.backdrop_cached, updated_at = excluded.updated_at """, params, ) return len(params) def get_artwork_cache_status_count() -> int: with _connect() as conn: row = conn.execute("SELECT COUNT(*) FROM artwork_cache_status").fetchone() return int(row[0] or 0) def get_artwork_cache_missing_count() -> int: with _connect() as conn: row = conn.execute( """ SELECT COUNT(*) FROM artwork_cache_status WHERE ( (poster_path IS NULL AND has_tmdb = 1) OR (poster_path IS NOT NULL AND poster_cached = 0) OR (backdrop_path IS NULL AND has_tmdb = 1) OR (backdrop_path IS NOT NULL AND backdrop_cached = 0) ) """ ).fetchone() return int(row[0] or 0) def update_artwork_cache_stats( cache_bytes: Optional[int] = None, cache_files: Optional[int] = None, missing_count: Optional[int] = None, total_requests: Optional[int] = None, ) -> None: updated_at = datetime.now(timezone.utc).isoformat() if cache_bytes is not None: set_setting("artwork_cache_bytes", str(int(cache_bytes))) if cache_files is not None: set_setting("artwork_cache_files", str(int(cache_files))) if missing_count is not None: set_setting("artwork_cache_missing", str(int(missing_count))) if total_requests is not None: set_setting("artwork_cache_total_requests", str(int(total_requests))) set_setting("artwork_cache_updated_at", updated_at) def get_artwork_cache_stats() -> Dict[str, Any]: def _get_int(key: str) -> int: value = get_setting(key) if value is None: return 0 try: return int(value) except (TypeError, ValueError): return 0 return { "cache_bytes": _get_int("artwork_cache_bytes"), "cache_files": _get_int("artwork_cache_files"), "missing_artwork": _get_int("artwork_cache_missing"), "total_requests": _get_int("artwork_cache_total_requests"), "updated_at": get_setting("artwork_cache_updated_at"), } def get_request_cache_stats() -> Dict[str, Any]: return get_artwork_cache_stats() def update_request_cache_title( request_id: int, title: str, year: Optional[int] = None ) -> None: normalized_title = _normalize_title_value(title) normalized_year = _normalize_year_value(year) if not normalized_title: return with _connect() as conn: conn.execute( """ UPDATE requests_cache SET title = ?, year = COALESCE(?, year) WHERE request_id = ? """, (normalized_title, normalized_year, request_id), ) def repair_request_cache_titles() -> int: updated = 0 with _connect() as conn: rows = conn.execute( """ SELECT request_id, title, year, payload_json FROM requests_cache """ ).fetchall() for row in rows: request_id, title, year, payload_json = row if not _is_placeholder_title(title, request_id): continue derived_title, derived_year = _extract_title_year_from_payload(payload_json) if not derived_title: continue conn.execute( """ UPDATE requests_cache SET title = ?, year = COALESCE(?, year) WHERE request_id = ? """, (derived_title, derived_year, request_id), ) updated += 1 return updated def prune_duplicate_requests_cache() -> int: with _connect() as conn: cursor = conn.execute( """ DELETE FROM requests_cache WHERE media_id IS NOT NULL AND request_id NOT IN ( SELECT MAX(request_id) FROM requests_cache WHERE media_id IS NOT NULL GROUP BY media_id, COALESCE(requested_by_norm, '') ) """ ) return cursor.rowcount def get_request_cache_payloads(limit: int = 200, offset: int = 0) -> list[Dict[str, Any]]: limit = max(1, min(limit, 1000)) offset = max(0, offset) with _connect() as conn: rows = conn.execute( """ SELECT request_id, payload_json FROM requests_cache ORDER BY request_id ASC LIMIT ? OFFSET ? """, (limit, offset), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: payload = None if row[1]: try: payload = json.loads(row[1]) except json.JSONDecodeError: payload = None results.append({"request_id": row[0], "payload": payload}) return results def get_request_cache_payloads_missing(limit: int = 200, offset: int = 0) -> list[Dict[str, Any]]: limit = max(1, min(limit, 1000)) offset = max(0, offset) with _connect() as conn: rows = conn.execute( """ SELECT rc.request_id, rc.payload_json FROM requests_cache rc JOIN artwork_cache_status acs ON rc.request_id = acs.request_id WHERE ( (acs.poster_path IS NULL AND acs.has_tmdb = 1) OR (acs.poster_path IS NOT NULL AND acs.poster_cached = 0) OR (acs.backdrop_path IS NULL AND acs.has_tmdb = 1) OR (acs.backdrop_path IS NOT NULL AND acs.backdrop_cached = 0) ) ORDER BY rc.request_id ASC LIMIT ? OFFSET ? """, (limit, offset), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: payload = None if row[1]: try: payload = json.loads(row[1]) except json.JSONDecodeError: payload = None results.append({"request_id": row[0], "payload": payload}) return results def get_cached_requests_since(since_iso: str) -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT request_id, media_id, media_type, status, title, year, requested_by, requested_by_norm, requested_by_id, created_at FROM requests_cache WHERE created_at >= ? ORDER BY created_at DESC, request_id DESC """, (since_iso,), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: results.append( { "request_id": row[0], "media_id": row[1], "media_type": row[2], "status": row[3], "title": row[4], "year": row[5], "requested_by": row[6], "requested_by_norm": row[7], "requested_by_id": row[8], "created_at": row[9], } ) return results def get_cached_request_by_media_id( media_id: int, requested_by_norm: Optional[str] = None, requested_by_id: Optional[int] = None, ) -> Optional[Dict[str, Any]]: query = """ SELECT request_id, status FROM requests_cache WHERE media_id = ? """ params: list[Any] = [media_id] if requested_by_id is not None: query += " AND requested_by_id = ?" params.append(requested_by_id) elif requested_by_norm: query += " AND requested_by_norm = ?" params.append(requested_by_norm) query += " ORDER BY created_at DESC, request_id DESC LIMIT 1" with _connect() as conn: row = conn.execute(query, tuple(params)).fetchone() if not row: return None return {"request_id": row[0], "status": row[1]} def get_setting(key: str) -> Optional[str]: with _connect() as conn: row = conn.execute( """ SELECT value FROM settings WHERE key = ? """, (key,), ).fetchone() if not row: return None return row[0] def set_setting(key: str, value: Optional[str]) -> None: updated_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO settings (key, value, updated_at) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at """, (key, value, updated_at), ) def delete_setting(key: str) -> None: with _connect() as conn: conn.execute( """ DELETE FROM settings WHERE key = ? """, (key,), ) def get_settings_overrides() -> Dict[str, str]: with _connect() as conn: rows = conn.execute( """ SELECT key, value FROM settings """ ).fetchall() overrides: Dict[str, str] = {} for row in rows: key = row[0] value = row[1] if key: overrides[key] = value return overrides def _hash_password_reset_token(token_value: str) -> str: return sha256(str(token_value).encode("utf-8")).hexdigest() def _password_reset_token_row_to_dict(row: Any) -> Dict[str, Any]: return { "id": row[0], "token_hash": row[1], "username": row[2], "recipient_email": row[3], "auth_provider": row[4], "created_at": row[5], "expires_at": row[6], "used_at": row[7], "requested_by_ip": row[8], "requested_user_agent": row[9], "is_expired": _is_datetime_in_past(row[6]), "is_used": bool(row[7]), } def delete_expired_password_reset_tokens() -> int: now_iso = datetime.now(timezone.utc).isoformat() with _connect() as conn: cursor = conn.execute( """ DELETE FROM password_reset_tokens WHERE expires_at <= ? OR used_at IS NOT NULL """, (now_iso,), ) return int(cursor.rowcount or 0) def create_password_reset_token( token_value: str, username: str, recipient_email: str, auth_provider: str, expires_at: str, *, requested_by_ip: Optional[str] = None, requested_user_agent: Optional[str] = None, ) -> Dict[str, Any]: created_at = datetime.now(timezone.utc).isoformat() token_hash = _hash_password_reset_token(token_value) delete_expired_password_reset_tokens() with _connect() as conn: conn.execute( """ DELETE FROM password_reset_tokens WHERE username = ? AND used_at IS NULL """, (username,), ) conn.execute( """ INSERT INTO password_reset_tokens ( token_hash, username, recipient_email, auth_provider, created_at, expires_at, used_at, requested_by_ip, requested_user_agent ) VALUES (?, ?, ?, ?, ?, ?, NULL, ?, ?) """, ( token_hash, username, recipient_email, auth_provider, created_at, expires_at, requested_by_ip, requested_user_agent, ), ) logger.info( "password reset token created username=%s provider=%s recipient=%s expires_at=%s requester_ip=%s", username, auth_provider, recipient_email, expires_at, requested_by_ip, ) return { "username": username, "recipient_email": recipient_email, "auth_provider": auth_provider, "created_at": created_at, "expires_at": expires_at, "requested_by_ip": requested_by_ip, "requested_user_agent": requested_user_agent, } def get_password_reset_token(token_value: str) -> Optional[Dict[str, Any]]: token_hash = _hash_password_reset_token(token_value) with _connect() as conn: row = conn.execute( """ SELECT id, token_hash, username, recipient_email, auth_provider, created_at, expires_at, used_at, requested_by_ip, requested_user_agent FROM password_reset_tokens WHERE token_hash = ? """, (token_hash,), ).fetchone() if not row: return None return _password_reset_token_row_to_dict(row) def mark_password_reset_token_used(token_value: str) -> None: token_hash = _hash_password_reset_token(token_value) used_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE password_reset_tokens SET used_at = ? WHERE token_hash = ? AND used_at IS NULL """, (used_at, token_hash), ) logger.info("password reset token marked used token_hash=%s", token_hash[:12]) def get_seerr_media_failure(media_type: Optional[str], tmdb_id: Optional[int]) -> Optional[Dict[str, Any]]: if not media_type or not tmdb_id: return None normalized_media_type = str(media_type).strip().lower() try: normalized_tmdb_id = int(tmdb_id) except (TypeError, ValueError): return None with _connect() as conn: row = conn.execute( """ SELECT media_type, tmdb_id, status_code, error_message, failure_count, first_failed_at, last_failed_at, suppress_until, is_persistent FROM seerr_media_failures WHERE media_type = ? AND tmdb_id = ? """, (normalized_media_type, normalized_tmdb_id), ).fetchone() if not row: return None return { "media_type": row[0], "tmdb_id": row[1], "status_code": row[2], "error_message": row[3], "failure_count": row[4], "first_failed_at": row[5], "last_failed_at": row[6], "suppress_until": row[7], "is_persistent": bool(row[8]), } def is_seerr_media_failure_suppressed(media_type: Optional[str], tmdb_id: Optional[int]) -> bool: record = get_seerr_media_failure(media_type, tmdb_id) if not record: return False suppress_until = _parse_datetime_value(record.get("suppress_until")) if suppress_until and suppress_until > datetime.now(timezone.utc): return True clear_seerr_media_failure(media_type, tmdb_id) return False def record_seerr_media_failure( media_type: Optional[str], tmdb_id: Optional[int], *, status_code: Optional[int] = None, error_message: Optional[str] = None, ) -> Dict[str, Any]: if not media_type or not tmdb_id: return {} normalized_media_type = str(media_type).strip().lower() normalized_tmdb_id = int(tmdb_id) now = datetime.now(timezone.utc) existing = get_seerr_media_failure(normalized_media_type, normalized_tmdb_id) failure_count = int(existing.get("failure_count", 0)) + 1 if existing else 1 is_persistent = failure_count >= SEERR_MEDIA_FAILURE_PERSISTENT_THRESHOLD if is_persistent: suppress_until = now + timedelta(days=SEERR_MEDIA_FAILURE_PERSISTENT_SUPPRESS_DAYS) elif failure_count >= 2: suppress_until = now + timedelta(hours=SEERR_MEDIA_FAILURE_RETRY_SUPPRESS_HOURS) else: suppress_until = now + timedelta(hours=SEERR_MEDIA_FAILURE_SHORT_SUPPRESS_HOURS) payload = { "media_type": normalized_media_type, "tmdb_id": normalized_tmdb_id, "status_code": status_code, "error_message": error_message, "failure_count": failure_count, "first_failed_at": existing.get("first_failed_at") if existing else now.isoformat(), "last_failed_at": now.isoformat(), "suppress_until": suppress_until.isoformat(), "is_persistent": is_persistent, } with _connect() as conn: conn.execute( """ INSERT INTO seerr_media_failures ( media_type, tmdb_id, status_code, error_message, failure_count, first_failed_at, last_failed_at, suppress_until, is_persistent ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(media_type, tmdb_id) DO UPDATE SET status_code = excluded.status_code, error_message = excluded.error_message, failure_count = excluded.failure_count, first_failed_at = excluded.first_failed_at, last_failed_at = excluded.last_failed_at, suppress_until = excluded.suppress_until, is_persistent = excluded.is_persistent """, ( payload["media_type"], payload["tmdb_id"], payload["status_code"], payload["error_message"], payload["failure_count"], payload["first_failed_at"], payload["last_failed_at"], payload["suppress_until"], 1 if payload["is_persistent"] else 0, ), ) logger.warning( "seerr_media_failure upsert: media_type=%s tmdb_id=%s status=%s failure_count=%s suppress_until=%s persistent=%s", payload["media_type"], payload["tmdb_id"], payload["status_code"], payload["failure_count"], payload["suppress_until"], payload["is_persistent"], ) return payload def clear_seerr_media_failure(media_type: Optional[str], tmdb_id: Optional[int]) -> None: if not media_type or not tmdb_id: return normalized_media_type = str(media_type).strip().lower() try: normalized_tmdb_id = int(tmdb_id) except (TypeError, ValueError): return with _connect() as conn: deleted = conn.execute( """ DELETE FROM seerr_media_failures WHERE media_type = ? AND tmdb_id = ? """, (normalized_media_type, normalized_tmdb_id), ).rowcount if deleted: logger.info( "seerr_media_failure cleared: media_type=%s tmdb_id=%s", normalized_media_type, normalized_tmdb_id, ) def run_integrity_check() -> str: with _connect() as conn: row = conn.execute("PRAGMA integrity_check").fetchone() if not row: return "unknown" return str(row[0]) def get_database_diagnostics() -> Dict[str, Any]: db_path = _db_path() wal_path = f"{db_path}-wal" shm_path = f"{db_path}-shm" def _size(path: str) -> int: try: return os.path.getsize(path) except OSError: return 0 started = perf_counter() with _connect() as conn: integrity_started = perf_counter() integrity_row = conn.execute("PRAGMA integrity_check").fetchone() integrity_ms = round((perf_counter() - integrity_started) * 1000, 1) integrity = str(integrity_row[0]) if integrity_row else "unknown" pragma_started = perf_counter() page_size_row = conn.execute("PRAGMA page_size").fetchone() page_count_row = conn.execute("PRAGMA page_count").fetchone() freelist_row = conn.execute("PRAGMA freelist_count").fetchone() pragma_ms = round((perf_counter() - pragma_started) * 1000, 1) row_count_started = perf_counter() table_counts = { "users": int(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0] or 0), "requests_cache": int(conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone()[0] or 0), "artwork_cache_status": int(conn.execute("SELECT COUNT(*) FROM artwork_cache_status").fetchone()[0] or 0), "signup_invites": int(conn.execute("SELECT COUNT(*) FROM signup_invites").fetchone()[0] or 0), "settings": int(conn.execute("SELECT COUNT(*) FROM settings").fetchone()[0] or 0), "actions": int(conn.execute("SELECT COUNT(*) FROM actions").fetchone()[0] or 0), "snapshots": int(conn.execute("SELECT COUNT(*) FROM snapshots").fetchone()[0] or 0), "seerr_media_failures": int(conn.execute("SELECT COUNT(*) FROM seerr_media_failures").fetchone()[0] or 0), "password_reset_tokens": int(conn.execute("SELECT COUNT(*) FROM password_reset_tokens").fetchone()[0] or 0), } row_count_ms = round((perf_counter() - row_count_started) * 1000, 1) page_size = int(page_size_row[0] or 0) if page_size_row else 0 page_count = int(page_count_row[0] or 0) if page_count_row else 0 freelist_pages = int(freelist_row[0] or 0) if freelist_row else 0 db_size_bytes = _size(db_path) wal_size_bytes = _size(wal_path) shm_size_bytes = _size(shm_path) return { "integrity_check": integrity, "database_path": db_path, "database_size_bytes": db_size_bytes, "wal_size_bytes": wal_size_bytes, "shm_size_bytes": shm_size_bytes, "page_size_bytes": page_size, "page_count": page_count, "freelist_pages": freelist_pages, "allocated_bytes": page_size * page_count, "free_bytes": page_size * freelist_pages, "row_counts": table_counts, "timings_ms": { "integrity_check": integrity_ms, "pragmas": pragma_ms, "row_counts": row_count_ms, "total": round((perf_counter() - started) * 1000, 1), }, } def vacuum_db() -> None: with _connect() as conn: conn.execute("VACUUM") def clear_requests_cache() -> int: with _connect() as conn: cursor = conn.execute("DELETE FROM requests_cache") return cursor.rowcount def clear_history() -> Dict[str, int]: with _connect() as conn: actions = conn.execute("DELETE FROM actions").rowcount snapshots = conn.execute("DELETE FROM snapshots").rowcount return {"actions": actions, "snapshots": snapshots} def clear_user_objects_nuclear() -> Dict[str, int]: with _connect() as conn: # Preserve admin accounts, but remove invite/profile references so profile rows can be deleted safely. admin_reset = conn.execute( """ UPDATE users SET profile_id = NULL, invited_by_code = NULL, invited_at = NULL WHERE role = 'admin' """ ).rowcount users = conn.execute("DELETE FROM users WHERE role != 'admin'").rowcount invites = conn.execute("DELETE FROM signup_invites").rowcount profiles = conn.execute("DELETE FROM user_profiles").rowcount return { "users": users, "invites": invites, "profiles": profiles, "adminsReset": admin_reset, } def cleanup_history(days: int) -> Dict[str, int]: if days <= 0: return {"actions": 0, "snapshots": 0} cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() with _connect() as conn: actions = conn.execute( "DELETE FROM actions WHERE created_at < ?", (cutoff,), ).rowcount snapshots = conn.execute( "DELETE FROM snapshots WHERE created_at < ?", (cutoff,), ).rowcount return {"actions": actions, "snapshots": snapshots}