697 lines
27 KiB
Python
697 lines
27 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from time import perf_counter
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
|
|
from ..clients.jellyfin import JellyfinClient
|
|
from ..clients.jellyseerr import JellyseerrClient
|
|
from ..clients.prowlarr import ProwlarrClient
|
|
from ..clients.qbittorrent import QBittorrentClient
|
|
from ..clients.radarr import RadarrClient
|
|
from ..clients.sonarr import SonarrClient
|
|
from ..config import settings as env_settings
|
|
from ..db import run_integrity_check
|
|
from ..runtime import get_runtime_settings
|
|
from .invite_email import send_test_email, smtp_email_config_ready
|
|
|
|
|
|
# Zero-argument async callable that executes one diagnostic and resolves to
# its raw result payload (a dict).
DiagnosticRunner = Callable[[], Awaitable[Dict[str, Any]]]
|
|
|
|
|
|
@dataclass(frozen=True)
class DiagnosticCheck:
    """Immutable description of a single diagnostic check and how to run it."""

    # --- identity / presentation -------------------------------------------
    key: str          # identifier used to select the check
    label: str        # human-readable name
    category: str     # grouping label for the check
    description: str  # one-sentence explanation of what the check does

    # --- execution metadata -------------------------------------------------
    live_safe: bool         # flag reported alongside the check's results
    configured: bool        # whether the required settings are present
    config_detail: str      # "ok", or the reason the check is not runnable
    target: Optional[str]   # host[:port] style label of what is probed, if known

    # Awaitable factory that actually performs the check.
    runner: DiagnosticRunner
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _clean_text(value: Any, fallback: str = "") -> str:
|
|
if value is None:
|
|
return fallback
|
|
if isinstance(value, str):
|
|
trimmed = value.strip()
|
|
return trimmed if trimmed else fallback
|
|
return str(value)
|
|
|
|
|
|
def _url_target(url: Optional[str]) -> Optional[str]:
    """Reduce a URL to a short ``host[:port]`` label.

    Args:
        url: A configured URL; may be ``None`` or blank.

    Returns:
        ``None`` when no URL is set. Otherwise ``host`` or ``host:port`` when
        a hostname can be parsed, falling back to the network location and
        finally to the cleaned input when parsing yields nothing usable.
    """
    raw = _clean_text(url)
    if not raw:
        return None
    try:
        parsed = urlparse(raw)
    except Exception:
        return raw

    # ``.port`` validates lazily and raises ValueError for a non-numeric or
    # out-of-range port; a malformed setting must not abort the caller, so
    # the label simply omits the port in that case.
    try:
        port = parsed.port
    except ValueError:
        port = None

    hostname = parsed.hostname
    if hostname:
        return f"{hostname}:{port}" if port else hostname
    # Without a hostname the netloc (if any) already carries the port text,
    # so it is returned as-is rather than appending the port a second time.
    return parsed.netloc or raw
|
|
|
|
|
|
def _host_port_target(host: Optional[str], port: Optional[int]) -> Optional[str]:
    """Build a ``host[:port]`` label, or ``None`` when the host is blank."""
    name = _clean_text(host)
    if not name:
        return None
    return name if port is None else f"{name}:{port}"
|
|
|
|
|
|
def _http_error_detail(exc: Exception) -> str:
    """Return a human-readable description of an HTTP-related failure.

    For ``httpx.HTTPStatusError`` the result is ``HTTP <status>`` followed by
    the stripped response body when one is available. For any other exception
    the exception text is used; because some exceptions stringify to an empty
    string, the exception class name is returned in that case so the caller
    never gets a blank message.
    """
    if isinstance(exc, httpx.HTTPStatusError):
        response = exc.response
        try:
            body = response.text.strip()
        except Exception:
            # Reading the body is best-effort; the status code alone is still useful.
            body = ""
        status_line = f"HTTP {response.status_code}"
        return f"{status_line}: {body}" if body else status_line
    return str(exc) or type(exc).__name__
