Files
Magent/backend/app/routers/status.py

135 lines
4.8 KiB
Python

from typing import Any, Dict
import httpx
from fastapi import APIRouter, Depends, HTTPException
from ..auth import get_current_user
from ..runtime import get_runtime_settings
from ..clients.jellyseerr import JellyseerrClient
from ..clients.sonarr import SonarrClient
from ..clients.radarr import RadarrClient
from ..clients.prowlarr import ProwlarrClient
from ..clients.qbittorrent import QBittorrentClient
from ..clients.jellyfin import JellyfinClient
router = APIRouter(prefix="/status", tags=["status"], dependencies=[Depends(get_current_user)])
async def _check(name: str, configured: bool, func) -> Dict[str, Any]:
if not configured:
return {"name": name, "status": "not_configured"}
try:
result = await func()
return {"name": name, "status": "up", "detail": result}
except httpx.HTTPError as exc:
return {"name": name, "status": "down", "message": str(exc)}
except Exception as exc:
return {"name": name, "status": "down", "message": str(exc)}
@router.get("/services")
async def services_status() -> Dict[str, Any]:
runtime = get_runtime_settings()
jellyseerr = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key)
sonarr = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key)
radarr = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key)
prowlarr = ProwlarrClient(runtime.prowlarr_base_url, runtime.prowlarr_api_key)
qbittorrent = QBittorrentClient(
runtime.qbittorrent_base_url, runtime.qbittorrent_username, runtime.qbittorrent_password
)
jellyfin = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key)
services = []
services.append(
await _check(
"Jellyseerr",
jellyseerr.configured(),
lambda: jellyseerr.get_recent_requests(take=1, skip=0),
)
)
services.append(
await _check(
"Sonarr",
sonarr.configured(),
sonarr.get_system_status,
)
)
services.append(
await _check(
"Radarr",
radarr.configured(),
radarr.get_system_status,
)
)
prowlarr_status = await _check(
"Prowlarr",
prowlarr.configured(),
prowlarr.get_health,
)
if prowlarr_status.get("status") == "up":
health = prowlarr_status.get("detail")
if isinstance(health, list) and health:
prowlarr_status["status"] = "degraded"
prowlarr_status["message"] = "Health warnings"
services.append(prowlarr_status)
services.append(
await _check(
"qBittorrent",
qbittorrent.configured(),
qbittorrent.get_app_version,
)
)
services.append(
await _check(
"Jellyfin",
jellyfin.configured(),
jellyfin.get_system_info,
)
)
overall = "up"
if any(s.get("status") == "down" for s in services):
overall = "down"
elif any(s.get("status") in {"degraded", "not_configured"} for s in services):
overall = "degraded"
return {"overall": overall, "services": services}
@router.post("/services/{service}/test")
async def test_service(service: str) -> Dict[str, Any]:
runtime = get_runtime_settings()
jellyseerr = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key)
sonarr = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key)
radarr = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key)
prowlarr = ProwlarrClient(runtime.prowlarr_base_url, runtime.prowlarr_api_key)
qbittorrent = QBittorrentClient(
runtime.qbittorrent_base_url, runtime.qbittorrent_username, runtime.qbittorrent_password
)
jellyfin = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key)
service_key = service.strip().lower()
checks = {
"jellyseerr": (
"Jellyseerr",
jellyseerr.configured(),
lambda: jellyseerr.get_recent_requests(take=1, skip=0),
),
"sonarr": ("Sonarr", sonarr.configured(), sonarr.get_system_status),
"radarr": ("Radarr", radarr.configured(), radarr.get_system_status),
"prowlarr": ("Prowlarr", prowlarr.configured(), prowlarr.get_health),
"qbittorrent": ("qBittorrent", qbittorrent.configured(), qbittorrent.get_app_version),
"jellyfin": ("Jellyfin", jellyfin.configured(), jellyfin.get_system_info),
}
if service_key not in checks:
raise HTTPException(status_code=404, detail="Unknown service")
name, configured, func = checks[service_key]
result = await _check(name, configured, func)
if name == "Prowlarr" and result.get("status") == "up":
health = result.get("detail")
if isinstance(health, list) and health:
result["status"] = "degraded"
result["message"] = "Health warnings"
return result