"""Helpers for sending notifications through an Apprise HTTP sidecar.

The sidecar location and optional API key are read from the runtime
settings (``apprise_base_url`` / ``apprise_api_key``).  All sending helpers
are best-effort: they report failure through their return value and a log
line instead of raising.
"""
from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Iterable, List

import httpx

from ..db import get_admin_notification_targets
from ..runtime import get_runtime_settings

logger = logging.getLogger(__name__)


def _normalize_urls(urls: Iterable[str]) -> List[str]:
    """Return the stripped, non-empty, de-duplicated string entries of *urls*.

    First-seen order is preserved and non-string entries are ignored.
    ``None`` yields an empty list, and a bare string is treated as a single
    URL rather than being iterated character by character.
    """
    if urls is None:
        return []
    if isinstance(urls, str):
        # A str is itself an iterable of 1-char strings; wrap it so a caller
        # passing one URL does not get every character rejected downstream.
        urls = [urls]

    normalized: list[str] = []
    seen: set[str] = set()
    for entry in urls:
        if not isinstance(entry, str):
            continue
        value = entry.strip()
        if value and value not in seen:
            normalized.append(value)
            seen.add(value)
    return normalized


def validate_apprise_urls(urls: Iterable[str]) -> List[str]:
    """Normalize *urls* and check that each one carries a URL scheme.

    Returns:
        The normalized URL list (possibly empty).

    Raises:
        ValueError: if any normalized entry does not contain ``"://"``.
    """
    normalized = _normalize_urls(urls)
    if not normalized:
        return []

    invalid = [url for url in normalized if "://" not in url]
    if invalid:
        raise ValueError(
            "Invalid Apprise URL(s): "
            + ", ".join(invalid)
            + " (each URL must include a scheme like discord:// or mailto://)"
        )
    return normalized


def _get_apprise_notify_url() -> str | None:
    """Build the sidecar ``/notify`` endpoint from the runtime settings.

    Returns ``None`` when no base URL is configured.  A missing scheme
    defaults to ``http://``, trailing slashes are dropped, and ``/notify``
    is appended unless the configured value already ends with it.
    """
    runtime = get_runtime_settings()
    base_url = (runtime.apprise_base_url or "").strip()
    if not base_url:
        return None
    if "://" not in base_url:
        base_url = f"http://{base_url}"
    base_url = base_url.rstrip("/")
    if base_url.endswith("/notify"):
        return base_url
    return f"{base_url}/notify"


def _get_apprise_headers() -> dict[str, str]:
    """Return the request headers for the sidecar call.

    Always includes a JSON content type.  When an API key is configured it
    is sent both as ``X-API-Key`` and as a bearer ``Authorization`` header.
    """
    runtime = get_runtime_settings()
    headers = {"Content-Type": "application/json"}
    api_key = (runtime.apprise_api_key or "").strip()
    if api_key:
        headers["X-API-Key"] = api_key
        headers["Authorization"] = f"Bearer {api_key}"
    return headers


def send_apprise_notification(urls: Iterable[str], title: str, body: str) -> bool:
    """POST a notification for *urls* to the Apprise sidecar.

    Args:
        urls: Apprise target URLs; validated with ``validate_apprise_urls``.
        title: Notification title; falls back to ``"Magent notification"``
            when empty.
        body: Notification body; an empty body means nothing is sent.

    Returns:
        ``True`` when the sidecar accepted the request and its JSON reply
        (if any) does not report a failure; ``False`` when the notification
        was skipped or the request failed.  This function does not raise for
        invalid URLs or HTTP/transport failures.
    """
    try:
        normalized = validate_apprise_urls(urls)
    except ValueError as exc:
        logger.warning("Apprise notification skipped due to invalid URL(s): %s", exc)
        return False
    if not normalized:
        return False

    notify_url = _get_apprise_notify_url()
    if not notify_url:
        logger.warning("Apprise notification skipped: APPRISE_BASE_URL is not configured.")
        return False

    payload = {
        "urls": normalized,
        "title": str(title or "Magent notification").strip() or "Magent notification",
        "body": str(body or "").strip(),
    }
    if not payload["body"]:
        return False

    try:
        with httpx.Client(timeout=10.0) as client:
            response = client.post(notify_url, headers=_get_apprise_headers(), json=payload)
            response.raise_for_status()
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # httpx.InvalidURL is not a subclass of httpx.HTTPError; without it a
        # malformed configured base URL would escape this best-effort helper.
        logger.warning("Apprise sidecar notify failed: %s", exc)
        return False

    try:
        data = response.json()
    except ValueError:
        # A 2xx reply without a JSON body is still treated as success.
        return True

    if isinstance(data, dict):
        if data.get("status") in {"error", "failed"}:
            return False
        if "sent" in data:
            return bool(data.get("sent"))
    return True


def notify_admins_new_signup(username: str, provider: str) -> int:
    """Notify every admin notification target about a new user signup.

    Args:
        username: Name of the user that signed in.
        provider: Sign-in provider, included verbatim in the message body.

    Returns:
        The number of targets for which the notification was sent
        successfully (``0`` when there are no targets).
    """
    targets = get_admin_notification_targets()
    if not targets:
        return 0

    timestamp = datetime.now(timezone.utc).isoformat()
    title = "New Magent user signup"
    body = f"User {username} signed in via {provider} at {timestamp}."

    sent = 0
    for target in targets:
        urls = target.get("urls") or []
        if send_apprise_notification(urls, title, body):
            sent += 1

    if sent == 0:
        # NOTE(review): this branch is also reached when every delivery
        # attempt failed, not only when no target had valid URLs.
        logger.info("Apprise signup notification skipped (no valid admin targets).")
    return sent