Build 2602261523: live updates, invite cleanup and nuclear resync
This commit is contained in:
@@ -1,15 +1,18 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import asyncio
|
||||
import ipaddress
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
import sqlite3
|
||||
import string
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File
|
||||
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from ..auth import require_admin, get_current_user
|
||||
from ..auth import require_admin, get_current_user, require_admin_event_stream
|
||||
from ..config import settings as env_settings
|
||||
from ..db import (
|
||||
delete_setting,
|
||||
@@ -37,6 +40,7 @@ from ..db import (
|
||||
vacuum_db,
|
||||
clear_requests_cache,
|
||||
clear_history,
|
||||
clear_user_objects_nuclear,
|
||||
cleanup_history,
|
||||
update_request_cache_title,
|
||||
repair_request_cache_titles,
|
||||
@@ -65,6 +69,7 @@ from ..services.user_cache import (
|
||||
match_jellyseerr_user_id,
|
||||
save_jellyfin_users_cache,
|
||||
save_jellyseerr_users_cache,
|
||||
clear_user_import_caches,
|
||||
)
|
||||
import logging
|
||||
from ..logging_config import configure_logging
|
||||
@@ -72,6 +77,7 @@ from ..routers import requests as requests_router
|
||||
from ..routers.branding import save_branding_image
|
||||
|
||||
router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)])
|
||||
events_router = APIRouter(prefix="/admin/events", tags=["admin"])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SENSITIVE_KEYS = {
|
||||
@@ -130,6 +136,36 @@ SETTING_KEYS: List[str] = [
|
||||
"site_banner_tone",
|
||||
]
|
||||
|
||||
|
||||
def _admin_live_state_snapshot() -> Dict[str, Any]:
    """Build the live-state payload pushed to admin SSE clients.

    Combines the requests-sync and artwork-prefetch states from the
    requests router under a typed envelope.
    """
    snapshot: Dict[str, Any] = {"type": "admin_live_state"}
    snapshot["requestsSync"] = requests_router.get_requests_sync_state()
    snapshot["artworkPrefetch"] = requests_router.get_artwork_prefetch_state()
    return snapshot
|
||||
|
||||
|
||||
def _sse_encode(data: Dict[str, Any]) -> str:
|
||||
payload = json.dumps(data, ensure_ascii=True, separators=(",", ":"), default=str)
|
||||
return f"data: {payload}\n\n"
|
||||
|
||||
|
||||
def _read_log_tail_lines(lines: int) -> List[str]:
    """Return the last *lines* lines of the configured log file.

    The count is clamped to [1, 1000]. Relative paths are resolved
    against the current working directory.

    Raises:
        HTTPException: 400 when no log file is configured, 404 when the
            resolved path does not exist.
    """
    runtime = get_runtime_settings()
    path = runtime.log_file
    if not path:
        raise HTTPException(status_code=400, detail="Log file not configured")
    if not os.path.isabs(path):
        path = os.path.join(os.getcwd(), path)
    if not os.path.exists(path):
        raise HTTPException(status_code=404, detail="Log file not found")
    # Clamp to a sane window so oversized requests cannot balloon memory.
    window = max(1, min(lines, 1000))
    from collections import deque

    with open(path, "r", encoding="utf-8", errors="replace") as handle:
        return list(deque(handle, maxlen=window))
|
||||
|
||||
def _normalize_username(value: str) -> str:
|
||||
normalized = value.strip().lower()
|
||||
if "@" in normalized:
|
||||
@@ -608,22 +644,65 @@ async def requests_sync_status() -> Dict[str, Any]:
|
||||
return {"status": "ok", "sync": requests_router.get_requests_sync_state()}
|
||||
|
||||
|
||||
@events_router.get("/stream")
async def admin_events_stream(
    request: Request,
    include_logs: bool = False,
    log_lines: int = 200,
    _: Dict[str, Any] = Depends(require_admin_event_stream),
) -> StreamingResponse:
    """Stream admin live state as Server-Sent Events, polled once per second.

    Each frame is the JSON snapshot from ``_admin_live_state_snapshot``;
    frames are only emitted when the snapshot differs from the previous
    one. With ``include_logs`` set, a log tail of ``log_lines`` lines is
    refreshed roughly every 5th iteration and embedded under ``"logs"``.
    A comment-only ``: ping`` frame is sent after 15 unchanged iterations
    to keep intermediaries from closing the idle connection.
    """
    async def event_generator():
        # Advise client reconnect timing once per stream.
        yield "retry: 2000\n\n"
        last_snapshot: Optional[str] = None
        heartbeat_counter = 0
        # Seeded at the threshold so the first loop iteration fetches logs
        # immediately instead of waiting 5 ticks.
        log_refresh_counter = 5 if include_logs else 0
        latest_logs_payload: Optional[Dict[str, Any]] = None
        while True:
            # Stop polling as soon as the client goes away.
            if await request.is_disconnected():
                break
            snapshot_payload = _admin_live_state_snapshot()
            if include_logs:
                log_refresh_counter += 1
                if log_refresh_counter >= 5:
                    log_refresh_counter = 0
                    try:
                        latest_logs_payload = {
                            "lines": _read_log_tail_lines(log_lines),
                            # Mirror the helper's clamp so the client sees
                            # the effective count, not the raw query value.
                            "count": max(1, min(int(log_lines or 200), 1000)),
                        }
                    except HTTPException as exc:
                        latest_logs_payload = {
                            "error": str(exc.detail) if exc.detail else "Could not read logs",
                        }
                    except Exception as exc:
                        # Best-effort: log errors must not kill the stream.
                        latest_logs_payload = {"error": str(exc)}
                # Stale payload is reused between refreshes so every frame
                # still carries a "logs" key.
                snapshot_payload["logs"] = latest_logs_payload

            snapshot = _sse_encode(snapshot_payload)
            if snapshot != last_snapshot:
                last_snapshot = snapshot
                yield snapshot
                heartbeat_counter = 0
            else:
                heartbeat_counter += 1
                # Keep the stream alive through proxies even when state is unchanged.
                if heartbeat_counter >= 15:
                    yield ": ping\n\n"
                    heartbeat_counter = 0
            await asyncio.sleep(1.0)

    # Headers disable proxy/server buffering so frames flush immediately.
    headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
|
||||
|
||||
|
||||
@router.get("/logs")
async def read_logs(lines: int = 200) -> Dict[str, Any]:
    """Return the tail of the configured log file as ``{"lines": [...]}``.

    ``lines`` is clamped to [1, 1000] by the shared helper, which also
    raises HTTP 400 when no log file is configured and HTTP 404 when the
    resolved path does not exist.
    """
    # The old inline tail-reading body (a duplicate of _read_log_tail_lines
    # left over from a merge) returned before this line could run; delegate
    # to the single shared helper instead.
    return {"lines": _read_log_tail_lines(lines)}
|
||||
|
||||
|
||||
@router.get("/requests/cache")
|
||||
@@ -689,9 +768,23 @@ async def repair_database() -> Dict[str, Any]:
|
||||
async def flush_database() -> Dict[str, Any]:
    """Nuclear flush: clear the requests cache, history, cached user
    objects, and user-import caches.

    Also deletes the ``requests_sync_last_at`` setting so the next sync
    starts from scratch. Returns per-store cleared counts.
    """
    cleared = clear_requests_cache()
    history = clear_history()
    user_objects = clear_user_objects_nuclear()
    user_caches = clear_user_import_caches()
    # Force a full resync on the next pass.
    delete_setting("requests_sync_last_at")
    # NOTE: a stale two-field warning + return (merge leftover) previously
    # preceded this block and made it unreachable; only the richer version
    # is kept.
    logger.warning(
        "Database flush executed: requests_cache=%s history=%s user_objects=%s user_caches=%s",
        cleared,
        history,
        user_objects,
        user_caches,
    )
    return {
        "status": "ok",
        "requestsCleared": cleared,
        "historyCleared": history,
        "userObjectsCleared": user_objects,
        "userCachesCleared": user_caches,
    }
|
||||
|
||||
|
||||
@router.post("/maintenance/cleanup")
|
||||
|
||||
Reference in New Issue
Block a user