# Magent/backend/app/db.py — SQLite persistence layer (~2244 lines, 70 KiB, Python).
import json
import os
import sqlite3
import logging
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional
from .config import settings
from .models import Snapshot
from .security import hash_password, verify_password
logger = logging.getLogger(__name__)
def _db_path() -> str:
    """Return the absolute SQLite file path, creating its parent directory."""
    configured = settings.sqlite_path or "data/magent.db"
    resolved = configured if os.path.isabs(configured) else os.path.join(os.getcwd(), configured)
    os.makedirs(os.path.dirname(resolved), exist_ok=True)
    return resolved
def _connect() -> sqlite3.Connection:
    """Open a fresh connection to the application's SQLite database."""
    path = _db_path()
    return sqlite3.connect(path)
def _parse_datetime_value(value: Optional[str]) -> Optional[datetime]:
if not isinstance(value, str) or not value.strip():
return None
candidate = value.strip()
if candidate.endswith("Z"):
candidate = candidate[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(candidate)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
def _is_datetime_in_past(value: Optional[str]) -> bool:
    """True when *value* parses to a datetime at or before the current UTC time."""
    moment = _parse_datetime_value(value)
    return moment is not None and moment <= datetime.now(timezone.utc)
def _normalize_title_value(title: Optional[str]) -> Optional[str]:
if not isinstance(title, str):
return None
trimmed = title.strip()
return trimmed if trimmed else None
def _normalize_year_value(year: Optional[Any]) -> Optional[int]:
if isinstance(year, int):
return year
if isinstance(year, str):
trimmed = year.strip()
if trimmed.isdigit():
return int(trimmed)
return None
def _is_placeholder_title(title: Optional[str], request_id: Optional[int]) -> bool:
if not isinstance(title, str):
return True
normalized = title.strip().lower()
if not normalized:
return True
if normalized == "untitled":
return True
if request_id and normalized == f"request {request_id}":
return True
return False
def _extract_title_year_from_payload(payload_json: Optional[str]) -> tuple[Optional[str], Optional[int]]:
    """Extract (title, year) from a cached request payload JSON blob.

    Prefers media.title / media.name over the top-level title/name keys,
    and media.year over the top-level year. Returns (None, None) for
    missing, unparsable, or non-object JSON.
    """
    if not payload_json:
        return None, None
    try:
        payload = json.loads(payload_json)
    # Also guard TypeError (non-str input), consistent with
    # _extract_tmdb_from_payload which parses the same blobs.
    except (TypeError, json.JSONDecodeError):
        return None, None
    if not isinstance(payload, dict):
        return None, None
    media = payload.get("media") or {}
    title = None
    year = None
    if isinstance(media, dict):
        title = media.get("title") or media.get("name")
        year = media.get("year")
    if not title:
        title = payload.get("title") or payload.get("name")
    if year is None:
        year = payload.get("year")
    return _normalize_title_value(title), _normalize_year_value(year)
def _extract_tmdb_from_payload(payload_json: Optional[str]) -> tuple[Optional[int], Optional[str]]:
if not payload_json:
return None, None
try:
payload = json.loads(payload_json)
except (TypeError, json.JSONDecodeError):
return None, None
if not isinstance(payload, dict):
return None, None
media = payload.get("media") or {}
if not isinstance(media, dict):
media = {}
tmdb_id = (
media.get("tmdbId")
or payload.get("tmdbId")
or payload.get("tmdb_id")
or media.get("externalServiceId")
or payload.get("externalServiceId")
)
media_type = (
media.get("mediaType")
or payload.get("mediaType")
or payload.get("media_type")
or payload.get("type")
)
try:
tmdb_id = int(tmdb_id) if tmdb_id is not None else None
except (TypeError, ValueError):
tmdb_id = None
if isinstance(media_type, str):
media_type = media_type.strip().lower() or None
return tmdb_id, media_type
def _run_migration(conn: sqlite3.Connection, sql: str) -> None:
    """Execute a best-effort schema migration statement.

    sqlite3.OperationalError is swallowed because ALTER TABLE ADD COLUMN
    fails once the column already exists; every statement here is additive
    and safe to retry on each startup.
    """
    try:
        conn.execute(sql)
    except sqlite3.OperationalError:
        pass


def init_db() -> None:
    """Create or upgrade the SQLite schema, then seed derived data.

    Runs idempotent CREATE TABLE/INDEX statements, applies additive column
    migrations for databases created by older versions, backfills
    auth_provider values, and ensures the configured admin user exists.
    """
    with _connect() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                request_id TEXT NOT NULL,
                state TEXT NOT NULL,
                state_reason TEXT,
                created_at TEXT NOT NULL,
                payload_json TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS actions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                request_id TEXT NOT NULL,
                action_id TEXT NOT NULL,
                label TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT,
                created_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE,
                password_hash TEXT NOT NULL,
                role TEXT NOT NULL,
                auth_provider TEXT NOT NULL DEFAULT 'local',
                jellyseerr_user_id INTEGER,
                created_at TEXT NOT NULL,
                last_login_at TEXT,
                is_blocked INTEGER NOT NULL DEFAULT 0,
                auto_search_enabled INTEGER NOT NULL DEFAULT 1,
                invite_management_enabled INTEGER NOT NULL DEFAULT 0,
                profile_id INTEGER,
                expires_at TEXT,
                invited_by_code TEXT,
                invited_at TEXT,
                jellyfin_password_hash TEXT,
                last_jellyfin_auth_at TEXT
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS user_profiles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                role TEXT NOT NULL DEFAULT 'user',
                auto_search_enabled INTEGER NOT NULL DEFAULT 1,
                account_expires_days INTEGER,
                is_active INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS signup_invites (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                code TEXT NOT NULL UNIQUE,
                label TEXT,
                description TEXT,
                profile_id INTEGER,
                role TEXT,
                max_uses INTEGER,
                use_count INTEGER NOT NULL DEFAULT 0,
                enabled INTEGER NOT NULL DEFAULT 1,
                expires_at TEXT,
                created_by TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_signup_invites_enabled
            ON signup_invites (enabled)
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_signup_invites_expires_at
            ON signup_invites (expires_at)
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT,
                updated_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS requests_cache (
                request_id INTEGER PRIMARY KEY,
                media_id INTEGER,
                media_type TEXT,
                status INTEGER,
                title TEXT,
                year INTEGER,
                requested_by TEXT,
                requested_by_norm TEXT,
                requested_by_id INTEGER,
                created_at TEXT,
                updated_at TEXT,
                payload_json TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS artwork_cache_status (
                request_id INTEGER PRIMARY KEY,
                tmdb_id INTEGER,
                media_type TEXT,
                poster_path TEXT,
                backdrop_path TEXT,
                has_tmdb INTEGER NOT NULL DEFAULT 0,
                poster_cached INTEGER NOT NULL DEFAULT 0,
                backdrop_cached INTEGER NOT NULL DEFAULT 0,
                updated_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_requests_cache_created_at
            ON requests_cache (created_at)
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_norm
            ON requests_cache (requested_by_norm)
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_artwork_cache_status_updated_at
            ON artwork_cache_status (updated_at)
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS user_activity (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                ip TEXT NOT NULL,
                user_agent TEXT NOT NULL,
                first_seen_at TEXT NOT NULL,
                last_seen_at TEXT NOT NULL,
                hit_count INTEGER NOT NULL DEFAULT 1,
                UNIQUE(username, ip, user_agent)
            )
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_user_activity_username
            ON user_activity (username)
            """
        )
        conn.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_user_activity_last_seen
            ON user_activity (last_seen_at)
            """
        )
        # Additive migrations for databases created before these columns/
        # indexes existed. Order is preserved from the original per-statement
        # try/except blocks.
        for migration_sql in (
            "ALTER TABLE users ADD COLUMN last_login_at TEXT",
            "ALTER TABLE users ADD COLUMN is_blocked INTEGER NOT NULL DEFAULT 0",
            "ALTER TABLE users ADD COLUMN auth_provider TEXT NOT NULL DEFAULT 'local'",
            "ALTER TABLE users ADD COLUMN jellyfin_password_hash TEXT",
            "ALTER TABLE users ADD COLUMN last_jellyfin_auth_at TEXT",
            "ALTER TABLE users ADD COLUMN jellyseerr_user_id INTEGER",
            "ALTER TABLE users ADD COLUMN auto_search_enabled INTEGER NOT NULL DEFAULT 1",
            "ALTER TABLE users ADD COLUMN invite_management_enabled INTEGER NOT NULL DEFAULT 0",
            "ALTER TABLE users ADD COLUMN profile_id INTEGER",
            "ALTER TABLE users ADD COLUMN expires_at TEXT",
            "ALTER TABLE users ADD COLUMN invited_by_code TEXT",
            "ALTER TABLE users ADD COLUMN invited_at TEXT",
            "CREATE INDEX IF NOT EXISTS idx_users_profile_id ON users (profile_id)",
            "CREATE INDEX IF NOT EXISTS idx_users_expires_at ON users (expires_at)",
            "ALTER TABLE requests_cache ADD COLUMN requested_by_id INTEGER",
            "CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_id ON requests_cache (requested_by_id)",
        ):
            _run_migration(conn, migration_sql)
    _backfill_auth_providers()
    ensure_admin_user()
def save_snapshot(snapshot: Snapshot) -> None:
    """Append an immutable snapshot row recording the request's current state."""
    record = (
        snapshot.request_id,
        snapshot.state.value,
        snapshot.state_reason,
        datetime.now(timezone.utc).isoformat(),
        json.dumps(snapshot.model_dump(), ensure_ascii=True),
    )
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO snapshots (request_id, state, state_reason, created_at, payload_json)
            VALUES (?, ?, ?, ?, ?)
            """,
            record,
        )
def save_action(
    request_id: str,
    action_id: str,
    label: str,
    status: str,
    message: Optional[str] = None,
) -> None:
    """Append an audit row for an action taken against a request."""
    now = datetime.now(timezone.utc).isoformat()
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO actions (request_id, action_id, label, status, message, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (request_id, action_id, label, status, message, now),
        )
def get_recent_snapshots(request_id: str, limit: int = 10) -> list[dict[str, Any]]:
    """Return up to *limit* snapshots for a request, newest first."""
    query = """
        SELECT request_id, state, state_reason, created_at, payload_json
        FROM snapshots
        WHERE request_id = ?
        ORDER BY id DESC
        LIMIT ?
    """
    with _connect() as conn:
        rows = conn.execute(query, (request_id, limit)).fetchall()
    return [
        {
            "request_id": rid,
            "state": state,
            "state_reason": reason,
            "created_at": created,
            "payload": json.loads(payload),
        }
        for rid, state, reason, created, payload in rows
    ]
def get_recent_actions(request_id: str, limit: int = 10) -> list[dict[str, Any]]:
    """Return up to *limit* audit actions for a request, newest first."""
    query = """
        SELECT request_id, action_id, label, status, message, created_at
        FROM actions
        WHERE request_id = ?
        ORDER BY id DESC
        LIMIT ?
    """
    with _connect() as conn:
        rows = conn.execute(query, (request_id, limit)).fetchall()
    return [
        {
            "request_id": rid,
            "action_id": action_id,
            "label": label,
            "status": status,
            "message": message,
            "created_at": created,
        }
        for rid, action_id, label, status, message, created in rows
    ]
def ensure_admin_user() -> None:
    """Create the configured admin account once, if credentials are set."""
    username = settings.admin_username
    password = settings.admin_password
    if not username or not password:
        return
    if get_user_by_username(username) is None:
        create_user(username, password, role="admin")
def create_user(
    username: str,
    password: str,
    role: str = "user",
    auth_provider: str = "local",
    jellyseerr_user_id: Optional[int] = None,
    auto_search_enabled: bool = True,
    invite_management_enabled: bool = False,
    profile_id: Optional[int] = None,
    expires_at: Optional[str] = None,
    invited_by_code: Optional[str] = None,
) -> None:
    """Insert a new user row; invited_at is stamped only for invited signups."""
    now = datetime.now(timezone.utc).isoformat()
    values = (
        username,
        hash_password(password),
        role,
        auth_provider,
        jellyseerr_user_id,
        now,
        int(auto_search_enabled),
        int(invite_management_enabled),
        profile_id,
        expires_at,
        invited_by_code,
        now if invited_by_code else None,
    )
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO users (
                username,
                password_hash,
                role,
                auth_provider,
                jellyseerr_user_id,
                created_at,
                auto_search_enabled,
                invite_management_enabled,
                profile_id,
                expires_at,
                invited_by_code,
                invited_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
def create_user_if_missing(
    username: str,
    password: str,
    role: str = "user",
    auth_provider: str = "local",
    jellyseerr_user_id: Optional[int] = None,
    auto_search_enabled: bool = True,
    invite_management_enabled: bool = False,
    profile_id: Optional[int] = None,
    expires_at: Optional[str] = None,
    invited_by_code: Optional[str] = None,
) -> bool:
    """Insert a user unless the username already exists; True when inserted."""
    now = datetime.now(timezone.utc).isoformat()
    values = (
        username,
        hash_password(password),
        role,
        auth_provider,
        jellyseerr_user_id,
        now,
        int(auto_search_enabled),
        int(invite_management_enabled),
        profile_id,
        expires_at,
        invited_by_code,
        now if invited_by_code else None,
    )
    with _connect() as conn:
        cursor = conn.execute(
            """
            INSERT OR IGNORE INTO users (
                username,
                password_hash,
                role,
                auth_provider,
                jellyseerr_user_id,
                created_at,
                auto_search_enabled,
                invite_management_enabled,
                profile_id,
                expires_at,
                invited_by_code,
                invited_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
        inserted = cursor.rowcount > 0
    return inserted
def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Fetch one user by case-insensitive username, or None when absent."""
    query = """
        SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
               created_at, last_login_at, is_blocked, auto_search_enabled,
               invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
               jellyfin_password_hash, last_jellyfin_auth_at
        FROM users
        WHERE username = ? COLLATE NOCASE
    """
    with _connect() as conn:
        row = conn.execute(query, (username,)).fetchone()
    if row is None:
        return None
    keys = (
        "id", "username", "password_hash", "role", "auth_provider",
        "jellyseerr_user_id", "created_at", "last_login_at", "is_blocked",
        "auto_search_enabled", "invite_management_enabled", "profile_id",
        "expires_at", "invited_by_code", "invited_at",
    )
    user = dict(zip(keys, row))
    for flag in ("is_blocked", "auto_search_enabled", "invite_management_enabled"):
        user[flag] = bool(user[flag])
    user["is_expired"] = _is_datetime_in_past(user["expires_at"])
    user["jellyfin_password_hash"] = row[15]
    user["last_jellyfin_auth_at"] = row[16]
    return user
def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one user by primary key, or None when absent."""
    query = """
        SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
               created_at, last_login_at, is_blocked, auto_search_enabled,
               invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
               jellyfin_password_hash, last_jellyfin_auth_at
        FROM users
        WHERE id = ?
    """
    with _connect() as conn:
        row = conn.execute(query, (user_id,)).fetchone()
    if row is None:
        return None
    keys = (
        "id", "username", "password_hash", "role", "auth_provider",
        "jellyseerr_user_id", "created_at", "last_login_at", "is_blocked",
        "auto_search_enabled", "invite_management_enabled", "profile_id",
        "expires_at", "invited_by_code", "invited_at",
    )
    user = dict(zip(keys, row))
    for flag in ("is_blocked", "auto_search_enabled", "invite_management_enabled"):
        user[flag] = bool(user[flag])
    user["is_expired"] = _is_datetime_in_past(user["expires_at"])
    user["jellyfin_password_hash"] = row[15]
    user["last_jellyfin_auth_at"] = row[16]
    return user
def get_all_users() -> list[Dict[str, Any]]:
    """List users for admin views, deduplicated to one row per person.

    Jellyfin is the source of truth for non-admin accounts; non-admin
    jellyseerr rows are enrichment-only and hidden to avoid duplicate
    accounts in the UI. Among case-insensitive duplicates, the preferred
    row is chosen by role, linked jellyseerr id, provider, login recency
    flag, then lowest id. Output is sorted by lower-cased username.
    """
    query = """
        SELECT id, username, role, auth_provider, jellyseerr_user_id, created_at,
               last_login_at, is_blocked, auto_search_enabled, invite_management_enabled,
               profile_id, expires_at, invited_by_code, invited_at
        FROM users
        ORDER BY username COLLATE NOCASE
    """
    with _connect() as conn:
        rows = conn.execute(query).fetchall()
    keys = (
        "id", "username", "role", "auth_provider", "jellyseerr_user_id",
        "created_at", "last_login_at", "is_blocked", "auto_search_enabled",
        "invite_management_enabled", "profile_id", "expires_at",
        "invited_by_code", "invited_at",
    )
    users: list[Dict[str, Any]] = []
    for row in rows:
        entry = dict(zip(keys, row))
        for flag in ("is_blocked", "auto_search_enabled", "invite_management_enabled"):
            entry[flag] = bool(entry[flag])
        entry["is_expired"] = _is_datetime_in_past(entry["expires_at"])
        users.append(entry)

    def provider_rank(entry: Dict[str, Any]) -> int:
        provider = str(entry.get("auth_provider") or "local").strip().lower()
        return {"jellyfin": 0, "local": 1}.get(provider, 2)

    def hidden(entry: Dict[str, Any]) -> bool:
        provider = str(entry.get("auth_provider") or "local").strip().lower()
        role = str(entry.get("role") or "user").strip().lower()
        return provider == "jellyseerr" and role != "admin"

    candidates = [entry for entry in users if not hidden(entry)]
    candidates.sort(
        key=lambda entry: (
            0 if str(entry.get("role") or "user").strip().lower() == "admin" else 1,
            0 if isinstance(entry.get("jellyseerr_user_id"), int) else 1,
            provider_rank(entry),
            0 if entry.get("last_login_at") else 1,
            int(entry.get("id") or 0),
        )
    )
    chosen: list[Dict[str, Any]] = []
    names_taken: set[str] = set()
    seerr_ids_taken: set[int] = set()
    for entry in candidates:
        name = str(entry.get("username") or "").strip()
        if not name:
            continue
        seerr_id = entry.get("jellyseerr_user_id")
        if isinstance(seerr_id, int) and seerr_id in seerr_ids_taken:
            continue
        if name.lower() in names_taken:
            continue
        chosen.append(entry)
        names_taken.add(name.lower())
        if isinstance(seerr_id, int):
            seerr_ids_taken.add(seerr_id)
    chosen.sort(key=lambda entry: str(entry.get("username") or "").lower())
    return chosen
def delete_non_admin_users() -> int:
    """Delete every user whose role is not 'admin'; returns rows removed."""
    with _connect() as conn:
        result = conn.execute("DELETE FROM users WHERE role != 'admin'")
        return result.rowcount
def set_user_jellyseerr_id(username: str, jellyseerr_user_id: Optional[int]) -> None:
    """Link (or, with None, unlink) a user to a Jellyseerr account id."""
    sql = "UPDATE users SET jellyseerr_user_id = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (jellyseerr_user_id, username))
def set_user_auth_provider(username: str, auth_provider: str) -> None:
    """Store a normalized (lower-cased, defaulting to 'local') auth provider."""
    normalized = (auth_provider or "local").strip().lower() or "local"
    sql = "UPDATE users SET auth_provider = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (normalized, username))
def set_last_login(username: str) -> None:
    """Stamp last_login_at with the current UTC time (exact-case match)."""
    now = datetime.now(timezone.utc).isoformat()
    sql = "UPDATE users SET last_login_at = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (now, username))
def set_user_blocked(username: str, blocked: bool) -> None:
    """Toggle the is_blocked flag for *username* (exact-case match)."""
    sql = "UPDATE users SET is_blocked = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (int(blocked), username))
def delete_user_by_username(username: str) -> bool:
    """Delete a user by case-insensitive username; True when a row was removed."""
    sql = "DELETE FROM users WHERE username = ? COLLATE NOCASE"
    with _connect() as conn:
        cursor = conn.execute(sql, (username,))
        return cursor.rowcount > 0
def delete_user_activity_by_username(username: str) -> int:
    """Purge all activity rows for a user (case-insensitive); returns rows removed."""
    sql = "DELETE FROM user_activity WHERE username = ? COLLATE NOCASE"
    with _connect() as conn:
        cursor = conn.execute(sql, (username,))
        return cursor.rowcount
def disable_signup_invites_by_creator(username: str) -> int:
    """Disable every still-enabled invite created by *username* (case-insensitive)."""
    now = datetime.now(timezone.utc).isoformat()
    sql = """
        UPDATE signup_invites
        SET enabled = 0, updated_at = ?
        WHERE created_by = ? COLLATE NOCASE AND enabled != 0
    """
    with _connect() as conn:
        cursor = conn.execute(sql, (now, username))
        return cursor.rowcount
def set_user_role(username: str, role: str) -> None:
    """Assign a new role to *username* (exact-case match)."""
    sql = "UPDATE users SET role = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (role, username))
def set_user_auto_search_enabled(username: str, enabled: bool) -> None:
    """Toggle auto_search_enabled for *username* (exact-case match)."""
    sql = "UPDATE users SET auto_search_enabled = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (int(enabled), username))
def set_user_invite_management_enabled(username: str, enabled: bool) -> None:
    """Toggle invite_management_enabled for *username* (case-insensitive)."""
    sql = "UPDATE users SET invite_management_enabled = ? WHERE username = ? COLLATE NOCASE"
    with _connect() as conn:
        conn.execute(sql, (int(enabled), username))
def set_auto_search_enabled_for_non_admin_users(enabled: bool) -> int:
    """Bulk-toggle auto_search_enabled for every non-admin; returns rows changed."""
    sql = "UPDATE users SET auto_search_enabled = ? WHERE role != 'admin'"
    with _connect() as conn:
        cursor = conn.execute(sql, (int(enabled),))
        return cursor.rowcount
def set_invite_management_enabled_for_non_admin_users(enabled: bool) -> int:
    """Bulk-toggle invite_management_enabled for every non-admin; returns rows changed."""
    sql = "UPDATE users SET invite_management_enabled = ? WHERE role != 'admin'"
    with _connect() as conn:
        cursor = conn.execute(sql, (int(enabled),))
        return cursor.rowcount
def set_user_profile_id(username: str, profile_id: Optional[int]) -> None:
    """Assign (or clear, with None) a user's profile (case-insensitive match)."""
    sql = "UPDATE users SET profile_id = ? WHERE username = ? COLLATE NOCASE"
    with _connect() as conn:
        conn.execute(sql, (profile_id, username))
def set_user_expires_at(username: str, expires_at: Optional[str]) -> None:
    """Set (or clear, with None) a user's expiry timestamp (case-insensitive)."""
    sql = "UPDATE users SET expires_at = ? WHERE username = ? COLLATE NOCASE"
    with _connect() as conn:
        conn.execute(sql, (expires_at, username))
def _row_to_user_profile(row: Any) -> Dict[str, Any]:
return {
"id": row[0],
"name": row[1],
"description": row[2],
"role": row[3],
"auto_search_enabled": bool(row[4]),
"account_expires_days": row[5],
"is_active": bool(row[6]),
"created_at": row[7],
"updated_at": row[8],
}
def list_user_profiles() -> list[Dict[str, Any]]:
    """Return every profile, ordered case-insensitively by name."""
    query = """
        SELECT id, name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at
        FROM user_profiles
        ORDER BY name COLLATE NOCASE
    """
    with _connect() as conn:
        rows = conn.execute(query).fetchall()
    return [_row_to_user_profile(row) for row in rows]
def get_user_profile(profile_id: int) -> Optional[Dict[str, Any]]:
    """Fetch a single profile by id, or None when absent."""
    query = """
        SELECT id, name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at
        FROM user_profiles
        WHERE id = ?
    """
    with _connect() as conn:
        row = conn.execute(query, (profile_id,)).fetchone()
    return _row_to_user_profile(row) if row else None
def create_user_profile(
    name: str,
    description: Optional[str] = None,
    role: str = "user",
    auto_search_enabled: bool = True,
    account_expires_days: Optional[int] = None,
    is_active: bool = True,
) -> Dict[str, Any]:
    """Insert a profile and return it re-read as a dict.

    Raises RuntimeError if the freshly inserted row cannot be read back.
    """
    now = datetime.now(timezone.utc).isoformat()
    values = (
        name,
        description,
        role,
        int(auto_search_enabled),
        account_expires_days,
        int(is_active),
        now,
        now,
    )
    with _connect() as conn:
        cursor = conn.execute(
            """
            INSERT INTO user_profiles (
                name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            values,
        )
        new_id = int(cursor.lastrowid)
    profile = get_user_profile(new_id)
    if profile is None:
        raise RuntimeError("Profile creation failed")
    return profile
def update_user_profile(
    profile_id: int,
    *,
    name: str,
    description: Optional[str],
    role: str,
    auto_search_enabled: bool,
    account_expires_days: Optional[int],
    is_active: bool,
) -> Optional[Dict[str, Any]]:
    """Overwrite a profile's fields; returns the updated dict or None if missing."""
    now = datetime.now(timezone.utc).isoformat()
    values = (
        name,
        description,
        role,
        int(auto_search_enabled),
        account_expires_days,
        int(is_active),
        now,
        profile_id,
    )
    with _connect() as conn:
        cursor = conn.execute(
            """
            UPDATE user_profiles
            SET name = ?, description = ?, role = ?, auto_search_enabled = ?,
                account_expires_days = ?, is_active = ?, updated_at = ?
            WHERE id = ?
            """,
            values,
        )
        touched = cursor.rowcount > 0
    return get_user_profile(profile_id) if touched else None
def delete_user_profile(profile_id: int) -> bool:
    """Delete a profile; refuses (ValueError) while users or invites reference it."""
    with _connect() as conn:
        def referenced(sql: str) -> bool:
            row = conn.execute(sql, (profile_id,)).fetchone()
            return int((row or [0])[0] or 0) > 0

        if referenced("SELECT COUNT(*) FROM users WHERE profile_id = ?"):
            raise ValueError("Profile is assigned to existing users.")
        if referenced("SELECT COUNT(*) FROM signup_invites WHERE profile_id = ?"):
            raise ValueError("Profile is assigned to existing invites.")
        cursor = conn.execute("DELETE FROM user_profiles WHERE id = ?", (profile_id,))
        return cursor.rowcount > 0
def _row_to_signup_invite(row: Any) -> Dict[str, Any]:
    """Map a signup_invites row tuple onto a dict with derived usability fields."""
    keys = (
        "id", "code", "label", "description", "profile_id", "role",
        "max_uses", "use_count", "enabled", "expires_at", "created_by",
        "created_at", "updated_at",
    )
    invite = dict(zip(keys, row))
    invite["use_count"] = int(invite["use_count"] or 0)
    invite["enabled"] = bool(invite["enabled"])
    invite["is_expired"] = _is_datetime_in_past(invite["expires_at"])
    if invite["max_uses"] is None:
        remaining = None
    else:
        remaining = max(int(invite["max_uses"]) - invite["use_count"], 0)
    invite["remaining_uses"] = remaining
    # Usable only while enabled, unexpired, and uses remain (None = unlimited).
    invite["is_usable"] = (
        invite["enabled"]
        and not invite["is_expired"]
        and (remaining is None or remaining > 0)
    )
    return invite
def list_signup_invites() -> list[Dict[str, Any]]:
    """Return every signup invite, newest first."""
    query = """
        SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled,
               expires_at, created_by, created_at, updated_at
        FROM signup_invites
        ORDER BY created_at DESC, id DESC
    """
    with _connect() as conn:
        rows = conn.execute(query).fetchall()
    return [_row_to_signup_invite(row) for row in rows]
def get_signup_invite_by_id(invite_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one invite by primary key, or None when absent."""
    query = """
        SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled,
               expires_at, created_by, created_at, updated_at
        FROM signup_invites
        WHERE id = ?
    """
    with _connect() as conn:
        row = conn.execute(query, (invite_id,)).fetchone()
    return _row_to_signup_invite(row) if row else None
def get_signup_invite_by_code(code: str) -> Optional[Dict[str, Any]]:
    """Fetch one invite by case-insensitive code, or None when absent."""
    query = """
        SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled,
               expires_at, created_by, created_at, updated_at
        FROM signup_invites
        WHERE code = ? COLLATE NOCASE
    """
    with _connect() as conn:
        row = conn.execute(query, (code,)).fetchone()
    return _row_to_signup_invite(row) if row else None
def create_signup_invite(
    *,
    code: str,
    label: Optional[str] = None,
    description: Optional[str] = None,
    profile_id: Optional[int] = None,
    role: Optional[str] = None,
    max_uses: Optional[int] = None,
    enabled: bool = True,
    expires_at: Optional[str] = None,
    created_by: Optional[str] = None,
) -> Dict[str, Any]:
    """Insert an invite (use_count starts at 0) and return it re-read as a dict.

    Raises RuntimeError if the freshly inserted row cannot be read back.
    """
    now = datetime.now(timezone.utc).isoformat()
    values = (
        code,
        label,
        description,
        profile_id,
        role,
        max_uses,
        int(enabled),
        expires_at,
        created_by,
        now,
        now,
    )
    with _connect() as conn:
        cursor = conn.execute(
            """
            INSERT INTO signup_invites (
                code, label, description, profile_id, role, max_uses, use_count, enabled,
                expires_at, created_by, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?)
            """,
            values,
        )
        new_id = int(cursor.lastrowid)
    invite = get_signup_invite_by_id(new_id)
    if invite is None:
        raise RuntimeError("Invite creation failed")
    return invite
def update_signup_invite(
    invite_id: int,
    *,
    code: str,
    label: Optional[str],
    description: Optional[str],
    profile_id: Optional[int],
    role: Optional[str],
    max_uses: Optional[int],
    enabled: bool,
    expires_at: Optional[str],
) -> Optional[Dict[str, Any]]:
    """Overwrite an invite's fields; returns the updated dict or None if missing."""
    now = datetime.now(timezone.utc).isoformat()
    values = (
        code,
        label,
        description,
        profile_id,
        role,
        max_uses,
        int(enabled),
        expires_at,
        now,
        invite_id,
    )
    with _connect() as conn:
        cursor = conn.execute(
            """
            UPDATE signup_invites
            SET code = ?, label = ?, description = ?, profile_id = ?, role = ?, max_uses = ?,
                enabled = ?, expires_at = ?, updated_at = ?
            WHERE id = ?
            """,
            values,
        )
        touched = cursor.rowcount > 0
    return get_signup_invite_by_id(invite_id) if touched else None
def delete_signup_invite(invite_id: int) -> bool:
    """Delete an invite by id; True when a row was removed."""
    with _connect() as conn:
        cursor = conn.execute("DELETE FROM signup_invites WHERE id = ?", (invite_id,))
        return cursor.rowcount > 0
def increment_signup_invite_use(invite_id: int) -> None:
    """Bump an invite's use_count by one and refresh its updated_at stamp."""
    now = datetime.now(timezone.utc).isoformat()
    sql = """
        UPDATE signup_invites
        SET use_count = use_count + 1, updated_at = ?
        WHERE id = ?
    """
    with _connect() as conn:
        conn.execute(sql, (now, invite_id))
def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]:
    """Authenticate a local-provider user; returns the user dict or None.

    Case-insensitive duplicates are resolved safely: the exact-case row is
    preferred, then the lowest id, and only rows whose auth_provider is
    'local' are considered for password verification.
    """
    query = """
        SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
               created_at, last_login_at, is_blocked, auto_search_enabled,
               invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
               jellyfin_password_hash, last_jellyfin_auth_at
        FROM users
        WHERE username = ? COLLATE NOCASE
        ORDER BY
            CASE WHEN username = ? THEN 0 ELSE 1 END,
            id ASC
    """
    with _connect() as conn:
        rows = conn.execute(query, (username, username)).fetchall()
    keys = (
        "id", "username", "password_hash", "role", "auth_provider",
        "jellyseerr_user_id", "created_at", "last_login_at", "is_blocked",
        "auto_search_enabled", "invite_management_enabled", "profile_id",
        "expires_at", "invited_by_code", "invited_at",
    )
    for row in rows:
        if str(row[4] or "local").lower() != "local":
            continue
        if not verify_password(password, row[2]):
            continue
        user = dict(zip(keys, row))
        for flag in ("is_blocked", "auto_search_enabled", "invite_management_enabled"):
            user[flag] = bool(user[flag])
        user["is_expired"] = _is_datetime_in_past(user["expires_at"])
        user["jellyfin_password_hash"] = row[15]
        user["last_jellyfin_auth_at"] = row[16]
        return user
    return None
def get_users_by_username_ci(username: str) -> list[Dict[str, Any]]:
    """Return every row matching *username* case-insensitively.

    Exact-case matches sort first, then ascending id.
    """
    query = """
        SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
               created_at, last_login_at, is_blocked, auto_search_enabled,
               invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
               jellyfin_password_hash, last_jellyfin_auth_at
        FROM users
        WHERE username = ? COLLATE NOCASE
        ORDER BY
            CASE WHEN username = ? THEN 0 ELSE 1 END,
            id ASC
    """
    with _connect() as conn:
        rows = conn.execute(query, (username, username)).fetchall()
    keys = (
        "id", "username", "password_hash", "role", "auth_provider",
        "jellyseerr_user_id", "created_at", "last_login_at", "is_blocked",
        "auto_search_enabled", "invite_management_enabled", "profile_id",
        "expires_at", "invited_by_code", "invited_at",
    )
    users: list[Dict[str, Any]] = []
    for row in rows:
        user = dict(zip(keys, row))
        for flag in ("is_blocked", "auto_search_enabled", "invite_management_enabled"):
            user[flag] = bool(user[flag])
        user["is_expired"] = _is_datetime_in_past(user["expires_at"])
        user["jellyfin_password_hash"] = row[15]
        user["last_jellyfin_auth_at"] = row[16]
        users.append(user)
    return users
def set_user_password(username: str, password: str) -> None:
    """Replace a user's stored password hash (exact-case match)."""
    hashed = hash_password(password)
    sql = "UPDATE users SET password_hash = ? WHERE username = ?"
    with _connect() as conn:
        conn.execute(sql, (hashed, username))
def set_jellyfin_auth_cache(username: str, password: str) -> None:
    """Cache a hash of the Jellyfin-verified password for offline re-auth.

    No-op when either the username or password is empty.
    """
    if not username or not password:
        return
    hashed = hash_password(password)
    now = datetime.now(timezone.utc).isoformat()
    sql = """
        UPDATE users
        SET jellyfin_password_hash = ?, last_jellyfin_auth_at = ?
        WHERE username = ? COLLATE NOCASE
    """
    with _connect() as conn:
        conn.execute(sql, (hashed, now, username))
def _backfill_auth_providers() -> None:
    """Tag legacy rows whose auth_provider was never set.

    Older versions stored external accounts with fixed sentinel passwords;
    rows still marked 'local' whose hash verifies against a sentinel are
    reclassified as jellyfin/jellyseerr.
    """
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT username, password_hash, auth_provider
            FROM users
            """
        ).fetchall()
    pending: list[tuple[str, str]] = []
    for username, password_hash, stored_provider in rows:
        resolved = stored_provider or "local"
        if resolved == "local":
            if verify_password("jellyfin-user", password_hash):
                resolved = "jellyfin"
            elif verify_password("jellyseerr-user", password_hash):
                resolved = "jellyseerr"
        if resolved != stored_provider:
            pending.append((resolved, username))
    if not pending:
        return
    with _connect() as conn:
        conn.executemany(
            """
            UPDATE users SET auth_provider = ? WHERE username = ?
            """,
            pending,
        )
def upsert_user_activity(username: str, ip: str, user_agent: str) -> None:
    """Record a (username, ip, user_agent) sighting, bumping hit_count on repeats.

    Blank/non-string ip and user_agent values are stored as "unknown";
    an empty username is a no-op.
    """
    if not username:
        return

    def cleaned(value: Any) -> str:
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return "unknown"

    now = datetime.now(timezone.utc).isoformat()
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO user_activity (username, ip, user_agent, first_seen_at, last_seen_at, hit_count)
            VALUES (?, ?, ?, ?, ?, 1)
            ON CONFLICT(username, ip, user_agent)
            DO UPDATE SET last_seen_at = excluded.last_seen_at, hit_count = hit_count + 1
            """,
            (username, cleaned(ip), cleaned(user_agent), now, now),
        )
def get_user_activity(username: str, limit: int = 5) -> list[Dict[str, Any]]:
    """Return up to *limit* activity rows for *username*, newest first.

    *limit* is clamped to the range 1..20.
    """
    capped = max(1, min(limit, 20))
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT ip, user_agent, first_seen_at, last_seen_at, hit_count
            FROM user_activity
            WHERE username = ?
            ORDER BY last_seen_at DESC
            LIMIT ?
            """,
            (username, capped),
        ).fetchall()
    columns = ("ip", "user_agent", "first_seen_at", "last_seen_at", "hit_count")
    return [dict(zip(columns, row)) for row in rows]
def get_user_activity_summary(username: str) -> Dict[str, Any]:
    """Summarise a user's activity: most recent hit plus device count.

    device_count is the number of distinct (ip, user_agent) rows stored.
    """
    summary: Dict[str, Any] = {
        "last_ip": None,
        "last_user_agent": None,
        "last_seen_at": None,
        "device_count": 0,
    }
    with _connect() as conn:
        latest = conn.execute(
            """
            SELECT ip, user_agent, last_seen_at
            FROM user_activity
            WHERE username = ?
            ORDER BY last_seen_at DESC
            LIMIT 1
            """,
            (username,),
        ).fetchone()
        total = conn.execute(
            """
            SELECT COUNT(*)
            FROM user_activity
            WHERE username = ?
            """,
            (username,),
        ).fetchone()
    if latest:
        summary["last_ip"], summary["last_user_agent"], summary["last_seen_at"] = latest
    if total:
        summary["device_count"] = int(total[0] or 0)
    return summary
def get_user_request_stats(username_norm: str, requested_by_id: Optional[int] = None) -> Dict[str, Any]:
    """Aggregate per-user request counts from the requests cache.

    Status buckets (per the cache's status codes): 1=pending,
    2=approved, 3=declined, 4=ready, 5=working, 6=partial;
    in_progress = approved + working + partial. Without a
    requested_by_id the cache cannot be filtered, so an all-zero
    summary is returned.
    """
    zeroes: Dict[str, Any] = {
        "total": 0,
        "ready": 0,
        "pending": 0,
        "approved": 0,
        "working": 0,
        "partial": 0,
        "declined": 0,
        "in_progress": 0,
        "last_request_at": None,
    }
    if requested_by_id is None:
        return zeroes
    with _connect() as conn:
        total_row = conn.execute(
            """
            SELECT COUNT(*)
            FROM requests_cache
            WHERE requested_by_id = ?
            """,
            (requested_by_id,),
        ).fetchone()
        status_rows = conn.execute(
            """
            SELECT status, COUNT(*)
            FROM requests_cache
            WHERE requested_by_id = ?
            GROUP BY status
            """,
            (requested_by_id,),
        ).fetchall()
        last_row = conn.execute(
            """
            SELECT MAX(created_at)
            FROM requests_cache
            WHERE requested_by_id = ?
            """,
            (requested_by_id,),
        ).fetchone()
    by_status: Dict[int, int] = {}
    for status, count in status_rows:
        if status is not None:
            by_status[int(status)] = int(count)
    stats = dict(zeroes)
    stats["total"] = int(total_row[0] or 0) if total_row else 0
    stats["pending"] = by_status.get(1, 0)
    stats["approved"] = by_status.get(2, 0)
    stats["declined"] = by_status.get(3, 0)
    stats["ready"] = by_status.get(4, 0)
    stats["working"] = by_status.get(5, 0)
    stats["partial"] = by_status.get(6, 0)
    stats["in_progress"] = stats["approved"] + stats["working"] + stats["partial"]
    stats["last_request_at"] = last_row[0] if last_row else None
    return stats
def get_global_request_leader() -> Optional[Dict[str, Any]]:
    """Return the user with the most cached requests, or None when empty.

    Groups on the normalised username; the display name is the
    lexicographically greatest raw username within the group.
    """
    with _connect() as conn:
        top = conn.execute(
            """
            SELECT requested_by_norm, MAX(requested_by) as display_name, COUNT(*) as total
            FROM requests_cache
            WHERE requested_by_norm IS NOT NULL AND requested_by_norm != ''
            GROUP BY requested_by_norm
            ORDER BY total DESC
            LIMIT 1
            """
        ).fetchone()
    if top is None:
        return None
    norm_name, display_name, total = top
    return {"username": display_name or norm_name, "total": int(total or 0)}
def get_global_request_total() -> int:
    """Return the total number of rows in the requests cache."""
    with _connect() as conn:
        count_row = conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone()
    return int(count_row[0] or 0)
def upsert_request_cache(
    request_id: int,
    media_id: Optional[int],
    media_type: Optional[str],
    status: Optional[int],
    title: Optional[str],
    year: Optional[int],
    requested_by: Optional[str],
    requested_by_norm: Optional[str],
    requested_by_id: Optional[int],
    created_at: Optional[str],
    updated_at: Optional[str],
    payload_json: str,
) -> None:
    """Insert or refresh one requests_cache row, keyed by request_id.

    Title/year resolution, in order of preference:
      1. the caller-supplied values (trimmed / digit-parsed);
      2. values derived from ``payload_json`` — only consulted when a
         caller value is missing, and placeholder titles ("untitled",
         "request <id>") are discarded at this step;
      3. the non-placeholder title/year already stored for this row.
    All other columns are overwritten unconditionally on conflict.
    """
    normalized_title = _normalize_title_value(title)
    normalized_year = _normalize_year_value(year)
    derived_title = None
    derived_year = None
    # Parse the payload only when the caller-supplied values are incomplete.
    if not normalized_title or normalized_year is None:
        derived_title, derived_year = _extract_title_year_from_payload(payload_json)
        # Drop placeholder titles so the payload/stored fallbacks can win.
        if _is_placeholder_title(normalized_title, request_id):
            normalized_title = None
        if derived_title and not normalized_title:
            normalized_title = derived_title
        if normalized_year is None and derived_year is not None:
            normalized_year = derived_year
    with _connect() as conn:
        existing_title = None
        existing_year = None
        # Last resort: reuse what the cache already holds rather than
        # overwriting a good title/year with NULL.
        if normalized_title is None or normalized_year is None:
            row = conn.execute(
                "SELECT title, year FROM requests_cache WHERE request_id = ?",
                (request_id,),
            ).fetchone()
            if row:
                existing_title, existing_year = row[0], row[1]
        # A stored placeholder title is treated the same as no title.
        if _is_placeholder_title(existing_title, request_id):
            existing_title = None
        if normalized_title is None and existing_title:
            normalized_title = existing_title
        if normalized_year is None and existing_year is not None:
            normalized_year = existing_year
        conn.execute(
            """
            INSERT INTO requests_cache (
                request_id,
                media_id,
                media_type,
                status,
                title,
                year,
                requested_by,
                requested_by_norm,
                requested_by_id,
                created_at,
                updated_at,
                payload_json
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(request_id) DO UPDATE SET
                media_id = excluded.media_id,
                media_type = excluded.media_type,
                status = excluded.status,
                title = excluded.title,
                year = excluded.year,
                requested_by = excluded.requested_by,
                requested_by_norm = excluded.requested_by_norm,
                requested_by_id = excluded.requested_by_id,
                created_at = excluded.created_at,
                updated_at = excluded.updated_at,
                payload_json = excluded.payload_json
            """,
            (
                request_id,
                media_id,
                media_type,
                status,
                normalized_title,
                normalized_year,
                requested_by,
                requested_by_norm,
                requested_by_id,
                created_at,
                updated_at,
                payload_json,
            ),
        )
    logger.debug(
        "requests_cache upsert: request_id=%s media_id=%s status=%s updated_at=%s",
        request_id,
        media_id,
        status,
        updated_at,
    )
def get_request_cache_last_updated() -> Optional[str]:
    """Return the newest updated_at value in the cache, or None."""
    with _connect() as conn:
        newest = conn.execute(
            """
            SELECT MAX(updated_at) FROM requests_cache
            """
        ).fetchone()
    return newest[0] if newest else None
def get_request_cache_by_id(request_id: int) -> Optional[Dict[str, Any]]:
    """Fetch a minimal cache record (id, updated_at, title); None on miss."""
    with _connect() as conn:
        record = conn.execute(
            """
            SELECT request_id, updated_at, title
            FROM requests_cache
            WHERE request_id = ?
            """,
            (request_id,),
        ).fetchone()
    if record is None:
        logger.debug("requests_cache miss: request_id=%s", request_id)
        return None
    logger.debug("requests_cache hit: request_id=%s updated_at=%s", record[0], record[1])
    return {"request_id": record[0], "updated_at": record[1], "title": record[2]}
def get_request_cache_payload(request_id: int) -> Optional[Dict[str, Any]]:
    """Decode and return the cached JSON payload for a request.

    Missing rows, empty payloads, and malformed JSON all yield None
    (malformed JSON is logged at warning level).
    """
    with _connect() as conn:
        record = conn.execute(
            """
            SELECT payload_json
            FROM requests_cache
            WHERE request_id = ?
            """,
            (request_id,),
        ).fetchone()
    if record is None or not record[0]:
        logger.debug("requests_cache payload miss: request_id=%s", request_id)
        return None
    try:
        decoded = json.loads(record[0])
    except json.JSONDecodeError:
        logger.warning("requests_cache payload invalid json: request_id=%s", request_id)
        return None
    logger.debug("requests_cache payload hit: request_id=%s", request_id)
    return decoded
def get_cached_requests(
    limit: int,
    offset: int,
    requested_by_norm: Optional[str] = None,
    requested_by_id: Optional[int] = None,
    since_iso: Optional[str] = None,
) -> list[Dict[str, Any]]:
    """List cached requests, newest first, with optional filtering.

    Filtering prefers requested_by_id over the normalised username;
    since_iso keeps rows created at or after that timestamp. Missing
    title/year values are backfilled from the stored payload.
    """
    sql = """
        SELECT request_id, media_id, media_type, status, title, year, requested_by,
               requested_by_norm, requested_by_id, created_at, payload_json
        FROM requests_cache
    """
    bind: list[Any] = []
    filters: list[str] = []
    if requested_by_id is not None:
        filters.append("requested_by_id = ?")
        bind.append(requested_by_id)
    elif requested_by_norm:
        filters.append("requested_by_norm = ?")
        bind.append(requested_by_norm)
    if since_iso:
        filters.append("created_at >= ?")
        bind.append(since_iso)
    if filters:
        sql += " WHERE " + " AND ".join(filters)
    sql += " ORDER BY created_at DESC, request_id DESC LIMIT ? OFFSET ?"
    bind.extend([limit, offset])
    with _connect() as conn:
        rows = conn.execute(sql, tuple(bind)).fetchall()
        logger.debug(
            "requests_cache list: count=%s requested_by_norm=%s requested_by_id=%s since_iso=%s",
            len(rows),
            requested_by_norm,
            requested_by_id,
            since_iso,
        )
    listing: list[Dict[str, Any]] = []
    for row in rows:
        row_title, row_year, raw_payload = row[4], row[5], row[10]
        if raw_payload and (not row_title or not row_year):
            fallback_title, fallback_year = _extract_title_year_from_payload(raw_payload)
            row_title = row_title or fallback_title
            row_year = row_year or fallback_year
        listing.append(
            {
                "request_id": row[0],
                "media_id": row[1],
                "media_type": row[2],
                "status": row[3],
                "title": row_title,
                "year": row_year,
                "requested_by": row[6],
                "requested_by_norm": row[7],
                "requested_by_id": row[8],
                "created_at": row[9],
            }
        )
    return listing
def get_cached_requests_count(
    requested_by_norm: Optional[str] = None,
    requested_by_id: Optional[int] = None,
    since_iso: Optional[str] = None,
) -> int:
    """Count cached requests under the same filters as get_cached_requests."""
    sql = "SELECT COUNT(*) FROM requests_cache"
    bind: list[Any] = []
    filters: list[str] = []
    if requested_by_id is not None:
        filters.append("requested_by_id = ?")
        bind.append(requested_by_id)
    elif requested_by_norm:
        filters.append("requested_by_norm = ?")
        bind.append(requested_by_norm)
    if since_iso:
        filters.append("created_at >= ?")
        bind.append(since_iso)
    if filters:
        sql += " WHERE " + " AND ".join(filters)
    with _connect() as conn:
        count_row = conn.execute(sql, tuple(bind)).fetchone()
    return int(count_row[0]) if count_row else 0
def get_request_cache_overview(limit: int = 50) -> list[Dict[str, Any]]:
    """Return the most recently updated cache rows (limit clamped 1..200).

    A missing title is backfilled from the stored payload when possible.
    """
    capped = max(1, min(limit, 200))
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT request_id, media_id, media_type, status, title, year, requested_by,
                   requested_by_norm, requested_by_id, created_at, updated_at, payload_json
            FROM requests_cache
            ORDER BY updated_at DESC, request_id DESC
            LIMIT ?
            """,
            (capped,),
        ).fetchall()
    overview: list[Dict[str, Any]] = []
    for row in rows:
        display_title = row[4]
        if not display_title and row[11]:
            fallback_title, _ = _extract_title_year_from_payload(row[11])
            display_title = fallback_title or row[4]
        overview.append(
            {
                "request_id": row[0],
                "media_id": row[1],
                "media_type": row[2],
                "status": row[3],
                "title": display_title,
                "year": row[5],
                "requested_by": row[6],
                "requested_by_norm": row[7],
                "requested_by_id": row[8],
                "created_at": row[9],
                "updated_at": row[10],
            }
        )
    return overview
def get_request_cache_missing_titles(limit: int = 200) -> list[Dict[str, Any]]:
    """List rows whose title is NULL/blank/"untitled", newest first.

    Each entry carries the raw payload plus the TMDB id / media type
    extracted from it so callers can resolve the title remotely.
    *limit* is clamped to 1..500.
    """
    capped = max(1, min(limit, 500))
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT request_id, payload_json
            FROM requests_cache
            WHERE title IS NULL OR TRIM(title) = '' OR LOWER(title) = 'untitled'
            ORDER BY updated_at DESC, request_id DESC
            LIMIT ?
            """,
            (capped,),
        ).fetchall()
    missing: list[Dict[str, Any]] = []
    for req_id, raw_payload in rows:
        tmdb_id, media_type = _extract_tmdb_from_payload(raw_payload)
        missing.append(
            {
                "request_id": req_id,
                "payload_json": raw_payload,
                "tmdb_id": tmdb_id,
                "media_type": media_type,
            }
        )
    return missing
def get_request_cache_count() -> int:
    """Return how many rows the requests cache currently holds."""
    with _connect() as conn:
        count_row = conn.execute("SELECT COUNT(*) FROM requests_cache").fetchone()
    return int(count_row[0] or 0)
def upsert_artwork_cache_status(
    request_id: int,
    tmdb_id: Optional[int],
    media_type: Optional[str],
    poster_path: Optional[str],
    backdrop_path: Optional[str],
    has_tmdb: bool,
    poster_cached: bool,
    backdrop_cached: bool,
) -> None:
    """Insert or refresh the artwork bookkeeping row for one request.

    Boolean flags are persisted as 0/1 integers; every column is
    overwritten on conflict and updated_at is always refreshed.
    """
    now_iso = datetime.now(timezone.utc).isoformat()
    values = (
        request_id,
        tmdb_id,
        media_type,
        poster_path,
        backdrop_path,
        int(bool(has_tmdb)),
        int(bool(poster_cached)),
        int(bool(backdrop_cached)),
        now_iso,
    )
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO artwork_cache_status (
                request_id,
                tmdb_id,
                media_type,
                poster_path,
                backdrop_path,
                has_tmdb,
                poster_cached,
                backdrop_cached,
                updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(request_id) DO UPDATE SET
                tmdb_id = excluded.tmdb_id,
                media_type = excluded.media_type,
                poster_path = excluded.poster_path,
                backdrop_path = excluded.backdrop_path,
                has_tmdb = excluded.has_tmdb,
                poster_cached = excluded.poster_cached,
                backdrop_cached = excluded.backdrop_cached,
                updated_at = excluded.updated_at
            """,
            values,
        )
def get_artwork_cache_status_count() -> int:
    """Return the number of artwork bookkeeping rows."""
    with _connect() as conn:
        count_row = conn.execute("SELECT COUNT(*) FROM artwork_cache_status").fetchone()
    return int(count_row[0] or 0)
def get_artwork_cache_missing_count() -> int:
    """Count requests still missing cached artwork.

    A row counts as missing when a poster or backdrop path is unknown
    despite TMDB data being available, or when a known path has not
    yet been cached.
    """
    with _connect() as conn:
        count_row = conn.execute(
            """
            SELECT COUNT(*)
            FROM artwork_cache_status
            WHERE (
                (poster_path IS NULL AND has_tmdb = 1)
                OR (poster_path IS NOT NULL AND poster_cached = 0)
                OR (backdrop_path IS NULL AND has_tmdb = 1)
                OR (backdrop_path IS NOT NULL AND backdrop_cached = 0)
            )
            """
        ).fetchone()
    return int(count_row[0] or 0)
def update_artwork_cache_stats(
    cache_bytes: Optional[int] = None,
    cache_files: Optional[int] = None,
    missing_count: Optional[int] = None,
    total_requests: Optional[int] = None,
) -> None:
    """Persist artwork-cache statistics into the settings table.

    Only values actually supplied are written; the updated-at stamp is
    refreshed on every call.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    stat_pairs = (
        ("artwork_cache_bytes", cache_bytes),
        ("artwork_cache_files", cache_files),
        ("artwork_cache_missing", missing_count),
        ("artwork_cache_total_requests", total_requests),
    )
    for key, value in stat_pairs:
        if value is not None:
            set_setting(key, str(int(value)))
    set_setting("artwork_cache_updated_at", stamp)
def get_artwork_cache_stats() -> Dict[str, Any]:
    """Read the artwork-cache statistics back from the settings table.

    Missing or unparseable numeric values come back as 0.
    """
    def _as_int(key: str) -> int:
        # int(None) raises TypeError, so a missing setting also yields 0.
        try:
            return int(get_setting(key))
        except (TypeError, ValueError):
            return 0

    return {
        "cache_bytes": _as_int("artwork_cache_bytes"),
        "cache_files": _as_int("artwork_cache_files"),
        "missing_artwork": _as_int("artwork_cache_missing"),
        "total_requests": _as_int("artwork_cache_total_requests"),
        "updated_at": get_setting("artwork_cache_updated_at"),
    }
def get_request_cache_stats() -> Dict[str, Any]:
    """Return the same statistics dict as get_artwork_cache_stats()."""
    return get_artwork_cache_stats()
def update_request_cache_title(
    request_id: int, title: str, year: Optional[int] = None
) -> None:
    """Overwrite a cached request's title (and year, when provided).

    The title is trimmed first; a blank title makes this a no-op.
    COALESCE keeps the stored year when no valid year is supplied.
    """
    clean_title = _normalize_title_value(title)
    if clean_title is None:
        return
    clean_year = _normalize_year_value(year)
    with _connect() as conn:
        conn.execute(
            """
            UPDATE requests_cache
            SET title = ?, year = COALESCE(?, year)
            WHERE request_id = ?
            """,
            (clean_title, clean_year, request_id),
        )
def repair_request_cache_titles() -> int:
    """Backfill placeholder/blank titles from the stored payloads.

    Returns the number of rows rewritten; rows whose payload yields no
    usable title are left untouched.
    """
    repaired = 0
    with _connect() as conn:
        cached_rows = conn.execute(
            """
            SELECT request_id, title, year, payload_json
            FROM requests_cache
            """
        ).fetchall()
        for req_id, current_title, _current_year, raw_payload in cached_rows:
            if not _is_placeholder_title(current_title, req_id):
                continue
            new_title, new_year = _extract_title_year_from_payload(raw_payload)
            if not new_title:
                continue
            conn.execute(
                """
                UPDATE requests_cache
                SET title = ?, year = COALESCE(?, year)
                WHERE request_id = ?
                """,
                (new_title, new_year, req_id),
            )
            repaired += 1
    return repaired
def prune_duplicate_requests_cache() -> int:
    """Remove duplicate cache rows for the same media item and requester.

    For each (media_id, normalised requester) pair only the row with
    the highest request_id survives. Returns the deleted-row count.
    """
    with _connect() as conn:
        result = conn.execute(
            """
            DELETE FROM requests_cache
            WHERE media_id IS NOT NULL
              AND request_id NOT IN (
                  SELECT MAX(request_id)
                  FROM requests_cache
                  WHERE media_id IS NOT NULL
                  GROUP BY media_id, COALESCE(requested_by_norm, '')
              )
            """
        )
        return result.rowcount
def get_request_cache_payloads(limit: int = 200, offset: int = 0) -> list[Dict[str, Any]]:
    """Page through cached payloads in ascending request_id order.

    limit is clamped to 1..1000 and offset to >= 0; empty or
    undecodable payloads are returned as None.
    """
    page_limit = max(1, min(limit, 1000))
    page_offset = max(0, offset)
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT request_id, payload_json
            FROM requests_cache
            ORDER BY request_id ASC
            LIMIT ? OFFSET ?
            """,
            (page_limit, page_offset),
        ).fetchall()

    def _decode(raw: Optional[str]) -> Optional[Dict[str, Any]]:
        # Malformed or empty JSON is reported as a missing payload.
        if not raw:
            return None
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return None

    return [{"request_id": req_id, "payload": _decode(raw)} for req_id, raw in rows]
def get_request_cache_payloads_missing(limit: int = 200, offset: int = 0) -> list[Dict[str, Any]]:
    """Page through payloads of requests whose artwork is not fully cached.

    Joins the artwork bookkeeping table and keeps rows where a poster
    or backdrop path is unknown despite TMDB data, or known but not
    cached. limit clamps to 1..1000, offset to >= 0; bad JSON → None.
    """
    page_limit = max(1, min(limit, 1000))
    page_offset = max(0, offset)
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT rc.request_id, rc.payload_json
            FROM requests_cache rc
            JOIN artwork_cache_status acs
              ON rc.request_id = acs.request_id
            WHERE (
                (acs.poster_path IS NULL AND acs.has_tmdb = 1)
                OR (acs.poster_path IS NOT NULL AND acs.poster_cached = 0)
                OR (acs.backdrop_path IS NULL AND acs.has_tmdb = 1)
                OR (acs.backdrop_path IS NOT NULL AND acs.backdrop_cached = 0)
            )
            ORDER BY rc.request_id ASC
            LIMIT ? OFFSET ?
            """,
            (page_limit, page_offset),
        ).fetchall()
    decoded_rows: list[Dict[str, Any]] = []
    for req_id, raw in rows:
        try:
            payload = json.loads(raw) if raw else None
        except json.JSONDecodeError:
            payload = None
        decoded_rows.append({"request_id": req_id, "payload": payload})
    return decoded_rows
def get_cached_requests_since(since_iso: str) -> list[Dict[str, Any]]:
    """Return all cached requests created at/after *since_iso*, newest first."""
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT request_id, media_id, media_type, status, title, year, requested_by,
                   requested_by_norm, requested_by_id, created_at
            FROM requests_cache
            WHERE created_at >= ?
            ORDER BY created_at DESC, request_id DESC
            """,
            (since_iso,),
        ).fetchall()
    columns = (
        "request_id",
        "media_id",
        "media_type",
        "status",
        "title",
        "year",
        "requested_by",
        "requested_by_norm",
        "requested_by_id",
        "created_at",
    )
    return [dict(zip(columns, row)) for row in rows]
def get_cached_request_by_media_id(
    media_id: int,
    requested_by_norm: Optional[str] = None,
    requested_by_id: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Find the newest cached request for a media item, optionally per user.

    Requester-id filtering takes precedence over the normalised name.
    Returns {"request_id", "status"} or None when nothing matches.
    """
    sql = """
        SELECT request_id, status
        FROM requests_cache
        WHERE media_id = ?
    """
    bind: list[Any] = [media_id]
    if requested_by_id is not None:
        sql += " AND requested_by_id = ?"
        bind.append(requested_by_id)
    elif requested_by_norm:
        sql += " AND requested_by_norm = ?"
        bind.append(requested_by_norm)
    sql += " ORDER BY created_at DESC, request_id DESC LIMIT 1"
    with _connect() as conn:
        match = conn.execute(sql, tuple(bind)).fetchone()
    return {"request_id": match[0], "status": match[1]} if match else None
def get_setting(key: str) -> Optional[str]:
    """Return the stored settings value for *key*, or None when absent."""
    with _connect() as conn:
        match = conn.execute(
            """
            SELECT value FROM settings WHERE key = ?
            """,
            (key,),
        ).fetchone()
    return match[0] if match else None
def set_setting(key: str, value: Optional[str]) -> None:
    """Insert or update one settings row, refreshing its updated_at stamp."""
    now_iso = datetime.now(timezone.utc).isoformat()
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO settings (key, value, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
            """,
            (key, value, now_iso),
        )
def delete_setting(key: str) -> None:
    """Remove a settings row by key; missing keys are a no-op."""
    with _connect() as conn:
        conn.execute(
            """
            DELETE FROM settings WHERE key = ?
            """,
            (key,),
        )
def get_settings_overrides() -> Dict[str, str]:
    """Return every settings row as a key -> value mapping.

    Rows with falsy keys are skipped.
    """
    with _connect() as conn:
        rows = conn.execute(
            """
            SELECT key, value FROM settings
            """
        ).fetchall()
    return {key: value for key, value in rows if key}
def run_integrity_check() -> str:
    """Run SQLite's PRAGMA integrity_check and return its first result.

    SQLite reports the string "ok" for a healthy database; "unknown"
    is returned here when the pragma yields no row at all.
    """
    with _connect() as conn:
        verdict = conn.execute("PRAGMA integrity_check").fetchone()
    return str(verdict[0]) if verdict else "unknown"
def vacuum_db() -> None:
    """Run VACUUM to rebuild the database file and reclaim free pages."""
    with _connect() as db:
        db.execute("VACUUM")
def clear_requests_cache() -> int:
    """Delete every cached request row; return how many were removed."""
    with _connect() as conn:
        result = conn.execute("DELETE FROM requests_cache")
        return result.rowcount
def clear_history() -> Dict[str, int]:
    """Delete all action and snapshot rows; return per-table counts."""
    with _connect() as conn:
        removed_actions = conn.execute("DELETE FROM actions").rowcount
        removed_snapshots = conn.execute("DELETE FROM snapshots").rowcount
    return {"actions": removed_actions, "snapshots": removed_snapshots}
def clear_user_objects_nuclear() -> Dict[str, int]:
    """Delete all non-admin users, signup invites, and user profiles.

    Admin rows survive, but their invite/profile references are cleared
    first so the referenced profile rows can be deleted safely.
    Returns per-table deletion counts plus the number of admins reset.
    """
    with _connect() as conn:
        admins_cleared = conn.execute(
            """
            UPDATE users
            SET profile_id = NULL,
                invited_by_code = NULL,
                invited_at = NULL
            WHERE role = 'admin'
            """
        ).rowcount
        removed = {
            "users": conn.execute("DELETE FROM users WHERE role != 'admin'").rowcount,
            "invites": conn.execute("DELETE FROM signup_invites").rowcount,
            "profiles": conn.execute("DELETE FROM user_profiles").rowcount,
        }
    removed["adminsReset"] = admins_cleared
    return removed
def cleanup_history(days: int) -> Dict[str, int]:
    """Delete action/snapshot rows older than *days* days.

    A non-positive *days* is a no-op that returns zero counts.
    Returns the number of rows removed from each table.
    """
    if days <= 0:
        return {"actions": 0, "snapshots": 0}
    cutoff_iso = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    with _connect() as conn:
        removed_actions = conn.execute(
            "DELETE FROM actions WHERE created_at < ?",
            (cutoff_iso,),
        ).rowcount
        removed_snapshots = conn.execute(
            "DELETE FROM snapshots WHERE created_at < ?",
            (cutoff_iso,),
        ).rowcount
    return {"actions": removed_actions, "snapshots": removed_snapshots}