Files
Magent/backend/app/services/snapshot.py

604 lines
25 KiB
Python

from typing import Any, Dict, List, Optional
import asyncio
import logging
from datetime import datetime, timezone
from urllib.parse import quote
from ..clients.jellyseerr import JellyseerrClient
from ..clients.jellyfin import JellyfinClient
from ..clients.sonarr import SonarrClient
from ..clients.radarr import RadarrClient
from ..clients.prowlarr import ProwlarrClient
from ..clients.qbittorrent import QBittorrentClient
from ..runtime import get_runtime_settings
from ..db import save_snapshot, get_request_cache_payload
from ..models import ActionOption, NormalizedState, RequestType, Snapshot, TimelineHop
STATUS_LABELS = {
1: "Waiting for approval",
2: "Approved",
3: "Declined",
4: "Ready to watch",
5: "Working on it",
6: "Partially ready",
}
def _status_label(value: Any) -> str:
try:
numeric = int(value)
return STATUS_LABELS.get(numeric, f"Status {numeric}")
except (TypeError, ValueError):
return "Unknown"
def _pick_first(value: Any) -> Optional[Dict[str, Any]]:
if isinstance(value, list):
return value[0] if value else None
if isinstance(value, dict):
return value
return None
def _queue_records(queue: Any) -> List[Dict[str, Any]]:
if isinstance(queue, dict):
records = queue.get("records")
if isinstance(records, list):
return records
if isinstance(queue, list):
return queue
return []
def _filter_queue(queue: Any, item_id: Optional[int], request_type: RequestType) -> Any:
if not item_id:
return queue
records = _queue_records(queue)
if not records:
return queue
key = "seriesId" if request_type == RequestType.tv else "movieId"
filtered = [record for record in records if record.get(key) == item_id]
if isinstance(queue, dict):
filtered_queue = dict(queue)
filtered_queue["records"] = filtered
filtered_queue["totalRecords"] = len(filtered)
return filtered_queue
return filtered
def _download_ids(records: List[Dict[str, Any]]) -> List[str]:
ids = []
for record in records:
download_id = record.get("downloadId") or record.get("download_id")
if isinstance(download_id, str) and download_id:
ids.append(download_id)
return ids
def _missing_episode_numbers_by_season(episodes: Any) -> Dict[int, List[int]]:
if not isinstance(episodes, list):
return {}
grouped: Dict[int, List[int]] = {}
now = datetime.now(timezone.utc)
for episode in episodes:
if not isinstance(episode, dict):
continue
if not episode.get("monitored", True):
continue
if episode.get("hasFile"):
continue
air_date = episode.get("airDateUtc")
if isinstance(air_date, str):
try:
aired_at = datetime.fromisoformat(air_date.replace("Z", "+00:00"))
except ValueError:
aired_at = None
if aired_at and aired_at > now:
continue
season_number = episode.get("seasonNumber")
episode_number = episode.get("episodeNumber")
if not isinstance(episode_number, int):
episode_number = episode.get("absoluteEpisodeNumber")
if isinstance(season_number, int) and isinstance(episode_number, int):
grouped.setdefault(season_number, []).append(episode_number)
for season_number in list(grouped.keys()):
grouped[season_number] = sorted(set(grouped[season_number]))
return grouped
def _summarize_qbit(torrents: List[Dict[str, Any]]) -> Dict[str, Any]:
if not torrents:
return {"state": "idle", "message": "0 active downloads."}
downloading_states = {"downloading", "stalleddl", "queueddl", "checkingdl", "forceddl"}
paused_states = {"pauseddl", "pausedup"}
completed_states = {"uploading", "stalledup", "queuedup", "checkingup", "forcedup", "stoppedup"}
downloading = [t for t in torrents if str(t.get("state", "")).lower() in downloading_states]
paused = [t for t in torrents if str(t.get("state", "")).lower() in paused_states]
completed = [t for t in torrents if str(t.get("state", "")).lower() in completed_states]
if downloading:
return {
"state": "downloading",
"message": f"Downloading ({len(downloading)} active).",
}
if paused:
return {
"state": "paused",
"message": f"Paused ({len(paused)} paused).",
}
if completed:
return {
"state": "completed",
"message": f"Completed/seeding ({len(completed)} seeding).",
}
return {
"state": "idle",
"message": "0 active downloads.",
}
def _artwork_url(path: Optional[str], size: str, cache_mode: str) -> Optional[str]:
if not path:
return None
if not path.startswith("/"):
path = f"/{path}"
if cache_mode == "cache":
return f"/images/tmdb?path={quote(path)}&size={size}"
return f"https://image.tmdb.org/t/p/{size}{path}"
async def build_snapshot(request_id: str) -> Snapshot:
timeline = []
runtime = get_runtime_settings()
jellyseerr = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key)
jellyfin = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key)
sonarr = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key)
radarr = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key)
prowlarr = ProwlarrClient(runtime.prowlarr_base_url, runtime.prowlarr_api_key)
qbittorrent = QBittorrentClient(
runtime.qbittorrent_base_url,
runtime.qbittorrent_username,
runtime.qbittorrent_password,
)
snapshot = Snapshot(
request_id=request_id,
title="Unknown",
state=NormalizedState.unknown,
state_reason="Awaiting configuration",
)
cached_request = None
mode = (runtime.requests_data_source or "prefer_cache").lower()
if mode != "always_js" and request_id.isdigit():
cached_request = get_request_cache_payload(int(request_id))
if cached_request is not None:
logging.getLogger(__name__).debug(
"snapshot cache hit: request_id=%s mode=%s", request_id, mode
)
else:
logging.getLogger(__name__).debug(
"snapshot cache miss: request_id=%s mode=%s", request_id, mode
)
if not jellyseerr.configured() and not cached_request:
timeline.append(TimelineHop(service="Jellyseerr", status="not_configured"))
timeline.append(TimelineHop(service="Sonarr/Radarr", status="not_configured"))
timeline.append(TimelineHop(service="Prowlarr", status="not_configured"))
timeline.append(TimelineHop(service="qBittorrent", status="not_configured"))
snapshot.timeline = timeline
return snapshot
jelly_request = cached_request
if (jelly_request is None or mode == "always_js") and jellyseerr.configured():
try:
jelly_request = await jellyseerr.get_request(request_id)
logging.getLogger(__name__).debug(
"snapshot jellyseerr fetch: request_id=%s mode=%s", request_id, mode
)
except Exception as exc:
timeline.append(TimelineHop(service="Jellyseerr", status="error", details={"error": str(exc)}))
snapshot.timeline = timeline
snapshot.state = NormalizedState.failed
snapshot.state_reason = "Failed to reach Jellyseerr"
return snapshot
if not jelly_request:
timeline.append(TimelineHop(service="Jellyseerr", status="not_found"))
snapshot.timeline = timeline
snapshot.state = NormalizedState.unknown
snapshot.state_reason = "Request not found in Jellyseerr"
return snapshot
jelly_status = jelly_request.get("status", "unknown")
jelly_status_label = _status_label(jelly_status)
jelly_type = jelly_request.get("type") or "unknown"
snapshot.title = jelly_request.get("media", {}).get("title", "Unknown")
snapshot.year = jelly_request.get("media", {}).get("year")
snapshot.request_type = RequestType(jelly_type) if jelly_type in {"movie", "tv"} else RequestType.unknown
media = jelly_request.get("media", {}) if isinstance(jelly_request, dict) else {}
poster_path = None
backdrop_path = None
if isinstance(media, dict):
poster_path = media.get("posterPath") or media.get("poster_path")
backdrop_path = media.get("backdropPath") or media.get("backdrop_path")
if snapshot.title in {None, "", "Unknown"} and jellyseerr.configured():
tmdb_id = jelly_request.get("media", {}).get("tmdbId")
if tmdb_id:
try:
if snapshot.request_type == RequestType.movie:
details = await jellyseerr.get_movie(int(tmdb_id))
if isinstance(details, dict):
snapshot.title = details.get("title") or snapshot.title
release_date = details.get("releaseDate")
snapshot.year = int(release_date[:4]) if release_date else snapshot.year
poster_path = poster_path or details.get("posterPath") or details.get("poster_path")
backdrop_path = (
backdrop_path
or details.get("backdropPath")
or details.get("backdrop_path")
)
elif snapshot.request_type == RequestType.tv:
details = await jellyseerr.get_tv(int(tmdb_id))
if isinstance(details, dict):
snapshot.title = details.get("name") or details.get("title") or snapshot.title
first_air = details.get("firstAirDate")
snapshot.year = int(first_air[:4]) if first_air else snapshot.year
poster_path = poster_path or details.get("posterPath") or details.get("poster_path")
backdrop_path = (
backdrop_path
or details.get("backdropPath")
or details.get("backdrop_path")
)
except Exception:
pass
cache_mode = (runtime.artwork_cache_mode or "remote").lower()
snapshot.artwork = {
"poster_path": poster_path,
"backdrop_path": backdrop_path,
"poster_url": _artwork_url(poster_path, "w342", cache_mode),
"backdrop_url": _artwork_url(backdrop_path, "w780", cache_mode),
}
timeline.append(
TimelineHop(
service="Jellyseerr",
status=jelly_status_label,
details={
"requestedBy": jelly_request.get("requestedBy", {}).get("displayName")
or jelly_request.get("requestedBy", {}).get("username")
or jelly_request.get("requestedBy", {}).get("jellyfinUsername")
or jelly_request.get("requestedBy", {}).get("email"),
"createdAt": jelly_request.get("createdAt"),
"updatedAt": jelly_request.get("updatedAt"),
"approved": jelly_request.get("isApproved"),
"statusCode": jelly_status,
},
)
)
arr_state = None
arr_details: Dict[str, Any] = {}
arr_item = None
arr_queue = None
media_status = jelly_request.get("media", {}).get("status")
try:
media_status_code = int(media_status) if media_status is not None else None
except (TypeError, ValueError):
media_status_code = None
if snapshot.request_type == RequestType.tv:
tvdb_id = jelly_request.get("media", {}).get("tvdbId")
if tvdb_id:
try:
series = await sonarr.get_series_by_tvdb_id(int(tvdb_id))
arr_item = _pick_first(series)
arr_details["series"] = arr_item
arr_state = "added" if arr_item else "missing"
if arr_item:
stats = arr_item.get("statistics") if isinstance(arr_item, dict) else None
if isinstance(stats, dict):
file_count = stats.get("episodeFileCount")
total_count = (
stats.get("totalEpisodeCount")
if isinstance(stats.get("totalEpisodeCount"), int)
else stats.get("episodeCount")
)
if (
isinstance(file_count, int)
and isinstance(total_count, int)
and total_count > 0
and file_count >= total_count
):
arr_state = "available"
if arr_item and isinstance(arr_item.get("id"), int):
series_id = int(arr_item["id"])
arr_queue = await sonarr.get_queue(series_id)
arr_queue = _filter_queue(arr_queue, series_id, RequestType.tv)
arr_details["queue"] = arr_queue
episodes = await sonarr.get_episodes(series_id)
missing_by_season = _missing_episode_numbers_by_season(episodes)
if missing_by_season:
arr_details["missingEpisodes"] = missing_by_season
except Exception as exc:
arr_state = "error"
arr_details["error"] = str(exc)
elif snapshot.request_type == RequestType.movie:
tmdb_id = jelly_request.get("media", {}).get("tmdbId")
if tmdb_id:
try:
movie = await radarr.get_movie_by_tmdb_id(int(tmdb_id))
arr_item = _pick_first(movie)
if not arr_item:
title_hint = (
jelly_request.get("media", {}).get("title")
or jelly_request.get("title")
or snapshot.title
)
year_hint = (
jelly_request.get("media", {}).get("year")
or jelly_request.get("year")
or snapshot.year
)
try:
all_movies = await radarr.get_movies()
except Exception:
all_movies = None
if isinstance(all_movies, list):
for candidate in all_movies:
if not isinstance(candidate, dict):
continue
if tmdb_id and candidate.get("tmdbId") == int(tmdb_id):
arr_item = candidate
break
if title_hint and candidate.get("title") == title_hint:
if not year_hint or candidate.get("year") == year_hint:
arr_item = candidate
break
arr_details["movie"] = arr_item
if arr_item:
if arr_item.get("hasFile"):
arr_state = "available"
elif arr_item.get("isAvailable"):
arr_state = "searching"
else:
arr_state = "added"
else:
arr_state = "missing"
if arr_item and isinstance(arr_item.get("id"), int):
arr_queue = await radarr.get_queue(int(arr_item["id"]))
arr_queue = _filter_queue(arr_queue, int(arr_item["id"]), RequestType.movie)
arr_details["queue"] = arr_queue
except Exception as exc:
arr_state = "error"
arr_details["error"] = str(exc)
if arr_state is None:
arr_state = "unknown"
if arr_state == "missing" and media_status_code in {4}:
arr_state = "available"
elif arr_state == "missing" and media_status_code in {6}:
arr_state = "added"
timeline.append(TimelineHop(service="Sonarr/Radarr", status=arr_state, details=arr_details))
try:
prowlarr_health = await prowlarr.get_health()
if isinstance(prowlarr_health, list) and len(prowlarr_health) > 0:
timeline.append(TimelineHop(service="Prowlarr", status="issues", details={"health": prowlarr_health}))
else:
timeline.append(TimelineHop(service="Prowlarr", status="ok"))
except Exception as exc:
timeline.append(TimelineHop(service="Prowlarr", status="error", details={"error": str(exc)}))
jellyfin_available = False
jellyfin_item = None
if jellyfin.configured() and snapshot.title:
types = ["Movie"] if snapshot.request_type == RequestType.movie else ["Series"]
try:
search = await jellyfin.search_items(snapshot.title, types)
except Exception:
search = None
if isinstance(search, dict):
items = search.get("Items") or search.get("items") or []
for item in items:
if not isinstance(item, dict):
continue
name = item.get("Name") or item.get("title")
year = item.get("ProductionYear") or item.get("Year")
if name and name.strip().lower() == (snapshot.title or "").strip().lower():
if snapshot.year and year and int(year) != int(snapshot.year):
continue
jellyfin_available = True
jellyfin_item = item
break
if jellyfin_available and arr_state == "missing" and runtime.jellyfin_sync_to_arr:
arr_details["note"] = "Found in Jellyfin but not tracked in Sonarr/Radarr."
if snapshot.request_type == RequestType.movie:
if runtime.radarr_quality_profile_id and runtime.radarr_root_folder:
radarr_client = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key)
if radarr_client.configured():
root_folder = await _resolve_root_folder_path(
radarr_client, runtime.radarr_root_folder, "Radarr"
)
tmdb_id = jelly_request.get("media", {}).get("tmdbId")
if tmdb_id:
try:
await radarr_client.add_movie(
int(tmdb_id),
runtime.radarr_quality_profile_id,
root_folder,
monitored=False,
search_for_movie=False,
)
except Exception:
pass
if snapshot.request_type == RequestType.tv:
if runtime.sonarr_quality_profile_id and runtime.sonarr_root_folder:
sonarr_client = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key)
if sonarr_client.configured():
root_folder = await _resolve_root_folder_path(
sonarr_client, runtime.sonarr_root_folder, "Sonarr"
)
tvdb_id = jelly_request.get("media", {}).get("tvdbId")
if tvdb_id:
try:
await sonarr_client.add_series(
int(tvdb_id),
runtime.sonarr_quality_profile_id,
root_folder,
monitored=False,
search_missing=False,
)
except Exception:
pass
qbit_state = None
qbit_message = None
try:
download_ids = _download_ids(_queue_records(arr_queue))
torrent_list: List[Dict[str, Any]] = []
if download_ids and qbittorrent.configured():
torrents = await qbittorrent.get_torrents_by_hashes("|".join(download_ids))
torrent_list = torrents if isinstance(torrents, list) else []
summary = _summarize_qbit(torrent_list)
qbit_state = summary.get("state")
qbit_message = summary.get("message")
timeline.append(
TimelineHop(
service="qBittorrent",
status=summary["state"],
details={
"summary": summary["message"],
"torrents": torrent_list,
},
)
)
except Exception as exc:
timeline.append(TimelineHop(service="qBittorrent", status="error", details={"error": str(exc)}))
status_code = None
try:
status_code = int(jelly_status)
except (TypeError, ValueError):
status_code = None
derived_approved = bool(jelly_request.get("isApproved")) or status_code in {2, 4, 5, 6}
if derived_approved:
snapshot.state = NormalizedState.approved
snapshot.state_reason = "Approved and queued for processing."
else:
snapshot.state = NormalizedState.requested
snapshot.state_reason = "Waiting for approval before we can search."
queue_records = _queue_records(arr_queue)
if qbit_state in {"downloading", "paused"}:
snapshot.state = NormalizedState.downloading
snapshot.state_reason = "Downloading in qBittorrent."
if qbit_message:
snapshot.state_reason = qbit_message
elif qbit_state == "completed":
if arr_state == "available":
snapshot.state = NormalizedState.completed
snapshot.state_reason = "In your library and ready to watch."
else:
snapshot.state = NormalizedState.importing
snapshot.state_reason = "Download finished. Waiting for library import."
elif queue_records:
if arr_state == "missing":
snapshot.state_reason = "Queue shows a download, but qBittorrent has no active torrent."
else:
snapshot.state_reason = "Waiting for download to start in qBittorrent."
elif arr_state == "missing" and derived_approved:
snapshot.state = NormalizedState.needs_add
snapshot.state_reason = "Approved, but not added to the library yet."
elif arr_state == "searching":
snapshot.state = NormalizedState.searching
snapshot.state_reason = "Searching for a matching release."
elif arr_state == "available":
snapshot.state = NormalizedState.completed
snapshot.state_reason = "In your library and ready to watch."
elif arr_state == "added" and snapshot.state == NormalizedState.approved:
snapshot.state = NormalizedState.added_to_arr
snapshot.state_reason = "Item is present in Sonarr/Radarr"
if jellyfin_available and snapshot.state not in {
NormalizedState.downloading,
NormalizedState.importing,
}:
snapshot.state = NormalizedState.completed
snapshot.state_reason = "Ready to watch in Jellyfin."
snapshot.timeline = timeline
actions: List[ActionOption] = []
if arr_state == "missing":
actions.append(
ActionOption(
id="readd_to_arr",
label="Add to the library queue (Sonarr/Radarr)",
risk="medium",
)
)
elif arr_item and arr_state != "available":
actions.append(
ActionOption(
id="search_auto",
label="Search and auto-download",
risk="low",
)
)
actions.append(
ActionOption(
id="search_releases",
label="Search and choose a download",
risk="low",
)
)
download_ids = _download_ids(_queue_records(arr_queue))
if download_ids and qbittorrent.configured():
actions.append(
ActionOption(
id="resume_torrent",
label="Resume the download",
risk="low",
)
)
snapshot.actions = actions
jellyfin_link = None
if runtime.jellyfin_public_url and snapshot.state in {
NormalizedState.available,
NormalizedState.completed,
}:
base_url = runtime.jellyfin_public_url.rstrip("/")
query = quote(snapshot.title or "")
jellyfin_link = f"{base_url}/web/index.html#!/search?query={query}"
snapshot.raw = {
"jellyseerr": jelly_request,
"arr": {
"item": arr_item,
"queue": arr_queue,
},
"jellyfin": {
"publicUrl": runtime.jellyfin_public_url,
"available": snapshot.state in {
NormalizedState.available,
NormalizedState.completed,
},
"link": jellyfin_link,
"item": jellyfin_item,
},
}
await asyncio.to_thread(save_snapshot, snapshot)
return snapshot