# File: Magent/backend/app/services/invite_email.py
# Listing metadata (from the file viewer): 464 lines, 17 KiB, Python

from __future__ import annotations
import asyncio
import html
import json
import logging
import re
import smtplib
from email.message import EmailMessage
from email.utils import formataddr
from typing import Any, Dict, Optional
from ..build_info import BUILD_NUMBER
from ..config import settings as env_settings
from ..db import delete_setting, get_setting, set_setting
from ..runtime import get_runtime_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Prefix of the settings key under which a customised template is stored;
# the template key is appended to it (see _template_setting_key).
TEMPLATE_SETTING_PREFIX = "invite_email_template_"
# The template identifiers accepted by the public functions in this module.
TEMPLATE_KEYS = ("invited", "welcome", "warning", "banned")
# Loose address check: exactly one "@", no whitespace anywhere, and at least
# one "." with characters on both sides in the part after the "@".
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Matches "{{ name }}" placeholders (optional inner whitespace); group 1 is
# the placeholder name.
PLACEHOLDER_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_]+)\s*}}")
# Human-readable label and description for each template key; these are
# copied into the dicts returned by get_invite_email_templates().
TEMPLATE_METADATA: Dict[str, Dict[str, Any]] = {
    "invited": {
        "label": "You have been invited",
        "description": "Sent when an invite link is created and emailed to a recipient.",
    },
    "welcome": {
        "label": "Welcome / How it works",
        "description": "Sent after an invited user completes signup.",
    },
    "warning": {
        "label": "Warning",
        "description": "Manual warning template for account or behavior notices.",
    },
    "banned": {
        "label": "Banned",
        "description": "Sent when an account is banned or removed.",
    },
}
# Names that may appear as {{placeholder}} in a template. _safe_template_context
# restricts a rendering context to exactly these keys.
TEMPLATE_PLACEHOLDERS = [
    "app_name",
    "app_url",
    "build_number",
    "how_it_works_url",
    "invite_code",
    "invite_description",
    "invite_expires_at",
    "invite_label",
    "invite_link",
    "invite_remaining_uses",
    "inviter_username",
    "message",
    "reason",
    "recipient_email",
    "role",
    "username",
]
# Built-in subject, plain-text body and HTML body for every template key.
# get_invite_email_templates() starts from a copy of these and overlays any
# string fields found in the stored setting for that key.
DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
    # Invitation email carrying the invite code and signup link.
    "invited": {
        "subject": "{{app_name}} invite for {{recipient_email}}",
        "body_text": (
            "You have been invited to {{app_name}}.\n\n"
            "Invite code: {{invite_code}}\n"
            "Signup link: {{invite_link}}\n"
            "Invited by: {{inviter_username}}\n"
            "Invite label: {{invite_label}}\n"
            "Expires: {{invite_expires_at}}\n"
            "Remaining uses: {{invite_remaining_uses}}\n\n"
            "{{invite_description}}\n\n"
            "{{message}}\n\n"
            "How it works: {{how_it_works_url}}\n"
            "Build: {{build_number}}\n"
        ),
        "body_html": (
            "<h1>You have been invited</h1>"
            "<p>You have been invited to <strong>{{app_name}}</strong>.</p>"
            "<p><strong>Invite code:</strong> {{invite_code}}<br />"
            "<strong>Invited by:</strong> {{inviter_username}}<br />"
            "<strong>Invite label:</strong> {{invite_label}}<br />"
            "<strong>Expires:</strong> {{invite_expires_at}}<br />"
            "<strong>Remaining uses:</strong> {{invite_remaining_uses}}</p>"
            "<p>{{invite_description}}</p>"
            "<p>{{message}}</p>"
            "<p><a href=\"{{invite_link}}\">Accept invite and create account</a></p>"
            "<p><a href=\"{{how_it_works_url}}\">How it works</a></p>"
            "<p class=\"meta\">Build {{build_number}}</p>"
        ),
    },
    # Post-signup welcome email.
    "welcome": {
        "subject": "Welcome to {{app_name}}",
        "body_text": (
            "Welcome to {{app_name}}, {{username}}.\n\n"
            "Your account is ready.\n"
            "Open: {{app_url}}\n"
            "How it works: {{how_it_works_url}}\n"
            "Role: {{role}}\n\n"
            "{{message}}\n"
        ),
        "body_html": (
            "<h1>Welcome</h1>"
            "<p>Your {{app_name}} account is ready, <strong>{{username}}</strong>.</p>"
            "<p><strong>Role:</strong> {{role}}</p>"
            "<p><a href=\"{{app_url}}\">Open {{app_name}}</a><br />"
            "<a href=\"{{how_it_works_url}}\">Read how it works</a></p>"
            "<p>{{message}}</p>"
        ),
    },
    # Account warning notice with a reason line.
    "warning": {
        "subject": "{{app_name}} account warning",
        "body_text": (
            "Hello {{username}},\n\n"
            "This is a warning regarding your {{app_name}} account.\n\n"
            "Reason: {{reason}}\n\n"
            "{{message}}\n\n"
            "If you need help, contact the admin.\n"
        ),
        "body_html": (
            "<h1>Account warning</h1>"
            "<p>Hello <strong>{{username}}</strong>,</p>"
            "<p>This is a warning regarding your {{app_name}} account.</p>"
            "<p><strong>Reason:</strong> {{reason}}</p>"
            "<p>{{message}}</p>"
            "<p>If you need help, contact the admin.</p>"
        ),
    },
    # Ban / removal notice with a reason line.
    "banned": {
        "subject": "{{app_name}} account status changed",
        "body_text": (
            "Hello {{username}},\n\n"
            "Your {{app_name}} account has been banned or removed.\n\n"
            "Reason: {{reason}}\n\n"
            "{{message}}\n"
        ),
        "body_html": (
            "<h1>Account status changed</h1>"
            "<p>Hello <strong>{{username}}</strong>,</p>"
            "<p>Your {{app_name}} account has been banned or removed.</p>"
            "<p><strong>Reason:</strong> {{reason}}</p>"
            "<p>{{message}}</p>"
        ),
    },
}
def _template_setting_key(template_key: str) -> str:
    """Return the settings key under which *template_key*'s stored template lives."""
    return "{}{}".format(TEMPLATE_SETTING_PREFIX, template_key)
