import json import os import sqlite3 import logging from datetime import datetime, timezone, timedelta from typing import Any, Dict, Optional from .config import settings from .models import Snapshot from .security import hash_password, verify_password logger = logging.getLogger(__name__) def _db_path() -> str: path = settings.sqlite_path or "data/magent.db" if not os.path.isabs(path): path = os.path.join(os.getcwd(), path) os.makedirs(os.path.dirname(path), exist_ok=True) return path def _connect() -> sqlite3.Connection: return sqlite3.connect(_db_path()) def _normalize_title_value(title: Optional[str]) -> Optional[str]: if not isinstance(title, str): return None trimmed = title.strip() return trimmed if trimmed else None def _normalize_year_value(year: Optional[Any]) -> Optional[int]: if isinstance(year, int): return year if isinstance(year, str): trimmed = year.strip() if trimmed.isdigit(): return int(trimmed) return None def _is_placeholder_title(title: Optional[str], request_id: Optional[int]) -> bool: if not isinstance(title, str): return True normalized = title.strip().lower() if not normalized: return True if normalized == "untitled": return True if request_id and normalized == f"request {request_id}": return True return False def _extract_title_year_from_payload(payload_json: Optional[str]) -> tuple[Optional[str], Optional[int]]: if not payload_json: return None, None try: payload = json.loads(payload_json) except json.JSONDecodeError: return None, None if not isinstance(payload, dict): return None, None media = payload.get("media") or {} title = None year = None if isinstance(media, dict): title = media.get("title") or media.get("name") year = media.get("year") if not title: title = payload.get("title") or payload.get("name") if year is None: year = payload.get("year") return _normalize_title_value(title), _normalize_year_value(year) def _extract_tmdb_from_payload(payload_json: Optional[str]) -> tuple[Optional[int], Optional[str]]: if not payload_json: return None, None try: payload = json.loads(payload_json) except (TypeError, json.JSONDecodeError): return None, None if not isinstance(payload, dict): return None, None media = payload.get("media") or {} if not isinstance(media, dict): media = {} tmdb_id = ( media.get("tmdbId") or payload.get("tmdbId") or payload.get("tmdb_id") or media.get("externalServiceId") or payload.get("externalServiceId") ) media_type = ( media.get("mediaType") or payload.get("mediaType") or payload.get("media_type") or payload.get("type") ) try: tmdb_id = int(tmdb_id) if tmdb_id is not None else None except (TypeError, ValueError): tmdb_id = None if isinstance(media_type, str): media_type = media_type.strip().lower() or None return tmdb_id, media_type def init_db() -> None: with _connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, request_id TEXT NOT NULL, state TEXT NOT NULL, state_reason TEXT, created_at TEXT NOT NULL, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS actions ( id INTEGER PRIMARY KEY AUTOINCREMENT, request_id TEXT NOT NULL, action_id TEXT NOT NULL, label TEXT NOT NULL, status TEXT NOT NULL, message TEXT, created_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, role TEXT NOT NULL, auth_provider TEXT NOT NULL DEFAULT 'local', created_at TEXT NOT NULL, last_login_at TEXT, is_blocked INTEGER NOT NULL DEFAULT 0, jellyfin_password_hash TEXT, last_jellyfin_auth_at TEXT ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS requests_cache ( request_id INTEGER PRIMARY KEY, media_id INTEGER, media_type TEXT, status INTEGER, title TEXT, year INTEGER, requested_by TEXT, requested_by_norm TEXT, created_at TEXT, updated_at TEXT, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS artwork_cache_status ( request_id INTEGER PRIMARY KEY, tmdb_id INTEGER, media_type TEXT, poster_path TEXT, backdrop_path TEXT, has_tmdb INTEGER NOT NULL DEFAULT 0, poster_cached INTEGER NOT NULL DEFAULT 0, backdrop_cached INTEGER NOT NULL DEFAULT 0, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_created_at ON requests_cache (created_at) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_norm ON requests_cache (requested_by_norm) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_artwork_cache_status_updated_at ON artwork_cache_status (updated_at) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS user_activity ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, ip TEXT NOT NULL, user_agent TEXT NOT NULL, first_seen_at TEXT NOT NULL, last_seen_at TEXT NOT NULL, hit_count INTEGER NOT NULL DEFAULT 1, UNIQUE(username, ip, user_agent) ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_user_activity_username ON user_activity (username) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_user_activity_last_seen ON user_activity (last_seen_at) """ ) try: conn.execute("ALTER TABLE users ADD COLUMN last_login_at TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN is_blocked INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN auth_provider TEXT NOT NULL DEFAULT 'local'") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN jellyfin_password_hash TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE users ADD COLUMN last_jellyfin_auth_at TEXT") except sqlite3.OperationalError: pass _backfill_auth_providers() ensure_admin_user() def save_snapshot(snapshot: Snapshot) -> None: payload = json.dumps(snapshot.model_dump(), ensure_ascii=True) created_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO snapshots (request_id, state, state_reason, created_at, payload_json) VALUES (?, ?, ?, ?, ?) """, ( snapshot.request_id, snapshot.state.value, snapshot.state_reason, created_at, payload, ), ) def save_action( request_id: str, action_id: str, label: str, status: str, message: Optional[str] = None, ) -> None: created_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO actions (request_id, action_id, label, status, message, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (request_id, action_id, label, status, message, created_at), ) def get_recent_snapshots(request_id: str, limit: int = 10) -> list[dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT request_id, state, state_reason, created_at, payload_json FROM snapshots WHERE request_id = ? ORDER BY id DESC LIMIT ? """, (request_id, limit), ).fetchall() results = [] for row in rows: results.append( { "request_id": row[0], "state": row[1], "state_reason": row[2], "created_at": row[3], "payload": json.loads(row[4]), } ) return results def get_recent_actions(request_id: str, limit: int = 10) -> list[dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT request_id, action_id, label, status, message, created_at FROM actions WHERE request_id = ? ORDER BY id DESC LIMIT ? """, (request_id, limit), ).fetchall() results = [] for row in rows: results.append( { "request_id": row[0], "action_id": row[1], "label": row[2], "status": row[3], "message": row[4], "created_at": row[5], } ) return results def ensure_admin_user() -> None: if not settings.admin_username or not settings.admin_password: return existing = get_user_by_username(settings.admin_username) if existing: return create_user(settings.admin_username, settings.admin_password, role="admin") def create_user(username: str, password: str, role: str = "user", auth_provider: str = "local") -> None: created_at = datetime.now(timezone.utc).isoformat() password_hash = hash_password(password) with _connect() as conn: conn.execute( """ INSERT INTO users (username, password_hash, role, auth_provider, created_at) VALUES (?, ?, ?, ?, ?) """, (username, password_hash, role, auth_provider, created_at), ) def create_user_if_missing( username: str, password: str, role: str = "user", auth_provider: str = "local" ) -> bool: created_at = datetime.now(timezone.utc).isoformat() password_hash = hash_password(password) with _connect() as conn: cursor = conn.execute( """ INSERT OR IGNORE INTO users (username, password_hash, role, auth_provider, created_at) VALUES (?, ?, ?, ?, ?) """, (username, password_hash, role, auth_provider, created_at), ) return cursor.rowcount > 0 def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT id, username, password_hash, role, auth_provider, created_at, last_login_at, is_blocked, jellyfin_password_hash, last_jellyfin_auth_at FROM users WHERE username = ? """, (username,), ).fetchone() if not row: return None return { "id": row[0], "username": row[1], "password_hash": row[2], "role": row[3], "auth_provider": row[4], "created_at": row[5], "last_login_at": row[6], "is_blocked": bool(row[7]), "jellyfin_password_hash": row[8], "last_jellyfin_auth_at": row[9], } def get_all_users() -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT id, username, role, auth_provider, created_at, last_login_at, is_blocked FROM users ORDER BY username COLLATE NOCASE """ ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: results.append( { "id": row[0], "username": row[1], "role": row[2], "auth_provider": row[3], "created_at": row[4], "last_login_at": row[5], "is_blocked": bool(row[6]), } ) return results def set_last_login(username: str) -> None: timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE users SET last_login_at = ? WHERE username = ? """, (timestamp, username), ) def set_user_blocked(username: str, blocked: bool) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET is_blocked = ? WHERE username = ? """, (1 if blocked else 0, username), ) def set_user_role(username: str, role: str) -> None: with _connect() as conn: conn.execute( """ UPDATE users SET role = ? WHERE username = ? """, (role, username), ) def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]: user = get_user_by_username(username) if not user: return None if not verify_password(password, user["password_hash"]): return None return user def set_user_password(username: str, password: str) -> None: password_hash = hash_password(password) with _connect() as conn: conn.execute( """ UPDATE users SET password_hash = ? WHERE username = ? """, (password_hash, username), ) def set_jellyfin_auth_cache(username: str, password: str) -> None: if not username or not password: return password_hash = hash_password(password) timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ UPDATE users SET jellyfin_password_hash = ?, last_jellyfin_auth_at = ? WHERE username = ? """, (password_hash, timestamp, username), ) def _backfill_auth_providers() -> None: with _connect() as conn: rows = conn.execute( """ SELECT username, password_hash, auth_provider FROM users """ ).fetchall() updates: list[tuple[str, str]] = [] for row in rows: username, password_hash, auth_provider = row provider = auth_provider or "local" if provider == "local": if verify_password("jellyfin-user", password_hash): provider = "jellyfin" elif verify_password("jellyseerr-user", password_hash): provider = "jellyseerr" if provider != auth_provider: updates.append((provider, username)) if not updates: return with _connect() as conn: conn.executemany( """ UPDATE users SET auth_provider = ? WHERE username = ? """, updates, ) def upsert_user_activity(username: str, ip: str, user_agent: str) -> None: if not username: return ip_value = ip.strip() if isinstance(ip, str) and ip.strip() else "unknown" agent_value = ( user_agent.strip() if isinstance(user_agent, str) and user_agent.strip() else "unknown" ) timestamp = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO user_activity (username, ip, user_agent, first_seen_at, last_seen_at, hit_count) VALUES (?, ?, ?, ?, ?, 1) ON CONFLICT(username, ip, user_agent) DO UPDATE SET last_seen_at = excluded.last_seen_at, hit_count = hit_count + 1 """, (username, ip_value, agent_value, timestamp, timestamp), ) def get_user_activity(username: str, limit: int = 5) -> list[Dict[str, Any]]: limit = max(1, min(limit, 20)) with _connect() as conn: rows = conn.execute( """ SELECT ip, user_agent, first_seen_at, last_seen_at, hit_count FROM user_activity WHERE username = ? ORDER BY last_seen_at DESC LIMIT ? """, (username, limit), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: results.append( { "ip": row[0], "user_agent": row[1], "first_seen_at": row[2], "last_seen_at": row[3], "hit_count": row[4], } ) return results def get_user_activity_summary(username: str) -> Dict[str, Any]: with _connect() as conn: last_row = conn.execute( """ SELECT ip, user_agent, last_seen_at FROM user_activity WHERE username = ? ORDER BY last_seen_at DESC LIMIT 1 """, (username,), ).fetchone() count_row = conn.execute( """ SELECT COUNT(*) FROM user_activity WHERE username = ? """, (username,), ).fetchone() return { "last_ip": last_row[0] if last_row else None, "last_user_agent": last_row[1] if last_row else None, "last_seen_at": last_row[2] if last_row else None, "device_count": int(count_row[0] or 0) if count_row else 0, } def get_user_request_stats(username_norm: str) -> Dict[str, Any]: if not username_norm: return { "total": 0, "ready": 0, "pending": 0, "approved": 0, "working": 0, "partial": 0, "declined": 0, "in_progress": 0, "last_request_at": None, } with _connect() as conn: total_row = conn.execute( """ SELECT COUNT(*) FROM requests_cache WHERE requested_by_norm = ? """, (username_norm,), ).fetchone() status_rows = conn.execute( """ SELECT status, COUNT(*) FROM requests_cache WHERE requested_by_norm = ? GROUP BY status """, (username_norm,), ).fetchall() last_row = conn.execute( """ SELECT MAX(created_at) FROM requests_cache WHERE requested_by_norm = ? """, (username_norm,), ).fetchone() counts = {int(row[0]): int(row[1]) for row in status_rows if row[0] is not None} pending = counts.get(1, 0) approved = counts.get(2, 0) declined = counts.get(3, 0) ready = counts.get(4, 0) working = counts.get(5, 0) partial = counts.get(6, 0) in_progress = approved + working + partial return { "total": int(total_row[0] or 0) if total_row else 0, "ready": ready, "pending": pending, "approved": approved, "working": working, "partial": partial, "declined": declined, "in_progress": in_progress, "last_request_at": last_row[0] if last_row else None, } def get_global_request_leader() -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT requested_by_norm, MAX(requested_by) as display_name, COUNT(*) as total FROM requests_cache WHERE requested_by_norm IS NOT NULL AND requested_by_norm != '' GROUP BY requested_by_norm ORDER BY total DESC LIMIT 1 """ ).fetchone() if not row: return None return {"username": row[1] or row[0], "total": int(row[2] or 0)} def get_global_request_total() -> int: with _connect() as conn: row = conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone() return int(row[0] or 0) def upsert_request_cache( request_id: int, media_id: Optional[int], media_type: Optional[str], status: Optional[int], title: Optional[str], year: Optional[int], requested_by: Optional[str], requested_by_norm: Optional[str], created_at: Optional[str], updated_at: Optional[str], payload_json: str, ) -> None: normalized_title = _normalize_title_value(title) normalized_year = _normalize_year_value(year) derived_title = None derived_year = None if not normalized_title or normalized_year is None: derived_title, derived_year = _extract_title_year_from_payload(payload_json) if _is_placeholder_title(normalized_title, request_id): normalized_title = None if derived_title and not normalized_title: normalized_title = derived_title if normalized_year is None and derived_year is not None: normalized_year = derived_year with _connect() as conn: existing_title = None existing_year = None if normalized_title is None or normalized_year is None: row = conn.execute( "SELECT title, year FROM requests_cache WHERE request_id = ?", (request_id,), ).fetchone() if row: existing_title, existing_year = row[0], row[1] if _is_placeholder_title(existing_title, request_id): existing_title = None if normalized_title is None and existing_title: normalized_title = existing_title if normalized_year is None and existing_year is not None: normalized_year = existing_year conn.execute( """ INSERT INTO requests_cache ( request_id, media_id, media_type, status, title, year, requested_by, requested_by_norm, created_at, updated_at, payload_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(request_id) DO UPDATE SET media_id = excluded.media_id, media_type = excluded.media_type, status = excluded.status, title = excluded.title, year = excluded.year, requested_by = excluded.requested_by, requested_by_norm = excluded.requested_by_norm, created_at = excluded.created_at, updated_at = excluded.updated_at, payload_json = excluded.payload_json """, ( request_id, media_id, media_type, status, normalized_title, normalized_year, requested_by, requested_by_norm, created_at, updated_at, payload_json, ), ) logger.debug( "requests_cache upsert: request_id=%s media_id=%s status=%s updated_at=%s", request_id, media_id, status, updated_at, ) def get_request_cache_last_updated() -> Optional[str]: with _connect() as conn: row = conn.execute( """ SELECT MAX(updated_at) FROM requests_cache """ ).fetchone() if not row: return None return row[0] def get_request_cache_by_id(request_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT request_id, updated_at, title FROM requests_cache WHERE request_id = ? """, (request_id,), ).fetchone() if not row: logger.debug("requests_cache miss: request_id=%s", request_id) return None logger.debug("requests_cache hit: request_id=%s updated_at=%s", row[0], row[1]) return {"request_id": row[0], "updated_at": row[1], "title": row[2]} def get_request_cache_payload(request_id: int) -> Optional[Dict[str, Any]]: with _connect() as conn: row = conn.execute( """ SELECT payload_json FROM requests_cache WHERE request_id = ? """, (request_id,), ).fetchone() if not row or not row[0]: logger.debug("requests_cache payload miss: request_id=%s", request_id) return None try: payload = json.loads(row[0]) logger.debug("requests_cache payload hit: request_id=%s", request_id) return payload except json.JSONDecodeError: logger.warning("requests_cache payload invalid json: request_id=%s", request_id) return None def get_cached_requests( limit: int, offset: int, requested_by_norm: Optional[str] = None, since_iso: Optional[str] = None, ) -> list[Dict[str, Any]]: query = """ SELECT request_id, media_id, media_type, status, title, year, requested_by, created_at, payload_json FROM requests_cache """ params: list[Any] = [] conditions = [] if requested_by_norm: conditions.append("requested_by_norm = ?") params.append(requested_by_norm) if since_iso: conditions.append("created_at >= ?") params.append(since_iso) if conditions: query += " WHERE " + " AND ".join(conditions) query += " ORDER BY created_at DESC, request_id DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) with _connect() as conn: rows = conn.execute(query, tuple(params)).fetchall() logger.debug( "requests_cache list: count=%s requested_by_norm=%s since_iso=%s", len(rows), requested_by_norm, since_iso, ) results: list[Dict[str, Any]] = [] for row in rows: title = row[4] year = row[5] if (not title or not year) and row[8]: derived_title, derived_year = _extract_title_year_from_payload(row[8]) if not title: title = derived_title if not year: year = derived_year results.append( { "request_id": row[0], "media_id": row[1], "media_type": row[2], "status": row[3], "title": title, "year": year, "requested_by": row[6], "created_at": row[7], } ) return results def get_request_cache_overview(limit: int = 50) -> list[Dict[str, Any]]: limit = max(1, min(limit, 200)) with _connect() as conn: rows = conn.execute( """ SELECT request_id, media_id, media_type, status, title, year, requested_by, created_at, updated_at, payload_json FROM requests_cache ORDER BY updated_at DESC, request_id DESC LIMIT ? """, (limit,), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: title = row[4] if not title and row[9]: derived_title, _ = _extract_title_year_from_payload(row[9]) title = derived_title or row[4] results.append( { "request_id": row[0], "media_id": row[1], "media_type": row[2], "status": row[3], "title": title, "year": row[5], "requested_by": row[6], "created_at": row[7], "updated_at": row[8], } ) return results def get_request_cache_missing_titles(limit: int = 200) -> list[Dict[str, Any]]: limit = max(1, min(limit, 500)) with _connect() as conn: rows = conn.execute( """ SELECT request_id, payload_json FROM requests_cache WHERE title IS NULL OR TRIM(title) = '' OR LOWER(title) = 'untitled' ORDER BY updated_at DESC, request_id DESC LIMIT ? """, (limit,), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: payload_json = row[1] tmdb_id, media_type = _extract_tmdb_from_payload(payload_json) results.append( { "request_id": row[0], "payload_json": payload_json, "tmdb_id": tmdb_id, "media_type": media_type, } ) return results def get_request_cache_count() -> int: with _connect() as conn: row = conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone() return int(row[0] or 0) def upsert_artwork_cache_status( request_id: int, tmdb_id: Optional[int], media_type: Optional[str], poster_path: Optional[str], backdrop_path: Optional[str], has_tmdb: bool, poster_cached: bool, backdrop_cached: bool, ) -> None: updated_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO artwork_cache_status ( request_id, tmdb_id, media_type, poster_path, backdrop_path, has_tmdb, poster_cached, backdrop_cached, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(request_id) DO UPDATE SET tmdb_id = excluded.tmdb_id, media_type = excluded.media_type, poster_path = excluded.poster_path, backdrop_path = excluded.backdrop_path, has_tmdb = excluded.has_tmdb, poster_cached = excluded.poster_cached, backdrop_cached = excluded.backdrop_cached, updated_at = excluded.updated_at """, ( request_id, tmdb_id, media_type, poster_path, backdrop_path, 1 if has_tmdb else 0, 1 if poster_cached else 0, 1 if backdrop_cached else 0, updated_at, ), ) def get_artwork_cache_status_count() -> int: with _connect() as conn: row = conn.execute("SELECT COUNT(*) FROM artwork_cache_status").fetchone() return int(row[0] or 0) def get_artwork_cache_missing_count() -> int: with _connect() as conn: row = conn.execute( """ SELECT COUNT(*) FROM artwork_cache_status WHERE ( (poster_path IS NULL AND has_tmdb = 1) OR (poster_path IS NOT NULL AND poster_cached = 0) OR (backdrop_path IS NULL AND has_tmdb = 1) OR (backdrop_path IS NOT NULL AND backdrop_cached = 0) ) """ ).fetchone() return int(row[0] or 0) def update_artwork_cache_stats( cache_bytes: Optional[int] = None, cache_files: Optional[int] = None, missing_count: Optional[int] = None, total_requests: Optional[int] = None, ) -> None: updated_at = datetime.now(timezone.utc).isoformat() if cache_bytes is not None: set_setting("artwork_cache_bytes", str(int(cache_bytes))) if cache_files is not None: set_setting("artwork_cache_files", str(int(cache_files))) if missing_count is not None: set_setting("artwork_cache_missing", str(int(missing_count))) if total_requests is not None: set_setting("artwork_cache_total_requests", str(int(total_requests))) set_setting("artwork_cache_updated_at", updated_at) def get_artwork_cache_stats() -> Dict[str, Any]: def _get_int(key: str) -> int: value = get_setting(key) if value is None: return 0 try: return int(value) except (TypeError, ValueError): return 0 return { "cache_bytes": _get_int("artwork_cache_bytes"), "cache_files": _get_int("artwork_cache_files"), "missing_artwork": _get_int("artwork_cache_missing"), "total_requests": _get_int("artwork_cache_total_requests"), "updated_at": get_setting("artwork_cache_updated_at"), } def update_request_cache_title( request_id: int, title: str, year: Optional[int] = None ) -> None: normalized_title = _normalize_title_value(title) normalized_year = _normalize_year_value(year) if not normalized_title: return with _connect() as conn: conn.execute( """ UPDATE requests_cache SET title = ?, year = COALESCE(?, year) WHERE request_id = ? """, (normalized_title, normalized_year, request_id), ) def repair_request_cache_titles() -> int: updated = 0 with _connect() as conn: rows = conn.execute( """ SELECT request_id, title, year, payload_json FROM requests_cache """ ).fetchall() for row in rows: request_id, title, year, payload_json = row if not _is_placeholder_title(title, request_id): continue derived_title, derived_year = _extract_title_year_from_payload(payload_json) if not derived_title: continue conn.execute( """ UPDATE requests_cache SET title = ?, year = COALESCE(?, year) WHERE request_id = ? """, (derived_title, derived_year, request_id), ) updated += 1 return updated def prune_duplicate_requests_cache() -> int: with _connect() as conn: cursor = conn.execute( """ DELETE FROM requests_cache WHERE media_id IS NOT NULL AND request_id NOT IN ( SELECT MAX(request_id) FROM requests_cache WHERE media_id IS NOT NULL GROUP BY media_id, COALESCE(requested_by_norm, '') ) """ ) return cursor.rowcount def get_request_cache_payloads(limit: int = 200, offset: int = 0) -> list[Dict[str, Any]]: limit = max(1, min(limit, 1000)) offset = max(0, offset) with _connect() as conn: rows = conn.execute( """ SELECT request_id, payload_json FROM requests_cache ORDER BY request_id ASC LIMIT ? OFFSET ? """, (limit, offset), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: payload = None if row[1]: try: payload = json.loads(row[1]) except json.JSONDecodeError: payload = None results.append({"request_id": row[0], "payload": payload}) return results def get_request_cache_payloads_missing(limit: int = 200, offset: int = 0) -> list[Dict[str, Any]]: limit = max(1, min(limit, 1000)) offset = max(0, offset) with _connect() as conn: rows = conn.execute( """ SELECT rc.request_id, rc.payload_json FROM requests_cache rc JOIN artwork_cache_status acs ON rc.request_id = acs.request_id WHERE ( (acs.poster_path IS NULL AND acs.has_tmdb = 1) OR (acs.poster_path IS NOT NULL AND acs.poster_cached = 0) OR (acs.backdrop_path IS NULL AND acs.has_tmdb = 1) OR (acs.backdrop_path IS NOT NULL AND acs.backdrop_cached = 0) ) ORDER BY rc.request_id ASC LIMIT ? OFFSET ? """, (limit, offset), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: payload = None if row[1]: try: payload = json.loads(row[1]) except json.JSONDecodeError: payload = None results.append({"request_id": row[0], "payload": payload}) return results def get_cached_requests_since(since_iso: str) -> list[Dict[str, Any]]: with _connect() as conn: rows = conn.execute( """ SELECT request_id, media_id, media_type, status, title, year, requested_by, requested_by_norm, created_at FROM requests_cache WHERE created_at >= ? ORDER BY created_at DESC, request_id DESC """, (since_iso,), ).fetchall() results: list[Dict[str, Any]] = [] for row in rows: results.append( { "request_id": row[0], "media_id": row[1], "media_type": row[2], "status": row[3], "title": row[4], "year": row[5], "requested_by": row[6], "requested_by_norm": row[7], "created_at": row[8], } ) return results def get_cached_request_by_media_id( media_id: int, requested_by_norm: Optional[str] = None ) -> Optional[Dict[str, Any]]: query = """ SELECT request_id, status FROM requests_cache WHERE media_id = ? """ params: list[Any] = [media_id] if requested_by_norm: query += " AND requested_by_norm = ?" params.append(requested_by_norm) query += " ORDER BY created_at DESC, request_id DESC LIMIT 1" with _connect() as conn: row = conn.execute(query, tuple(params)).fetchone() if not row: return None return {"request_id": row[0], "status": row[1]} def get_setting(key: str) -> Optional[str]: with _connect() as conn: row = conn.execute( """ SELECT value FROM settings WHERE key = ? """, (key,), ).fetchone() if not row: return None return row[0] def set_setting(key: str, value: Optional[str]) -> None: updated_at = datetime.now(timezone.utc).isoformat() with _connect() as conn: conn.execute( """ INSERT INTO settings (key, value, updated_at) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at """, (key, value, updated_at), ) def delete_setting(key: str) -> None: with _connect() as conn: conn.execute( """ DELETE FROM settings WHERE key = ? """, (key,), ) def get_settings_overrides() -> Dict[str, str]: with _connect() as conn: rows = conn.execute( """ SELECT key, value FROM settings """ ).fetchall() overrides: Dict[str, str] = {} for row in rows: key = row[0] value = row[1] if key: overrides[key] = value return overrides def run_integrity_check() -> str: with _connect() as conn: row = conn.execute("PRAGMA integrity_check").fetchone() if not row: return "unknown" return str(row[0]) def vacuum_db() -> None: with _connect() as conn: conn.execute("VACUUM") def clear_requests_cache() -> int: with _connect() as conn: cursor = conn.execute("DELETE FROM requests_cache") return cursor.rowcount def clear_history() -> Dict[str, int]: with _connect() as conn: actions = conn.execute("DELETE FROM actions").rowcount snapshots = conn.execute("DELETE FROM snapshots").rowcount return {"actions": actions, "snapshots": snapshots} def cleanup_history(days: int) -> Dict[str, int]: if days <= 0: return {"actions": 0, "snapshots": 0} cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() with _connect() as conn: actions = conn.execute( "DELETE FROM actions WHERE created_at < ?", (cutoff,), ).rowcount snapshots = conn.execute( "DELETE FROM snapshots WHERE created_at < ?", (cutoff,), ).rowcount return {"actions": actions, "snapshots": snapshots}