Improve SQLite batching and diagnostics visibility

This commit is contained in:
2026-03-03 15:03:23 +13:00
parent e582ff4ef7
commit dda17a20a5
10 changed files with 667 additions and 188 deletions

View File

@@ -4,6 +4,7 @@ import sqlite3
import logging
from hashlib import sha256
from datetime import datetime, timezone, timedelta
from time import perf_counter
from typing import Any, Dict, Optional
from .config import settings
@@ -16,6 +17,9 @@ SEERR_MEDIA_FAILURE_SHORT_SUPPRESS_HOURS = 6
# Suppression windows/threshold for repeated Seerr media failures
# (semantics applied where these constants are consumed in this module).
SEERR_MEDIA_FAILURE_RETRY_SUPPRESS_HOURS = 24
SEERR_MEDIA_FAILURE_PERSISTENT_SUPPRESS_DAYS = 30
SEERR_MEDIA_FAILURE_PERSISTENT_THRESHOLD = 3
# SQLite per-connection tuning knobs (see _apply_connection_pragmas/_connect).
SQLITE_BUSY_TIMEOUT_MS = 5_000  # also mirrored as the connect() timeout, in seconds
SQLITE_CACHE_SIZE_KIB = 32_768  # passed negated to PRAGMA cache_size => size in KiB
SQLITE_MMAP_SIZE_BYTES = 256 * 1024 * 1024
def _db_path() -> str:
@@ -26,8 +30,30 @@ def _db_path() -> str:
return path
def _apply_connection_pragmas(conn: sqlite3.Connection) -> None:
    """Tune a freshly opened SQLite connection.

    Sets WAL journaling, NORMAL synchronous mode, in-memory temp storage,
    a larger page cache, mmap I/O, and a busy timeout. Any pragma the
    underlying SQLite build rejects is skipped with a debug log rather
    than aborting connection setup.
    """
    for name, setting in (
        ("journal_mode", "WAL"),
        ("synchronous", "NORMAL"),
        ("temp_store", "MEMORY"),
        ("cache_size", -SQLITE_CACHE_SIZE_KIB),
        ("mmap_size", SQLITE_MMAP_SIZE_BYTES),
        ("busy_timeout", SQLITE_BUSY_TIMEOUT_MS),
    ):
        try:
            conn.execute(f"PRAGMA {name} = {setting}")
        except sqlite3.DatabaseError:
            logger.debug("sqlite pragma skipped: %s=%s", name, setting, exc_info=True)
def _connect() -> sqlite3.Connection:
    """Open a tuned SQLite connection to the application database.

    The driver-level ``timeout`` mirrors the busy_timeout pragma (seconds vs
    milliseconds) so writers blocked on a locked database wait instead of
    raising immediately, and a larger statement cache keeps frequently
    reused SQL prepared. Note: removed the leftover bare
    ``return sqlite3.connect(_db_path())`` from the pre-tuning version,
    which would have made the tuned path unreachable.
    """
    conn = sqlite3.connect(
        _db_path(),
        timeout=SQLITE_BUSY_TIMEOUT_MS / 1000,
        cached_statements=512,
    )
    _apply_connection_pragmas(conn)
    return conn