def _is_valid_email(value: object) -> bool:
    """Tell whether *value* is a string that, once trimmed, matches EMAIL_PATTERN."""
    if isinstance(value, str):
        trimmed = value.strip()
        if trimmed:
            return EMAIL_PATTERN.match(trimmed) is not None
    # Non-strings and blank strings are never valid addresses.
    return False
def _normalize_email(value: object) -> Optional[str]:
    """Return *value* with surrounding whitespace removed if it is a valid email, else ``None``."""
    return str(value).strip() if _is_valid_email(value) else None
def _normalize_display_text(value: object, fallback: str = "") -> str:
if value is None:
return fallback
if isinstance(value, str):
trimmed = value.strip()
return trimmed if trimmed else fallback
return str(value)
def _template_context_value(value: object, fallback: str = "") -> str:
if value is None:
return fallback
if isinstance(value, str):
return value.strip()
return str(value)
def _safe_template_context(context: Dict[str, object]) -> Dict[str, str]:
    """Project *context* onto the known placeholder names, with string values.

    Keys outside TEMPLATE_PLACEHOLDERS are dropped; placeholders missing from
    *context* (or set to ``None``) become the empty string.
    """
    return {
        name: _template_context_value(context.get(name), "")
        for name in TEMPLATE_PLACEHOLDERS
    }
def _render_template_string(template: str, context: Dict[str, str], *, escape_html: bool = False) -> str:
    """Substitute every ``{{name}}`` placeholder in *template* from *context*.

    Placeholders without an entry in *context* are replaced by the empty
    string. When *escape_html* is true each substituted value is passed
    through ``html.escape``. A non-string *template* renders as ``""``.
    """
    if not isinstance(template, str):
        return ""

    def _substitute(found: re.Match[str]) -> str:
        replacement = context.get(found.group(1), "")
        if escape_html:
            return html.escape(replacement)
        return replacement

    return PLACEHOLDER_PATTERN.sub(_substitute, template)
