from __future__ import annotations import logging from typing import Any, Dict, Optional from urllib.parse import quote import httpx from ..config import settings as env_settings from ..db import get_setting from ..runtime import get_runtime_settings from .invite_email import send_generic_email logger = logging.getLogger(__name__) def _clean_text(value: Any, fallback: str = "") -> str: if value is None: return fallback if isinstance(value, str): trimmed = value.strip() return trimmed if trimmed else fallback return str(value) def _split_emails(value: str) -> list[str]: if not value: return [] parts = [entry.strip() for entry in value.replace(";", ",").split(",")] return [entry for entry in parts if entry and "@" in entry] def _resolve_app_url() -> str: runtime = get_runtime_settings() for candidate in ( runtime.magent_application_url, runtime.magent_proxy_base_url, env_settings.cors_allow_origin, ): normalized = _clean_text(candidate) if normalized: return normalized.rstrip("/") port = int(getattr(runtime, "magent_application_port", 3000) or 3000) return f"http://localhost:{port}" def _portal_item_url(item_id: int) -> str: return f"{_resolve_app_url()}/portal?item={item_id}" async def _http_post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: async with httpx.AsyncClient(timeout=12.0) as client: response = await client.post(url, json=payload) response.raise_for_status() try: body = response.json() except ValueError: body = response.text return {"status_code": response.status_code, "body": body} async def _send_discord(title: str, message: str, payload: Dict[str, Any]) -> Dict[str, Any]: runtime = get_runtime_settings() webhook = _clean_text(runtime.magent_notify_discord_webhook_url) or _clean_text( runtime.discord_webhook_url ) if not webhook: return {"status": "skipped", "detail": "Discord webhook not configured."} data = { "content": f"**{title}**\n{message}", "embeds": [ { "title": title, "description": message, "fields": [ {"name": "Type", "value": _clean_text(payload.get("kind"), "unknown"), "inline": True}, {"name": "Status", "value": _clean_text(payload.get("status"), "unknown"), "inline": True}, {"name": "Priority", "value": _clean_text(payload.get("priority"), "normal"), "inline": True}, ], "url": _clean_text(payload.get("item_url")), } ], } result = await _http_post_json(webhook, data) return {"status": "ok", "detail": f"Discord accepted ({result['status_code']})."} async def _send_telegram(title: str, message: str) -> Dict[str, Any]: runtime = get_runtime_settings() bot_token = _clean_text(runtime.magent_notify_telegram_bot_token) chat_id = _clean_text(runtime.magent_notify_telegram_chat_id) if not bot_token or not chat_id: return {"status": "skipped", "detail": "Telegram is not configured."} url = f"https://api.telegram.org/bot{bot_token}/sendMessage" payload = {"chat_id": chat_id, "text": f"{title}\n\n{message}", "disable_web_page_preview": True} result = await _http_post_json(url, payload) return {"status": "ok", "detail": f"Telegram accepted ({result['status_code']})."} async def _send_webhook(payload: Dict[str, Any]) -> Dict[str, Any]: runtime = get_runtime_settings() webhook = _clean_text(runtime.magent_notify_webhook_url) if not webhook: return {"status": "skipped", "detail": "Generic webhook is not configured."} result = await _http_post_json(webhook, payload) return {"status": "ok", "detail": f"Webhook accepted ({result['status_code']})."} async def _send_push(title: str, message: str, payload: Dict[str, Any]) -> Dict[str, Any]: runtime = get_runtime_settings() provider = _clean_text(runtime.magent_notify_push_provider, "ntfy").lower() base_url = _clean_text(runtime.magent_notify_push_base_url) token = _clean_text(runtime.magent_notify_push_token) topic = _clean_text(runtime.magent_notify_push_topic) if provider == "ntfy": if not base_url or not topic: return {"status": "skipped", "detail": "ntfy needs base URL and topic."} url = f"{base_url.rstrip('/')}/{quote(topic)}" headers = {"Title": title, "Tags": "magent,portal"} async with httpx.AsyncClient(timeout=12.0) as client: response = await client.post(url, content=message.encode("utf-8"), headers=headers) response.raise_for_status() return {"status": "ok", "detail": f"ntfy accepted ({response.status_code})."} if provider == "gotify": if not base_url or not token: return {"status": "skipped", "detail": "Gotify needs base URL and token."} url = f"{base_url.rstrip('/')}/message?token={quote(token)}" body = {"title": title, "message": message, "priority": 5, "extras": {"client::display": {"contentType": "text/plain"}}} result = await _http_post_json(url, body) return {"status": "ok", "detail": f"Gotify accepted ({result['status_code']})."} if provider == "pushover": user_key = _clean_text(runtime.magent_notify_push_user_key) if not token or not user_key: return {"status": "skipped", "detail": "Pushover needs token and user key."} form = {"token": token, "user": user_key, "title": title, "message": message} async with httpx.AsyncClient(timeout=12.0) as client: response = await client.post("https://api.pushover.net/1/messages.json", data=form) response.raise_for_status() return {"status": "ok", "detail": f"Pushover accepted ({response.status_code})."} if provider == "discord": return await _send_discord(title, message, payload) if provider == "telegram": return await _send_telegram(title, message) if provider == "webhook": return await _send_webhook(payload) return {"status": "skipped", "detail": f"Unsupported push provider '{provider}'."} async def _send_email(title: str, message: str, payload: Dict[str, Any]) -> Dict[str, Any]: runtime = get_runtime_settings() recipients = _split_emails(_clean_text(get_setting("portal_notification_recipients"))) fallback = _clean_text(runtime.magent_notify_email_from_address) if fallback and fallback not in recipients: recipients.append(fallback) if not recipients: return {"status": "skipped", "detail": "No portal notification recipient is configured."} body_text = ( f"{title}\n\n" f"{message}\n\n" f"Kind: {_clean_text(payload.get('kind'))}\n" f"Status: {_clean_text(payload.get('status'))}\n" f"Priority: {_clean_text(payload.get('priority'))}\n" f"Requested by: {_clean_text(payload.get('requested_by'))}\n" f"Open: {_clean_text(payload.get('item_url'))}\n" ) body_html = ( "
" f"

{title}

" f"

{message}

" "" f"" f"" f"" f"" "
Kind{_clean_text(payload.get('kind'))}
Status{_clean_text(payload.get('status'))}
Priority{_clean_text(payload.get('priority'))}
Requested by{_clean_text(payload.get('requested_by'))}
" f"Open portal item" "
" ) deliveries: list[Dict[str, Any]] = [] for recipient in recipients: try: result = await send_generic_email( recipient_email=recipient, subject=title, body_text=body_text, body_html=body_html, ) deliveries.append({"recipient": recipient, "status": "ok", **result}) except Exception as exc: deliveries.append({"recipient": recipient, "status": "error", "detail": str(exc)}) successful = [entry for entry in deliveries if entry.get("status") == "ok"] if successful: return {"status": "ok", "detail": f"Email sent to {len(successful)} recipient(s).", "deliveries": deliveries} return {"status": "error", "detail": "Email delivery failed for all recipients.", "deliveries": deliveries} async def send_portal_notification( *, event_type: str, item: Dict[str, Any], actor_username: str, actor_role: str, note: Optional[str] = None, ) -> Dict[str, Any]: runtime = get_runtime_settings() if not runtime.magent_notify_enabled: return {"status": "skipped", "detail": "Notifications are disabled.", "channels": {}} item_id = int(item.get("id") or 0) title = f"{env_settings.app_name} portal update: {item.get('title') or f'Item #{item_id}'}" message_lines = [ f"Event: {event_type}", f"Actor: {actor_username} ({actor_role})", f"Item #{item_id} is now '{_clean_text(item.get('status'), 'unknown')}'.", ] if note: message_lines.append(f"Note: {note}") message_lines.append(f"Open: {_portal_item_url(item_id)}") message = "\n".join(message_lines) payload = { "type": "portal.notification", "event": event_type, "item_id": item_id, "item_url": _portal_item_url(item_id), "kind": _clean_text(item.get("kind")), "status": _clean_text(item.get("status")), "priority": _clean_text(item.get("priority")), "requested_by": _clean_text(item.get("created_by_username")), "actor_username": actor_username, "actor_role": actor_role, "note": note or "", } channels: Dict[str, Dict[str, Any]] = {} if runtime.magent_notify_discord_enabled: try: channels["discord"] = await _send_discord(title, message, payload) except Exception as exc: channels["discord"] = {"status": "error", "detail": str(exc)} if runtime.magent_notify_telegram_enabled: try: channels["telegram"] = await _send_telegram(title, message) except Exception as exc: channels["telegram"] = {"status": "error", "detail": str(exc)} if runtime.magent_notify_webhook_enabled: try: channels["webhook"] = await _send_webhook(payload) except Exception as exc: channels["webhook"] = {"status": "error", "detail": str(exc)} if runtime.magent_notify_push_enabled: try: channels["push"] = await _send_push(title, message, payload) except Exception as exc: channels["push"] = {"status": "error", "detail": str(exc)} if runtime.magent_notify_email_enabled: try: channels["email"] = await _send_email(title, message, payload) except Exception as exc: channels["email"] = {"status": "error", "detail": str(exc)} successful = [name for name, value in channels.items() if value.get("status") == "ok"] failed = [name for name, value in channels.items() if value.get("status") == "error"] skipped = [name for name, value in channels.items() if value.get("status") == "skipped"] logger.info( "portal notification event=%s item_id=%s successful=%s failed=%s skipped=%s", event_type, item_id, successful, failed, skipped, ) overall = "ok" if successful and not failed else "error" if failed and not successful else "partial" if not channels: overall = "skipped" return {"status": overall, "channels": channels}