|
|
|
|
|
|
def _config_status(detail: str) -> str:
|
|
lowered = detail.lower()
|
|
if "disabled" in lowered:
|
|
return "disabled"
|
|
return "not_configured"
|
|
|
|
|
|
def _discord_config_ready(runtime) -> tuple[bool, str]:
    """Report whether Discord notifications can be tested.

    Returns ``(ready, detail)`` where *detail* is ``"ok"`` or the reason the
    channel is unavailable.
    """
    if not (runtime.magent_notify_enabled and runtime.magent_notify_discord_enabled):
        return False, "Discord notifications are disabled."
    # Either the notification-specific webhook or the general one is accepted.
    webhook = _clean_text(runtime.magent_notify_discord_webhook_url) or _clean_text(runtime.discord_webhook_url)
    if not webhook:
        return False, "Discord webhook URL is required."
    return True, "ok"
|
|
|
|
|
|
def _telegram_config_ready(runtime) -> tuple[bool, str]:
    """Report whether Telegram notifications can be tested.

    Returns ``(ready, detail)`` where *detail* is ``"ok"`` or the reason the
    channel is unavailable.
    """
    if not (runtime.magent_notify_enabled and runtime.magent_notify_telegram_enabled):
        return False, "Telegram notifications are disabled."
    bot_token = _clean_text(runtime.magent_notify_telegram_bot_token)
    if bot_token and _clean_text(runtime.magent_notify_telegram_chat_id):
        return True, "ok"
    return False, "Telegram bot token and chat ID are required."
|
|
|
|
|
|
def _webhook_config_ready(runtime) -> tuple[bool, str]:
    """Report whether the generic webhook notification can be tested.

    Returns ``(ready, detail)`` where *detail* is ``"ok"`` or the reason the
    channel is unavailable.
    """
    if not (runtime.magent_notify_enabled and runtime.magent_notify_webhook_enabled):
        return False, "Generic webhook notifications are disabled."
    if not _clean_text(runtime.magent_notify_webhook_url):
        return False, "Generic webhook URL is required."
    return True, "ok"
|
|
|
|
|
|
def _push_config_ready(runtime) -> tuple[bool, str]:
    """Report whether the configured push provider can be tested.

    The provider name defaults to ``"ntfy"`` when unset. Telegram and Discord
    providers reuse their dedicated readiness checks; the others require a
    provider-specific set of non-blank settings.

    Returns ``(ready, detail)`` where *detail* is ``"ok"`` or the reason the
    provider is unavailable.
    """
    if not (runtime.magent_notify_enabled and runtime.magent_notify_push_enabled):
        return False, "Push notifications are disabled."

    provider = _clean_text(runtime.magent_notify_push_provider, "ntfy").lower()

    if provider == "telegram":
        return _telegram_config_ready(runtime)
    if provider == "discord":
        return _discord_config_ready(runtime)

    # provider -> (settings that must all be non-blank, message when they are not)
    requirements = {
        "ntfy": (
            ("magent_notify_push_base_url", "magent_notify_push_topic"),
            "ntfy requires a base URL and topic.",
        ),
        "gotify": (
            ("magent_notify_push_base_url", "magent_notify_push_token"),
            "Gotify requires a base URL and app token.",
        ),
        "pushover": (
            ("magent_notify_push_token", "magent_notify_push_user_key"),
            "Pushover requires an application token and user key.",
        ),
        "webhook": (
            ("magent_notify_push_base_url",),
            "Webhook relay requires a target URL.",
        ),
    }
    if provider not in requirements:
        return False, f"Unsupported push provider: {provider or 'unknown'}"

    needed, missing_detail = requirements[provider]
    if all(_clean_text(getattr(runtime, name)) for name in needed):
        return True, "ok"
    return False, missing_detail
