126 lines
3.6 KiB
Python
126 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Iterable, List
|
|
|
|
import httpx
|
|
|
|
from ..db import get_admin_notification_targets
|
|
from ..runtime import get_runtime_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _normalize_urls(urls: Iterable[str]) -> List[str]:
|
|
normalized: list[str] = []
|
|
seen: set[str] = set()
|
|
for entry in urls:
|
|
if not isinstance(entry, str):
|
|
continue
|
|
value = entry.strip()
|
|
if value and value not in seen:
|
|
normalized.append(value)
|
|
seen.add(value)
|
|
return normalized
|
|
|
|
|
|
def validate_apprise_urls(urls: Iterable[str]) -> List[str]:
|
|
normalized = _normalize_urls(urls)
|
|
if not normalized:
|
|
return []
|
|
invalid: list[str] = []
|
|
for url in normalized:
|
|
if "://" not in url:
|
|
invalid.append(url)
|
|
if invalid:
|
|
raise ValueError(
|
|
"Invalid Apprise URL(s): "
|
|
+ ", ".join(invalid)
|
|
+ " (each URL must include a scheme like discord:// or mailto://)"
|
|
)
|
|
return normalized
|
|
|
|
|
|
def _get_apprise_notify_url() -> str | None:
|
|
runtime = get_runtime_settings()
|
|
base_url = (runtime.apprise_base_url or "").strip()
|
|
if not base_url:
|
|
return None
|
|
if "://" not in base_url:
|
|
base_url = f"http://{base_url}"
|
|
base_url = base_url.rstrip("/")
|
|
if base_url.endswith("/notify"):
|
|
return base_url
|
|
return f"{base_url}/notify"
|
|
|
|
|
|
def _get_apprise_headers() -> dict[str, str]:
|
|
runtime = get_runtime_settings()
|
|
headers = {"Content-Type": "application/json"}
|
|
api_key = (runtime.apprise_api_key or "").strip()
|
|
if api_key:
|
|
headers["X-API-Key"] = api_key
|
|
headers["Authorization"] = f"Bearer {api_key}"
|
|
return headers
|
|
|
|
|
|
def send_apprise_notification(urls: Iterable[str], title: str, body: str) -> bool:
|
|
try:
|
|
normalized = validate_apprise_urls(urls)
|
|
except ValueError as exc:
|
|
logger.warning("Apprise notification skipped due to invalid URL(s): %s", exc)
|
|
return False
|
|
if not normalized:
|
|
return False
|
|
|
|
notify_url = _get_apprise_notify_url()
|
|
if not notify_url:
|
|
logger.warning("Apprise notification skipped: APPRISE_BASE_URL is not configured.")
|
|
return False
|
|
|
|
payload = {
|
|
"urls": normalized,
|
|
"title": str(title or "Magent notification").strip() or "Magent notification",
|
|
"body": str(body or "").strip(),
|
|
}
|
|
if not payload["body"]:
|
|
return False
|
|
|
|
try:
|
|
with httpx.Client(timeout=10.0) as client:
|
|
response = client.post(notify_url, headers=_get_apprise_headers(), json=payload)
|
|
response.raise_for_status()
|
|
except httpx.HTTPError as exc:
|
|
logger.warning("Apprise sidecar notify failed: %s", exc)
|
|
return False
|
|
|
|
try:
|
|
data = response.json()
|
|
except ValueError:
|
|
return True
|
|
|
|
if isinstance(data, dict):
|
|
if data.get("status") in {"error", "failed"}:
|
|
return False
|
|
if "sent" in data:
|
|
return bool(data.get("sent"))
|
|
return True
|
|
|
|
|
|
def notify_admins_new_signup(username: str, provider: str) -> int:
|
|
targets = get_admin_notification_targets()
|
|
if not targets:
|
|
return 0
|
|
timestamp = datetime.now(timezone.utc).isoformat()
|
|
title = "New Magent user signup"
|
|
body = f"User {username} signed in via {provider} at {timestamp}."
|
|
sent = 0
|
|
for target in targets:
|
|
urls = target.get("urls") or []
|
|
if send_apprise_notification(urls, title, body):
|
|
sent += 1
|
|
if sent == 0:
|
|
logger.info("Apprise signup notification skipped (no valid admin targets).")
|
|
return sent
|