def _strip_html_for_text(value: str) -> str:
text = re.sub(r"<br\s*/?>", "\n", value, flags=re.IGNORECASE)
text = re.sub(r"</p>", "\n\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text).strip()
def _build_default_base_url() -> str:
    """Return the base URL used to build links in outgoing emails.

    The first usable value among the runtime application URL, the runtime
    proxy base URL and the configured CORS allow-origin is returned with its
    trailing slashes removed. A candidate is unusable when it is empty after
    trimming or is the CORS wildcard ``"*"``, which is not a URL. When no
    candidate is usable, ``http://localhost:<port>`` is returned, where the
    port comes from the runtime setting ``magent_application_port`` and
    defaults to 3000 if that setting is missing, falsy or not an integer.
    """
    runtime = get_runtime_settings()
    candidates = (
        runtime.magent_application_url,
        runtime.magent_proxy_base_url,
        env_settings.cors_allow_origin,
    )
    for candidate in candidates:
        normalized = _normalize_display_text(candidate).rstrip("/")
        # Skip values that would yield broken links such as "*/signup" or a
        # bare "/signup" with no host.
        if normalized and normalized != "*":
            return normalized
    configured_port = getattr(runtime, "magent_application_port", 3000) or 3000
    try:
        port = int(configured_port)
    except (TypeError, ValueError):
        # A malformed port setting should not prevent emails from rendering.
        port = 3000
    return f"http://localhost:{port}"
def build_invite_email_context(
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Build the placeholder-to-text mapping used to render an email template.

    Args:
        invite: Invite record; the keys read are ``code``, ``remaining_uses``,
            ``recipient_email``, ``description``, ``expires_at``, ``label``
            and ``created_by``.
        user: User record; the keys read are ``username`` and ``role``.
        recipient_email: Explicit recipient. When it is not a valid address
            the invite's ``recipient_email`` is tried, then the address
            resolved from *user*.
        message: Free-form text for the ``message`` placeholder.
        reason: Text for the ``reason`` placeholder.
        overrides: Values applied last, replacing any computed entry.

    Returns:
        A dict holding exactly the names in TEMPLATE_PLACEHOLDERS, all with
        string values. Missing data is replaced by display fallbacks such as
        "Not set", "Never", "Unlimited" or "Admin".
    """
    # Imported here so the block brings its own dependency into scope.
    from urllib.parse import quote

    app_url = _build_default_base_url()
    invite_code = _normalize_display_text(invite.get("code") if invite else None, "Not set")
    if invite_code != "Not set":
        # Percent-encode the code so characters such as "&", "#", "+" or
        # spaces cannot truncate or alter the signup link's query string.
        invite_link = f"{app_url}/signup?code={quote(invite_code, safe='')}"
    else:
        invite_link = f"{app_url}/signup"
    remaining_uses = invite.get("remaining_uses") if invite else None
    resolved_recipient = _normalize_email(recipient_email)
    if not resolved_recipient and invite:
        resolved_recipient = _normalize_email(invite.get("recipient_email"))
    if not resolved_recipient and user:
        resolved_recipient = resolve_user_delivery_email(user)
    context: Dict[str, object] = {
        "app_name": env_settings.app_name,
        "app_url": app_url,
        "build_number": BUILD_NUMBER,
        "how_it_works_url": f"{app_url}/how-it-works",
        "invite_code": invite_code,
        "invite_description": _normalize_display_text(
            invite.get("description") if invite else None, "No extra details."
        ),
        "invite_expires_at": _normalize_display_text(invite.get("expires_at") if invite else None, "Never"),
        "invite_label": _normalize_display_text(invite.get("label") if invite else None, "No label"),
        "invite_link": invite_link,
        "invite_remaining_uses": (
            "Unlimited" if remaining_uses in (None, "") else _normalize_display_text(remaining_uses)
        ),
        # With an invite, its creator is the inviter; otherwise fall back to
        # the user's own username, then to "Admin".
        "inviter_username": _normalize_display_text(
            invite.get("created_by") if invite else (user.get("username") if user else None),
            "Admin",
        ),
        "message": _normalize_display_text(message, ""),
        "reason": _normalize_display_text(reason, "Not specified"),
        "recipient_email": _normalize_display_text(resolved_recipient, "No email supplied"),
        "role": _normalize_display_text(user.get("role") if user else None, "user"),
        "username": _normalize_display_text(user.get("username") if user else None, "there"),
    }
    if isinstance(overrides, dict):
        context.update(overrides)
    return _safe_template_context(context)
def get_invite_email_templates() -> Dict[str, Dict[str, Any]]:
    """Return every email template, keyed by template key.

    Each entry starts from the built-in default and is overlaid with the
    ``subject``, ``body_text`` and ``body_html`` fields of the stored setting
    for that key, where those fields are strings. A stored value that is not
    valid JSON, or not a JSON object, is ignored. Every entry also carries
    ``key``, ``label``, ``description`` and ``placeholders``.
    """
    templates: Dict[str, Dict[str, Any]] = {}
    for template_key in TEMPLATE_KEYS:
        template = dict(DEFAULT_TEMPLATES[template_key])
        raw_value = get_setting(_template_setting_key(template_key))
        if raw_value:
            try:
                stored = json.loads(raw_value)
            except (TypeError, json.JSONDecodeError):
                stored = {}
            if isinstance(stored, dict):
                for field in ("subject", "body_text", "body_html"):
                    if isinstance(stored.get(field), str):
                        template[field] = stored[field]
        metadata = TEMPLATE_METADATA[template_key]
        templates[template_key] = {
            "key": template_key,
            "label": metadata["label"],
            "description": metadata["description"],
            # Hand out a copy so a caller mutating the result cannot alter
            # the module-level placeholder list shared by all templates.
            "placeholders": list(TEMPLATE_PLACEHOLDERS),
            **template,
        }
    return templates
def get_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Return the effective template for *template_key*.

    Raises:
        ValueError: If *template_key* is not one of TEMPLATE_KEYS.
    """
    if template_key in TEMPLATE_KEYS:
        all_templates = get_invite_email_templates()
        return all_templates[template_key]
    raise ValueError(f"Unknown email template: {template_key}")
def save_invite_email_template(
    template_key: str,
    *,
    subject: str,
    body_text: str,
    body_html: str,
) -> Dict[str, Any]:
    """Store a customised template as JSON and return the resulting template.

    Raises:
        ValueError: If *template_key* is not one of TEMPLATE_KEYS.
    """
    known = template_key in TEMPLATE_KEYS
    if not known:
        raise ValueError(f"Unknown email template: {template_key}")
    serialized = json.dumps(
        {"subject": subject, "body_text": body_text, "body_html": body_html}
    )
    set_setting(_template_setting_key(template_key), serialized)
    # Read back through the normal lookup so the caller sees what is in effect.
    return get_invite_email_template(template_key)
def reset_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Delete the stored setting for *template_key* and return the template now in effect.

    Raises:
        ValueError: If *template_key* is not one of TEMPLATE_KEYS.
    """
    if template_key in TEMPLATE_KEYS:
        delete_setting(_template_setting_key(template_key))
        return get_invite_email_template(template_key)
    raise ValueError(f"Unknown email template: {template_key}")
def render_invite_email_template(
    template_key: str,
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Render the subject, plain-text body and HTML body of a template.

    The keyword arguments are forwarded unchanged to
    ``build_invite_email_context``. Placeholder values are HTML-escaped in the
    HTML body only. If the plain-text body renders empty while the HTML body
    does not, the text body is derived from the HTML. The subject is folded
    onto a single line.

    Returns:
        A dict with the trimmed ``subject``, ``body_text`` and ``body_html``.

    Raises:
        ValueError: If *template_key* is not a known template.
    """
    template = get_invite_email_template(template_key)
    context = build_invite_email_context(
        invite=invite,
        user=user,
        recipient_email=recipient_email,
        message=message,
        reason=reason,
        overrides=overrides,
    )
    body_html = _render_template_string(template["body_html"], context, escape_html=True)
    body_text = _render_template_string(template["body_text"], context, escape_html=False)
    if not body_text.strip() and body_html.strip():
        body_text = _strip_html_for_text(body_html)
    subject = _render_template_string(template["subject"], context, escape_html=False)
    # A subject is a header value and EmailMessage rejects header values that
    # span several lines. Placeholder values (e.g. a multi-line message) can
    # introduce line breaks, so join the non-empty lines with single spaces.
    subject = " ".join(piece.strip() for piece in subject.splitlines() if piece.strip())
    return {
        "subject": subject,
        "body_text": body_text.strip(),
        "body_html": body_html.strip(),
    }
def resolve_user_delivery_email(user: Optional[Dict[str, Any]], invite: Optional[Dict[str, Any]] = None) -> Optional[str]:
    """Pick the address an email for *user* should be delivered to.

    The user's ``username`` wins when it is itself a valid email address;
    otherwise the invite's ``recipient_email`` is used if valid. Arguments
    that are not dicts are ignored. Returns ``None`` when neither source
    yields a valid address.
    """
    if isinstance(user, dict):
        from_username = _normalize_email(user.get("username"))
        if from_username:
            return from_username
    if not isinstance(invite, dict):
        return None
    return _normalize_email(invite.get("recipient_email"))
def smtp_email_config_ready() -> tuple[bool, str]:
    """Check whether the runtime settings allow sending email.

    Returns:
        ``(True, "ok")`` when sending is possible, otherwise ``(False, detail)``
        where *detail* names the first failing check: notifications enabled,
        email notifications enabled, SMTP host set, valid from-address set.
    """
    runtime = get_runtime_settings()
    problem = None
    if not runtime.magent_notify_enabled:
        problem = "Notifications are disabled."
    elif not runtime.magent_notify_email_enabled:
        problem = "Email notifications are disabled."
    elif not _normalize_display_text(runtime.magent_notify_email_smtp_host):
        problem = "SMTP host is not configured."
    elif not _normalize_email(runtime.magent_notify_email_from_address):
        problem = "From email address is not configured."
    if problem is not None:
        return False, problem
    return True, "ok"
def _send_email_sync(*, recipient_email: str, subject: str, body_text: str, body_html: str) -> None:
    """Build a message and send it over SMTP using the runtime settings (blocking).

    The plain-text part is *body_text*, or text derived from *body_html* when
    *body_text* is empty; a non-blank *body_html* is attached as an HTML
    alternative. With the SSL setting on, an ``SMTP_SSL`` connection is used;
    otherwise a plain ``SMTP`` connection, upgraded with STARTTLS when the TLS
    setting is on. Login happens only when both username and password are set.

    Raises:
        RuntimeError: If the SMTP host or a valid from-address is missing.
    """
    # NOTE(review): no explicit SSL context is passed to SMTP_SSL/starttls —
    # confirm the library default meets the deployment's certificate checks.
    runtime = get_runtime_settings()
    host = _normalize_display_text(runtime.magent_notify_email_smtp_host)
    port = int(runtime.magent_notify_email_smtp_port or 587)
    username = _normalize_display_text(runtime.magent_notify_email_smtp_username)
    password = _normalize_display_text(runtime.magent_notify_email_smtp_password)
    from_address = _normalize_email(runtime.magent_notify_email_from_address)
    from_name = _normalize_display_text(runtime.magent_notify_email_from_name, env_settings.app_name)
    use_tls = bool(runtime.magent_notify_email_use_tls)
    use_ssl = bool(runtime.magent_notify_email_use_ssl)
    if not (host and from_address):
        raise RuntimeError("SMTP email settings are incomplete.")

    outgoing = EmailMessage()
    outgoing["Subject"] = subject
    outgoing["From"] = formataddr((from_name, from_address))
    outgoing["To"] = recipient_email
    plain_part = body_text or _strip_html_for_text(body_html)
    outgoing.set_content(plain_part)
    if body_html.strip():
        outgoing.add_alternative(body_html, subtype="html")

    def _login_and_send(connection) -> None:
        # Shared tail of both transports: optional login, then delivery.
        if username and password:
            connection.login(username, password)
        connection.send_message(outgoing)

    if use_ssl:
        with smtplib.SMTP_SSL(host, port, timeout=20) as secure_smtp:
            _login_and_send(secure_smtp)
        return

    with smtplib.SMTP(host, port, timeout=20) as plain_smtp:
        plain_smtp.ehlo()
        if use_tls:
            plain_smtp.starttls()
            # Re-identify after the connection has been upgraded.
            plain_smtp.ehlo()
        _login_and_send(plain_smtp)
async def send_templated_email(
    template_key: str,
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Render a template and send it to the resolved recipient.

    The recipient is *recipient_email* when valid, otherwise the address
    resolved from *user* and *invite*. The blocking SMTP send runs in a worker
    thread via ``asyncio.to_thread``.

    Returns:
        A dict with the ``recipient_email`` used and the rendered ``subject``.

    Raises:
        RuntimeError: If email sending is not configured/enabled, or no valid
            recipient address can be determined.
    """
    config_ok, config_detail = smtp_email_config_ready()
    if not config_ok:
        raise RuntimeError(config_detail)

    # Prefer the explicit address; fall back to the user/invite records.
    resolved_email = _normalize_email(recipient_email) or resolve_user_delivery_email(user, invite)
    if not resolved_email:
        raise RuntimeError("No valid recipient email is available for this action.")

    email_parts = render_invite_email_template(
        template_key,
        invite=invite,
        user=user,
        recipient_email=resolved_email,
        message=message,
        reason=reason,
        overrides=overrides,
    )
    rendered_subject = email_parts["subject"]
    await asyncio.to_thread(
        _send_email_sync,
        recipient_email=resolved_email,
        subject=rendered_subject,
        body_text=email_parts["body_text"],
        body_html=email_parts["body_html"],
    )
    logger.info("Email template sent: template=%s recipient=%s", template_key, resolved_email)
    return {"recipient_email": resolved_email, "subject": rendered_subject}