|
|
|
|
|
|
def _summary_from_results(results: Sequence[Dict[str, Any]]) -> Dict[str, int]:
|
|
summary = {
|
|
"total": len(results),
|
|
"up": 0,
|
|
"down": 0,
|
|
"degraded": 0,
|
|
"not_configured": 0,
|
|
"disabled": 0,
|
|
}
|
|
for result in results:
|
|
status = str(result.get("status") or "").strip().lower()
|
|
if status in summary:
|
|
summary[status] += 1
|
|
return summary
|
|
|
|
|
|
async def _run_http_json_get(
    url: str,
    *,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """GET *url* and return its decoded JSON body under the ``"response"`` key.

    Uses a 10 second timeout and follows redirects. Raises whatever
    ``raise_for_status()`` or ``json()`` raise for error statuses or
    undecodable bodies.
    """
    session = httpx.AsyncClient(timeout=10.0, follow_redirects=True)
    async with session:
        reply = await session.get(url, headers=headers, params=params)
        reply.raise_for_status()
        decoded = reply.json()
    return {"response": decoded}
|
|
|
|
|
|
async def _run_http_text_get(url: str) -> Dict[str, Any]:
    """GET *url* and return its text body plus an ``HTTP <status>`` message.

    Uses a 10 second timeout and follows redirects; error statuses raise via
    ``raise_for_status()``.
    """
    session = httpx.AsyncClient(timeout=10.0, follow_redirects=True)
    async with session:
        reply = await session.get(url)
        reply.raise_for_status()
        text = reply.text
    return {"response": text, "message": f"HTTP {reply.status_code}"}
|
|
|
|
|
|
async def _run_http_post(
    url: str,
    *,
    json_payload: Optional[Dict[str, Any]] = None,
    data_payload: Any = None,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """POST to *url* and summarize the response.

    Args:
        url: Target URL.
        json_payload: Optional JSON body.
        data_payload: Optional body; a mapping is sent as form data, while raw
            ``str``/``bytes`` are sent verbatim as the request content.
        params: Optional query parameters.
        headers: Optional request headers.

    Returns:
        A dict with ``"message"`` (``HTTP <status>``) and, when the response
        has a body, ``"response"`` holding the decoded JSON (for JSON content
        types that parse) or the stripped text.

    Raises:
        httpx.HTTPStatusError: For error statuses (via ``raise_for_status``).
    """
    # httpx reserves ``data=`` for form fields and deprecates passing raw
    # text/bytes through it; raw bodies belong in ``content=``.
    body_kwargs: Dict[str, Any] = {}
    if isinstance(data_payload, (str, bytes)):
        body_kwargs["content"] = data_payload
    else:
        body_kwargs["data"] = data_payload

    async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
        response = await client.post(url, json=json_payload, params=params, headers=headers, **body_kwargs)
        response.raise_for_status()
        status_message = f"HTTP {response.status_code}"
        if not response.content:
            return {"message": status_message}
        content_type = response.headers.get("content-type", "")
        if "application/json" in content_type.lower():
            try:
                return {"response": response.json(), "message": status_message}
            except Exception:
                # Deliberate best-effort: a body mislabeled as JSON is
                # reported as plain text below instead of failing the check.
                pass
        return {"response": response.text.strip(), "message": status_message}
|
|
|
|
|
|
async def _run_database_check() -> Dict[str, Any]:
    """Run the database integrity check in a worker thread and report it.

    The status is ``"up"`` only when the check returns exactly ``"ok"``;
    any other result is reported as ``"degraded"``.
    """
    outcome = await asyncio.to_thread(run_integrity_check)
    return {
        "status": "degraded" if outcome != "ok" else "up",
        "message": f"SQLite integrity_check returned {outcome}",
        "detail": outcome,
    }
|
|
|
|
|
|
async def _run_magent_api_check(runtime) -> Dict[str, Any]:
    """Query the API ``/health`` endpoint and report the build if present.

    Uses ``runtime.magent_api_url`` when set, otherwise the loopback address
    with ``runtime.magent_api_port`` (default 8000).
    """
    configured_url = _clean_text(runtime.magent_api_url)
    if configured_url:
        root = configured_url
    else:
        root = f"http://127.0.0.1:{int(runtime.magent_api_port or 8000)}"

    fetched = await _run_http_json_get(f"{root.rstrip('/')}/health")
    health = fetched.get("response")
    build = health.get("build") if isinstance(health, dict) else None
    if build:
        summary = f"Health endpoint responded (build {build})"
    else:
        summary = "Health endpoint responded"
    return {"message": summary, "detail": health}
|
|
|
|
|
|
async def _run_magent_web_check(runtime) -> Dict[str, Any]:
    """Fetch the application root page and verify it looks like HTML.

    Uses ``runtime.magent_application_url`` when set, otherwise the loopback
    address with ``runtime.magent_application_port`` (default 3000). A body
    without an ``<html`` marker is reported as ``"degraded"``.
    """
    configured_url = _clean_text(runtime.magent_application_url)
    if configured_url:
        root = configured_url
    else:
        root = f"http://127.0.0.1:{int(runtime.magent_application_port or 3000)}"

    fetched = await _run_http_text_get(root.rstrip("/"))
    page = fetched.get("response")
    looks_like_html = isinstance(page, str) and "<html" in page.lower()
    if not looks_like_html:
        return {"status": "degraded", "message": "Application responded with unexpected content"}
    return {"message": "Application page responded", "detail": "html"}
|
|
|
|
|
|
async def _run_seerr_check(runtime) -> Dict[str, Any]:
    """Fetch the Seerr status and report its version when one is returned."""
    seerr = JellyseerrClient(runtime.jellyseerr_base_url, runtime.jellyseerr_api_key)
    status = await seerr.get_status()
    reported = status.get("version") if isinstance(status, dict) else None
    summary = f"Seerr version {reported}" if reported else "Seerr responded"
    return {"message": summary, "detail": status}
|
|
|
|
|
|
async def _run_sonarr_check(runtime) -> Dict[str, Any]:
    """Fetch the Sonarr system status and report its version when returned."""
    sonarr = SonarrClient(runtime.sonarr_base_url, runtime.sonarr_api_key)
    status = await sonarr.get_system_status()
    reported = status.get("version") if isinstance(status, dict) else None
    summary = f"Sonarr version {reported}" if reported else "Sonarr responded"
    return {"message": summary, "detail": status}
|
|
|
|
|
|
async def _run_radarr_check(runtime) -> Dict[str, Any]:
    """Fetch the Radarr system status and report its version when returned."""
    radarr = RadarrClient(runtime.radarr_base_url, runtime.radarr_api_key)
    status = await radarr.get_system_status()
    reported = status.get("version") if isinstance(status, dict) else None
    summary = f"Radarr version {reported}" if reported else "Radarr responded"
    return {"message": summary, "detail": status}
|
|
|
|
|
|
async def _run_prowlarr_check(runtime) -> Dict[str, Any]:
    """Fetch Prowlarr health and flag any returned entries as degraded.

    A non-empty list from the health call is treated as a set of warnings;
    anything else is reported as healthy.
    """
    prowlarr = ProwlarrClient(runtime.prowlarr_base_url, runtime.prowlarr_api_key)
    health = await prowlarr.get_health()
    has_warnings = isinstance(health, list) and bool(health)
    if not has_warnings:
        return {"message": "Prowlarr reported healthy", "detail": health}
    return {
        "status": "degraded",
        "message": f"Prowlarr health warnings: {len(health)}",
        "detail": health,
    }
|
|
|
|
|
|
async def _run_qbittorrent_check(runtime) -> Dict[str, Any]:
    """Request the qBittorrent app version and report it when it is a non-empty string."""
    qbittorrent = QBittorrentClient(
        runtime.qbittorrent_base_url,
        runtime.qbittorrent_username,
        runtime.qbittorrent_password,
    )
    reported = await qbittorrent.get_app_version()
    if isinstance(reported, str) and reported:
        summary = f"qBittorrent version {reported}"
    else:
        summary = "qBittorrent responded"
    return {"message": summary, "detail": reported}
|
|
|
|
|
|
async def _run_jellyfin_check(runtime) -> Dict[str, Any]:
    """Fetch Jellyfin system info and report its ``Version`` when returned."""
    jellyfin = JellyfinClient(runtime.jellyfin_base_url, runtime.jellyfin_api_key)
    info = await jellyfin.get_system_info()
    reported = info.get("Version") if isinstance(info, dict) else None
    summary = f"Jellyfin version {reported}" if reported else "Jellyfin responded"
    return {"message": summary, "detail": info}
|
|
|
|
|
|
async def _run_email_check(recipient_email: Optional[str] = None) -> Dict[str, Any]:
    """Send a test email and report which recipient it went to.

    The recipient shown in the message is taken from the ``recipient_email``
    entry of the send result, with a generic label when that entry is blank.
    """
    outcome = await send_test_email(recipient_email=recipient_email)
    sent_to = _clean_text(outcome.get("recipient_email"), "configured recipient")
    return {"message": f"Test email sent to {sent_to}", "detail": outcome}
|
|
|
|
|
|
async def _run_discord_check(runtime) -> Dict[str, Any]:
    """Post a diagnostics ping to the configured Discord webhook.

    The notification-specific webhook URL takes precedence over the general
    ``discord_webhook_url`` setting.
    """
    target_url = _clean_text(runtime.magent_notify_discord_webhook_url) or _clean_text(runtime.discord_webhook_url)
    ping_text = f"{env_settings.app_name} diagnostics ping\nBuild {env_settings.site_build_number or 'unknown'}"
    posted = await _run_http_post(target_url, json_payload={"content": ping_text})
    return {"message": "Discord webhook accepted ping", "detail": posted.get("response")}
|
|
|
|
|
|
async def _run_telegram_check(runtime) -> Dict[str, Any]:
    """Send a diagnostics ping to the configured Telegram chat via ``sendMessage``."""
    token = _clean_text(runtime.magent_notify_telegram_bot_token)
    chat = _clean_text(runtime.magent_notify_telegram_chat_id)
    # NOTE: the bot token is part of the request URL; avoid logging this value.
    endpoint = f"https://api.telegram.org/bot{token}/sendMessage"
    ping_text = f"{env_settings.app_name} diagnostics ping\nBuild {env_settings.site_build_number or 'unknown'}"
    posted = await _run_http_post(endpoint, json_payload={"chat_id": chat, "text": ping_text})
    return {"message": "Telegram ping accepted", "detail": posted.get("response")}
|
|
|
|
|
|
async def _run_webhook_check(runtime) -> Dict[str, Any]:
    """Post a ``diagnostics.ping`` JSON payload to the generic webhook URL."""
    target_url = _clean_text(runtime.magent_notify_webhook_url)
    ping = dict(
        type="diagnostics.ping",
        application=env_settings.app_name,
        build=env_settings.site_build_number,
        checked_at=_now_iso(),
    )
    posted = await _run_http_post(target_url, json_payload=ping)
    return {"message": "Webhook accepted ping", "detail": posted.get("response")}
|
|
|
|
|
|
async def _run_push_check(runtime) -> Dict[str, Any]:
    """Send a diagnostics ping through the configured push provider.

    The provider name defaults to ``"ntfy"``. Telegram and Discord delegate
    to their dedicated checks; ntfy, Gotify, Pushover and the webhook relay
    are posted to directly.

    Raises:
        RuntimeError: If the provider name is not one of the supported ones.
    """
    provider = _clean_text(runtime.magent_notify_push_provider, "ntfy").lower()
    headline = f"{env_settings.app_name} diagnostics ping"
    build_line = f"Build {env_settings.site_build_number or 'unknown'}"

    def accepted(text: str, posted: Dict[str, Any]) -> Dict[str, Any]:
        # Shared result shape for every directly-posted provider.
        return {"message": text, "detail": posted.get("response")}

    if provider == "telegram":
        return await _run_telegram_check(runtime)

    if provider == "discord":
        return await _run_discord_check(runtime)

    if provider == "ntfy":
        root = _clean_text(runtime.magent_notify_push_base_url)
        topic = _clean_text(runtime.magent_notify_push_topic)
        posted = await _run_http_post(
            f"{root.rstrip('/')}/{topic}",
            data_payload=f"{headline}\n{build_line}",
            headers={"Content-Type": "text/plain; charset=utf-8"},
        )
        return accepted("ntfy push accepted", posted)

    if provider == "gotify":
        root = _clean_text(runtime.magent_notify_push_base_url)
        app_token = _clean_text(runtime.magent_notify_push_token)
        posted = await _run_http_post(
            f"{root.rstrip('/')}/message",
            json_payload={"title": env_settings.app_name, "message": build_line, "priority": 5},
            params={"token": app_token},
        )
        return accepted("Gotify push accepted", posted)

    if provider == "pushover":
        app_token = _clean_text(runtime.magent_notify_push_token)
        user_key = _clean_text(runtime.magent_notify_push_user_key)
        device_name = _clean_text(runtime.magent_notify_push_device)
        form = {
            "token": app_token,
            "user": user_key,
            "message": f"{headline}\n{build_line}",
            "title": env_settings.app_name,
        }
        # The device field is only sent when one is configured.
        if device_name:
            form["device"] = device_name
        posted = await _run_http_post("https://api.pushover.net/1/messages.json", data_payload=form)
        return accepted("Pushover push accepted", posted)

    if provider == "webhook":
        relay_url = _clean_text(runtime.magent_notify_push_base_url)
        ping = {
            "type": "diagnostics.push",
            "application": env_settings.app_name,
            "build": env_settings.site_build_number,
            "checked_at": _now_iso(),
        }
        posted = await _run_http_post(relay_url, json_payload=ping)
        return accepted("Push webhook accepted", posted)

    raise RuntimeError(f"Unsupported push provider: {provider}")
|
|
|
|
|
|
def _build_diagnostic_checks(recipient_email: Optional[str] = None) -> List[DiagnosticCheck]:
    """Assemble the full, ordered list of diagnostic checks.

    Reads the current runtime settings once and derives, for every check, its
    display target, whether it is configured, and a zero-argument runner.

    Args:
        recipient_email: Optional recipient passed through to the email check.

    Returns:
        Application checks first, then media services, then notifications.
    """
    runtime = get_runtime_settings()

    def bound(check_fn: Callable[[Any], Awaitable[Dict[str, Any]]]) -> DiagnosticRunner:
        # Adapt a check that takes the runtime settings to the zero-argument runner shape.
        return lambda: check_fn(runtime)

    def application_check(key: str, label: str, description: str, target: Optional[str], runner: DiagnosticRunner) -> DiagnosticCheck:
        # Application checks are always treated as configured.
        return DiagnosticCheck(
            key=key,
            label=label,
            category="Application",
            description=description,
            live_safe=True,
            configured=True,
            config_detail="ok",
            target=target,
            runner=runner,
        )

    def media_check(key: str, label: str, description: str, required: Any, config_detail: str, target: Optional[str], check_fn: Callable[[Any], Awaitable[Dict[str, Any]]]) -> DiagnosticCheck:
        # `required` is the and-chain of the settings the service needs.
        return DiagnosticCheck(
            key=key,
            label=label,
            category="Media services",
            description=description,
            live_safe=True,
            configured=bool(required),
            config_detail=config_detail,
            target=target,
            runner=bound(check_fn),
        )

    def notification_check(key: str, label: str, description: str, readiness: Any, target: Optional[str], runner: DiagnosticRunner) -> DiagnosticCheck:
        # `readiness` is the (ready, detail) pair from a *_config_ready helper.
        ready, detail = readiness
        return DiagnosticCheck(
            key=key,
            label=label,
            category="Notifications",
            description=description,
            live_safe=False,
            configured=ready,
            config_detail=detail,
            target=target,
            runner=runner,
        )

    # --- display targets ----------------------------------------------------
    seerr_target = _url_target(runtime.jellyseerr_base_url)
    jellyfin_target = _url_target(runtime.jellyfin_base_url)
    sonarr_target = _url_target(runtime.sonarr_base_url)
    radarr_target = _url_target(runtime.radarr_base_url)
    prowlarr_target = _url_target(runtime.prowlarr_base_url)
    qbittorrent_target = _url_target(runtime.qbittorrent_base_url)
    application_target = _url_target(runtime.magent_application_url) or _host_port_target(
        "127.0.0.1", runtime.magent_application_port
    )
    api_target = _url_target(runtime.magent_api_url) or _host_port_target("127.0.0.1", runtime.magent_api_port)
    smtp_target = _host_port_target(runtime.magent_notify_email_smtp_host, runtime.magent_notify_email_smtp_port)
    discord_target = _url_target(runtime.magent_notify_discord_webhook_url) or _url_target(runtime.discord_webhook_url)
    telegram_target = None
    if _clean_text(runtime.magent_notify_telegram_bot_token):
        telegram_target = "api.telegram.org"
    webhook_target = _url_target(runtime.magent_notify_webhook_url)

    # Providers with a fixed endpoint get a fixed label; the rest use the base URL.
    push_provider = _clean_text(runtime.magent_notify_push_provider, "ntfy").lower()
    if push_provider == "pushover":
        push_target = "api.pushover.net"
    elif push_provider == "telegram":
        push_target = telegram_target or "api.telegram.org"
    elif push_provider == "discord":
        push_target = discord_target or "discord.com"
    else:
        push_target = _url_target(runtime.magent_notify_push_base_url)

    # --- notification readiness ----------------------------------------------
    email_readiness = smtp_email_config_ready()
    discord_readiness = _discord_config_ready(runtime)
    telegram_readiness = _telegram_config_ready(runtime)
    push_readiness = _push_config_ready(runtime)
    webhook_readiness = _webhook_config_ready(runtime)

    return [
        application_check(
            "magent-web",
            "Magent application",
            "Checks that the frontend application URL is responding.",
            application_target,
            bound(_run_magent_web_check),
        ),
        application_check(
            "magent-api",
            "Magent API",
            "Checks the Magent API health endpoint.",
            api_target,
            bound(_run_magent_api_check),
        ),
        application_check(
            "database",
            "SQLite database",
            "Runs SQLite integrity_check against the current Magent database.",
            "sqlite",
            _run_database_check,
        ),
        media_check(
            "seerr",
            "Seerr",
            "Checks Seerr API reachability and version.",
            runtime.jellyseerr_base_url and runtime.jellyseerr_api_key,
            "Seerr URL and API key are required.",
            seerr_target,
            _run_seerr_check,
        ),
        media_check(
            "jellyfin",
            "Jellyfin",
            "Checks Jellyfin system info with the configured API key.",
            runtime.jellyfin_base_url and runtime.jellyfin_api_key,
            "Jellyfin URL and API key are required.",
            jellyfin_target,
            _run_jellyfin_check,
        ),
        media_check(
            "sonarr",
            "Sonarr",
            "Checks Sonarr system status with the configured API key.",
            runtime.sonarr_base_url and runtime.sonarr_api_key,
            "Sonarr URL and API key are required.",
            sonarr_target,
            _run_sonarr_check,
        ),
        media_check(
            "radarr",
            "Radarr",
            "Checks Radarr system status with the configured API key.",
            runtime.radarr_base_url and runtime.radarr_api_key,
            "Radarr URL and API key are required.",
            radarr_target,
            _run_radarr_check,
        ),
        media_check(
            "prowlarr",
            "Prowlarr",
            "Checks Prowlarr health and flags warnings as degraded.",
            runtime.prowlarr_base_url and runtime.prowlarr_api_key,
            "Prowlarr URL and API key are required.",
            prowlarr_target,
            _run_prowlarr_check,
        ),
        media_check(
            "qbittorrent",
            "qBittorrent",
            "Checks qBittorrent login and app version.",
            runtime.qbittorrent_base_url and runtime.qbittorrent_username and runtime.qbittorrent_password,
            "qBittorrent URL, username, and password are required.",
            qbittorrent_target,
            _run_qbittorrent_check,
        ),
        notification_check(
            "email",
            "SMTP email",
            "Sends a live test email using the configured SMTP provider.",
            email_readiness,
            smtp_target,
            lambda: _run_email_check(recipient_email),
        ),
        notification_check(
            "discord",
            "Discord webhook",
            "Posts a live test message to the configured Discord webhook.",
            discord_readiness,
            discord_target,
            bound(_run_discord_check),
        ),
        notification_check(
            "telegram",
            "Telegram",
            "Sends a live test message to the configured Telegram chat.",
            telegram_readiness,
            telegram_target,
            bound(_run_telegram_check),
        ),
        notification_check(
            "push",
            "Push/mobile provider",
            "Sends a live test message through the configured push provider.",
            push_readiness,
            push_target,
            bound(_run_push_check),
        ),
        notification_check(
            "webhook",
            "Generic webhook",
            "Posts a live test payload to the configured generic webhook.",
            webhook_readiness,
            webhook_target,
            bound(_run_webhook_check),
        ),
    ]
|
|
|
|
|
|
async def _execute_check(check: DiagnosticCheck) -> Dict[str, Any]:
    """Run one diagnostic check and return a uniform result dict.

    Args:
        check: The check to execute.

    Returns:
        A dict carrying the check's descriptive fields plus ``configured``,
        ``status``, ``message``, ``checked_at`` and ``duration_ms``.
        Successful runs also include ``detail``. Unconfigured checks are not
        run: their status is derived from ``config_detail`` and the duration
        is 0. Any exception raised by the runner is reported as status
        ``"down"`` rather than propagated.
    """
    # Fields shared by every outcome; built once so the variants cannot drift.
    base: Dict[str, Any] = {
        "key": check.key,
        "label": check.label,
        "category": check.category,
        "description": check.description,
        "target": check.target,
        "live_safe": check.live_safe,
    }

    if not check.configured:
        return {
            **base,
            "configured": False,
            "status": _config_status(check.config_detail),
            "message": check.config_detail,
            "checked_at": _now_iso(),
            "duration_ms": 0,
        }

    started = perf_counter()
    checked_at = _now_iso()
    try:
        payload = await check.runner()
        outcome: Dict[str, Any] = {
            "status": _clean_text(payload.get("status"), "up"),
            "message": _clean_text(payload.get("message"), "Check passed"),
            "detail": payload.get("detail"),
        }
    except httpx.HTTPError as exc:
        # Some transport errors carry no text; fall back to the exception
        # class name so the result never has a blank message.
        outcome = {"status": "down", "message": _http_error_detail(exc) or type(exc).__name__}
    except Exception as exc:
        # Boundary handler: a failing check must become a "down" result, not
        # an exception that aborts the other checks run alongside it.
        outcome = {"status": "down", "message": str(exc) or type(exc).__name__}

    return {
        **base,
        "configured": True,
        **outcome,
        "checked_at": checked_at,
        "duration_ms": round((perf_counter() - started) * 1000, 1),
    }
|
|
|
|
|
|
def get_diagnostics_catalog() -> Dict[str, Any]:
    """Describe every available diagnostic check without running any.

    Returns:
        A dict with ``"checks"`` (one entry per check, including its
        configuration status), ``"categories"`` (sorted unique category
        names) and ``"generated_at"`` (the current UTC timestamp).
    """

    def describe(check: DiagnosticCheck) -> Dict[str, Any]:
        if check.configured:
            config_status, config_detail = "configured", "Ready to test."
        else:
            config_status, config_detail = _config_status(check.config_detail), check.config_detail
        return {
            "key": check.key,
            "label": check.label,
            "category": check.category,
            "description": check.description,
            "live_safe": check.live_safe,
            "target": check.target,
            "configured": check.configured,
            "config_status": config_status,
            "config_detail": config_detail,
        }

    entries = [describe(check) for check in _build_diagnostic_checks()]
    return {
        "checks": entries,
        "categories": sorted({entry["category"] for entry in entries}),
        "generated_at": _now_iso(),
    }
|
|
|
|
|
|
async def run_diagnostics(keys: Optional[Sequence[str]] = None, recipient_email: Optional[str] = None) -> Dict[str, Any]:
    """Run the selected diagnostic checks concurrently.

    Args:
        keys: Check keys to run, matched case-insensitively after stripping.
            When omitted, or when no non-blank key is given, every check runs.
        recipient_email: Optional recipient passed through to the email check.

    Returns:
        A dict with ``"results"`` (one result per executed check, in catalog
        order), ``"summary"`` (per-status counts) and ``"checked_at"``.
    """
    wanted = set()
    for key in keys or []:
        normalized = str(key).strip().lower()
        if normalized:
            wanted.add(normalized)

    pending = _build_diagnostic_checks(recipient_email=recipient_email)
    if wanted:
        pending = [check for check in pending if check.key.lower() in wanted]

    outcomes = await asyncio.gather(*[_execute_check(check) for check in pending])
    return {
        "results": outcomes,
        "summary": _summary_from_results(outcomes),
        "checked_at": _now_iso(),
    }
|