def _parse_datetime_value(value: Optional[str]) -> Optional[datetime]:
@@ -321,6 +347,30 @@ def init_db() -> None:
ON requests_cache (requested_by_norm)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_requests_cache_updated_at
ON requests_cache (updated_at DESC, request_id DESC)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_id_created_at
ON requests_cache (requested_by_id, created_at DESC, request_id DESC)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_requests_cache_requested_by_norm_created_at
ON requests_cache (requested_by_norm, created_at DESC, request_id DESC)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_requests_cache_status_created_at
ON requests_cache (status, created_at DESC, request_id DESC)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_artwork_cache_status_updated_at
@@ -441,6 +491,15 @@ def init_db() -> None:
)
except sqlite3.OperationalError:
pass
try:
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_users_username_nocase
ON users (username COLLATE NOCASE)
"""
)
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE requests_cache ADD COLUMN requested_by_id INTEGER")
except sqlite3.OperationalError:
@@ -454,6 +513,10 @@ def init_db() -> None:
)
except sqlite3.OperationalError:
pass
try:
conn.execute("PRAGMA optimize")
except sqlite3.OperationalError:
pass
_backfill_auth_providers()
ensure_admin_user()
@@ -1619,41 +1682,44 @@ def get_user_request_stats(username_norm: str, requested_by_id: Optional[int] =
"last_request_at": None,
}
with _connect() as conn:
total_row = conn.execute(
row = conn.execute(
"""
SELECT COUNT(*)
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status = 4 THEN 1 ELSE 0 END) AS ready,
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS pending,
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS approved,
SUM(CASE WHEN status = 5 THEN 1 ELSE 0 END) AS working,
SUM(CASE WHEN status = 6 THEN 1 ELSE 0 END) AS partial,
SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS declined,
MAX(created_at) AS last_request_at
FROM requests_cache
WHERE requested_by_id = ?
""",
(requested_by_id,),
).fetchone()
status_rows = conn.execute(
"""
SELECT status, COUNT(*)
FROM requests_cache
WHERE requested_by_id = ?
GROUP BY status
""",
(requested_by_id,),
).fetchall()
last_row = conn.execute(
"""
SELECT MAX(created_at)
FROM requests_cache
WHERE requested_by_id = ?
""",
(requested_by_id,),
).fetchone()
counts = {int(row[0]): int(row[1]) for row in status_rows if row[0] is not None}
pending = counts.get(1, 0)
approved = counts.get(2, 0)
declined = counts.get(3, 0)
ready = counts.get(4, 0)
working = counts.get(5, 0)
partial = counts.get(6, 0)
if not row:
return {
"total": 0,
"ready": 0,
"pending": 0,
"approved": 0,
"working": 0,
"partial": 0,
"declined": 0,
"in_progress": 0,
"last_request_at": None,
}
total = int(row[0] or 0)
ready = int(row[1] or 0)
pending = int(row[2] or 0)
approved = int(row[3] or 0)
working = int(row[4] or 0)
partial = int(row[5] or 0)
declined = int(row[6] or 0)
in_progress = approved + working + partial
return {
"total": int(total_row[0] or 0) if total_row else 0,
"total": total,
"ready": ready,
"pending": pending,
"approved": approved,
@@ -1661,7 +1727,7 @@ def get_user_request_stats(username_norm: str, requested_by_id: Optional[int] =
"partial": partial,
"declined": declined,
"in_progress": in_progress,
"last_request_at": last_row[0] if last_row else None,
"last_request_at": row[7],
}
@@ -1688,6 +1754,143 @@ def get_global_request_total() -> int:
return int(row[0] or 0)
_REQUESTS_CACHE_UPSERT_SQL = """
INSERT INTO requests_cache (
request_id,
media_id,
media_type,
status,
title,
year,
requested_by,
requested_by_norm,
requested_by_id,
created_at,
updated_at,
payload_json
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(request_id) DO UPDATE SET
media_id = excluded.media_id,
media_type = excluded.media_type,
status = excluded.status,
title = excluded.title,
year = excluded.year,
requested_by = excluded.requested_by,
requested_by_norm = excluded.requested_by_norm,
requested_by_id = excluded.requested_by_id,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
payload_json = excluded.payload_json
"""
def get_request_cache_lookup(request_ids: list[int]) -> Dict[int, Dict[str, Any]]:
    """Fetch cached metadata for the given request ids.

    Non-int entries in ``request_ids`` are ignored. Returns a mapping of
    request_id -> {request_id, updated_at, title, year}; ids not present
    in requests_cache are simply absent from the result.
    """
    unique_ids = sorted({int(rid) for rid in request_ids if isinstance(rid, int)})
    if not unique_ids:
        return {}
    marks = ", ".join("?" for _ in unique_ids)
    sql = f"""
        SELECT request_id, updated_at, title, year
        FROM requests_cache
        WHERE request_id IN ({marks})
    """
    lookup: Dict[int, Dict[str, Any]] = {}
    with _connect() as conn:
        for rid, updated_at, title, year in conn.execute(sql, tuple(unique_ids)).fetchall():
            lookup[int(rid)] = {
                "request_id": int(rid),
                "updated_at": updated_at,
                "title": title,
                "year": year,
            }
    return lookup
def _prepare_requests_cache_upsert_rows(
records: list[Dict[str, Any]], conn: sqlite3.Connection
) -> list[tuple[Any, ...]]:
if not records:
return []
existing_rows: Dict[int, tuple[Optional[str], Optional[int]]] = {}
ids_needing_existing = [
int(record["request_id"])
for record in records
if isinstance(record.get("request_id"), int)
and (
not _normalize_title_value(record.get("title"))
or _normalize_year_value(record.get("year")) is None
)
]
if ids_needing_existing:
placeholders = ", ".join("?" for _ in sorted(set(ids_needing_existing)))
query = f"""
SELECT request_id, title, year
FROM requests_cache
WHERE request_id IN ({placeholders})
"""
for row in conn.execute(query, tuple(sorted(set(ids_needing_existing)))).fetchall():
existing_rows[int(row[0])] = (row[1], row[2])
prepared: list[tuple[Any, ...]] = []
for record in records:
request_id = int(record["request_id"])
media_id = record.get("media_id")
media_type = record.get("media_type")
status = record.get("status")
requested_by = record.get("requested_by")
requested_by_norm = record.get("requested_by_norm")
requested_by_id = record.get("requested_by_id")
created_at = record.get("created_at")
updated_at = record.get("updated_at")
payload_json = str(record.get("payload_json") or "")
normalized_title = _normalize_title_value(record.get("title"))
normalized_year = _normalize_year_value(record.get("year"))
derived_title = None
derived_year = None
if not normalized_title or normalized_year is None:
derived_title, derived_year = _extract_title_year_from_payload(payload_json)
if _is_placeholder_title(normalized_title, request_id):
normalized_title = None
if derived_title and not normalized_title:
normalized_title = derived_title
if normalized_year is None and derived_year is not None:
normalized_year = derived_year
existing_title = None
existing_year = None
if normalized_title is None or normalized_year is None:
existing = existing_rows.get(request_id)
if existing:
existing_title, existing_year = existing
if _is_placeholder_title(existing_title, request_id):
existing_title = None
if normalized_title is None and existing_title:
normalized_title = existing_title
if normalized_year is None and existing_year is not None:
normalized_year = existing_year
prepared.append(
(
request_id,
media_id,
media_type,
status,
normalized_title,
normalized_year,
requested_by,
requested_by_norm,
requested_by_id,
created_at,
updated_at,
payload_json,
)
)
return prepared
def upsert_request_cache(
request_id: int,
media_id: Optional[int],
@@ -1702,79 +1905,28 @@ def upsert_request_cache(
updated_at: Optional[str],
payload_json: str,
) -> None:
normalized_title = _normalize_title_value(title)
normalized_year = _normalize_year_value(year)
derived_title = None
derived_year = None
if not normalized_title or normalized_year is None:
derived_title, derived_year = _extract_title_year_from_payload(payload_json)
if _is_placeholder_title(normalized_title, request_id):
normalized_title = None
if derived_title and not normalized_title:
normalized_title = derived_title
if normalized_year is None and derived_year is not None:
normalized_year = derived_year
with _connect() as conn:
existing_title = None
existing_year = None
if normalized_title is None or normalized_year is None:
row = conn.execute(
"SELECT title, year FROM requests_cache WHERE request_id = ?",
(request_id,),
).fetchone()
if row:
existing_title, existing_year = row[0], row[1]
if _is_placeholder_title(existing_title, request_id):
existing_title = None
if normalized_title is None and existing_title:
normalized_title = existing_title
if normalized_year is None and existing_year is not None:
normalized_year = existing_year
conn.execute(
"""
INSERT INTO requests_cache (
request_id,
media_id,
media_type,
status,
title,
year,
requested_by,
requested_by_norm,
requested_by_id,
created_at,
updated_at,
payload_json
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(request_id) DO UPDATE SET
media_id = excluded.media_id,
media_type = excluded.media_type,
status = excluded.status,
title = excluded.title,
year = excluded.year,
requested_by = excluded.requested_by,
requested_by_norm = excluded.requested_by_norm,
requested_by_id = excluded.requested_by_id,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
payload_json = excluded.payload_json
""",
(
request_id,
media_id,
media_type,
status,
normalized_title,
normalized_year,
requested_by,
requested_by_norm,
requested_by_id,
created_at,
updated_at,
payload_json,
),
rows = _prepare_requests_cache_upsert_rows(
[
{
"request_id": request_id,
"media_id": media_id,
"media_type": media_type,
"status": status,
"title": title,
"year": year,
"requested_by": requested_by,
"requested_by_norm": requested_by_norm,
"requested_by_id": requested_by_id,
"created_at": created_at,
"updated_at": updated_at,
"payload_json": payload_json,
}
],
conn,
)
if rows:
conn.execute(_REQUESTS_CACHE_UPSERT_SQL, rows[0])
logger.debug(
"requests_cache upsert: request_id=%s media_id=%s status=%s updated_at=%s",
request_id,
@@ -1784,6 +1936,17 @@ def upsert_request_cache(
)
def upsert_request_cache_many(records: list[Dict[str, Any]]) -> int:
    """Bulk upsert request rows into requests_cache.

    Prepares parameter tuples (with title/year fallbacks) and applies them
    in a single executemany under one transaction. Returns the number of
    input records (0 when ``records`` is empty).
    """
    if not records:
        return 0
    with _connect() as conn:
        prepared = _prepare_requests_cache_upsert_rows(records, conn)
        if prepared:
            conn.executemany(_REQUESTS_CACHE_UPSERT_SQL, prepared)
    logger.debug("requests_cache bulk upsert: rows=%s", len(records))
    return len(records)
def get_request_cache_last_updated() -> Optional[str]:
with _connect() as conn:
row = conn.execute(
@@ -2017,9 +2180,45 @@ def upsert_artwork_cache_status(
poster_cached: bool,
backdrop_cached: bool,
) -> None:
upsert_artwork_cache_status_many(
[
{
"request_id": request_id,
"tmdb_id": tmdb_id,
"media_type": media_type,
"poster_path": poster_path,
"backdrop_path": backdrop_path,
"has_tmdb": has_tmdb,
"poster_cached": poster_cached,
"backdrop_cached": backdrop_cached,
}
]
)
def upsert_artwork_cache_status_many(records: list[Dict[str, Any]]) -> int:
if not records:
return 0
updated_at = datetime.now(timezone.utc).isoformat()
params = [
(
record["request_id"],
record.get("tmdb_id"),
record.get("media_type"),
record.get("poster_path"),
record.get("backdrop_path"),
1 if record.get("has_tmdb") else 0,
1 if record.get("poster_cached") else 0,
1 if record.get("backdrop_cached") else 0,
updated_at,
)
for record in records
if isinstance(record.get("request_id"), int)
]
if not params:
return 0
with _connect() as conn:
conn.execute(
conn.executemany(
"""
INSERT INTO artwork_cache_status (
request_id,
@@ -2043,18 +2242,9 @@ def upsert_artwork_cache_status(
backdrop_cached = excluded.backdrop_cached,
updated_at = excluded.updated_at
""",
(
request_id,
tmdb_id,
media_type,
poster_path,
backdrop_path,
1 if has_tmdb else 0,
1 if poster_cached else 0,
1 if backdrop_cached else 0,
updated_at,
),
params,
)
return len(params)
def get_artwork_cache_status_count() -> int:
@@ -2638,6 +2828,73 @@ def run_integrity_check() -> str:
return str(row[0])
def get_database_diagnostics() -> Dict[str, Any]:
    """Collect integrity, size, and row-count diagnostics for the database.

    Runs PRAGMA integrity_check, reads page/freelist pragmas, counts rows
    in the known tables, and reports on-disk sizes of the DB plus its
    WAL/SHM side files. Per-phase timings (ms) are included so slow
    checks are visible to operators.
    """
    db_path = _db_path()

    def _file_size(path: str) -> int:
        # Missing WAL/SHM side files simply report 0 bytes.
        try:
            return os.path.getsize(path)
        except OSError:
            return 0

    tables = (
        "users",
        "requests_cache",
        "artwork_cache_status",
        "signup_invites",
        "settings",
        "actions",
        "snapshots",
        "seerr_media_failures",
        "password_reset_tokens",
    )
    overall_started = perf_counter()
    with _connect() as conn:
        phase_started = perf_counter()
        integrity_row = conn.execute("PRAGMA integrity_check").fetchone()
        integrity_ms = round((perf_counter() - phase_started) * 1000, 1)
        integrity = str(integrity_row[0]) if integrity_row else "unknown"

        phase_started = perf_counter()
        page_size_row = conn.execute("PRAGMA page_size").fetchone()
        page_count_row = conn.execute("PRAGMA page_count").fetchone()
        freelist_row = conn.execute("PRAGMA freelist_count").fetchone()
        pragma_ms = round((perf_counter() - phase_started) * 1000, 1)

        phase_started = perf_counter()
        # Table names are a fixed internal tuple, never user input, so the
        # f-string interpolation here is safe.
        table_counts = {
            table: int(conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] or 0)
            for table in tables
        }
        row_count_ms = round((perf_counter() - phase_started) * 1000, 1)
    page_size = int(page_size_row[0] or 0) if page_size_row else 0
    page_count = int(page_count_row[0] or 0) if page_count_row else 0
    freelist_pages = int(freelist_row[0] or 0) if freelist_row else 0
    return {
        "integrity_check": integrity,
        "database_path": db_path,
        "database_size_bytes": _file_size(db_path),
        "wal_size_bytes": _file_size(f"{db_path}-wal"),
        "shm_size_bytes": _file_size(f"{db_path}-shm"),
        "page_size_bytes": page_size,
        "page_count": page_count,
        "freelist_pages": freelist_pages,
        "allocated_bytes": page_size * page_count,
        "free_bytes": page_size * freelist_pages,
        "row_counts": table_counts,
        "timings_ms": {
            "integrity_check": integrity_ms,
            "pragmas": pragma_ms,
            "row_counts": row_count_ms,
            "total": round((perf_counter() - overall_started) * 1000, 1),
        },
    }
def vacuum_db() -> None:
    """Rebuild the database file to reclaim free pages (SQLite VACUUM)."""
    connection = _connect()
    with connection:
        connection.execute("VACUUM")