540 lines
20 KiB
Python
540 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import html
|
|
import json
|
|
import logging
|
|
import re
|
|
import smtplib
|
|
from email.message import EmailMessage
|
|
from email.utils import formataddr
|
|
from typing import Any, Dict, Optional
|
|
|
|
from ..build_info import BUILD_NUMBER
|
|
from ..config import settings as env_settings
|
|
from ..db import delete_setting, get_setting, set_setting
|
|
from ..runtime import get_runtime_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Prefix prepended to a template key to form the name under which a customised
# template is stored through get_setting / set_setting / delete_setting.
TEMPLATE_SETTING_PREFIX = "invite_email_template_"
# The template identifiers this module accepts; the get/save/reset helpers
# raise ValueError for any other key.
TEMPLATE_KEYS = ("invited", "welcome", "warning", "banned")
# Shape check only: a non-empty local part, exactly one "@", and a domain part
# containing at least one "."; whitespace is not allowed anywhere.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Matches "{{ name }}" placeholders (inner whitespace optional) and captures
# the name, which may contain ASCII letters, digits and underscores.
PLACEHOLDER_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_]+)\s*}}")
|
|
|
|
# Human-readable label and description for each template key.  Both values are
# copied into the entries returned by get_invite_email_templates().
TEMPLATE_METADATA: Dict[str, Dict[str, Any]] = {
    "invited": {
        "label": "You have been invited",
        "description": "Sent when an invite link is created and emailed to a recipient.",
    },
    "welcome": {
        "label": "Welcome / How it works",
        "description": "Sent after an invited user completes signup.",
    },
    "warning": {
        "label": "Warning",
        "description": "Manual warning template for account or behavior notices.",
    },
    "banned": {
        "label": "Banned",
        "description": "Sent when an account is banned or removed.",
    },
}
|
|
|
|
# Placeholder names that survive _safe_template_context(); any other key in a
# context dict is dropped before rendering.  The same list object is exposed
# under "placeholders" in every entry of get_invite_email_templates().
TEMPLATE_PLACEHOLDERS = [
    "app_name",
    "app_url",
    "build_number",
    "how_it_works_url",
    "invite_code",
    "invite_description",
    "invite_expires_at",
    "invite_label",
    "invite_link",
    "invite_remaining_uses",
    "inviter_username",
    "message",
    "reason",
    "recipient_email",
    "role",
    "username",
]
|
|
|
|
# Built-in subject / plain-text / HTML content for each template key.  A stored
# override replaces these field by field (see get_invite_email_templates).
# Placeholders use the "{{name}}" syntax; when rendered through
# render_invite_email_template() the values substituted into "body_html" are
# HTML-escaped, while "subject" and "body_text" receive them verbatim.
DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
    # Invitation sent together with the invite code and signup link.
    "invited": {
        "subject": "{{app_name}} invite for {{recipient_email}}",
        "body_text": (
            "You have been invited to {{app_name}}.\n\n"
            "Invite code: {{invite_code}}\n"
            "Signup link: {{invite_link}}\n"
            "Invited by: {{inviter_username}}\n"
            "Invite label: {{invite_label}}\n"
            "Expires: {{invite_expires_at}}\n"
            "Remaining uses: {{invite_remaining_uses}}\n\n"
            "{{invite_description}}\n\n"
            "{{message}}\n\n"
            "How it works: {{how_it_works_url}}\n"
            "Build: {{build_number}}\n"
        ),
        "body_html": (
            "<h1>You have been invited</h1>"
            "<p>You have been invited to <strong>{{app_name}}</strong>.</p>"
            "<p><strong>Invite code:</strong> {{invite_code}}<br />"
            "<strong>Invited by:</strong> {{inviter_username}}<br />"
            "<strong>Invite label:</strong> {{invite_label}}<br />"
            "<strong>Expires:</strong> {{invite_expires_at}}<br />"
            "<strong>Remaining uses:</strong> {{invite_remaining_uses}}</p>"
            "<p>{{invite_description}}</p>"
            "<p>{{message}}</p>"
            "<p><a href=\"{{invite_link}}\">Accept invite and create account</a></p>"
            "<p><a href=\"{{how_it_works_url}}\">How it works</a></p>"
            "<p class=\"meta\">Build {{build_number}}</p>"
        ),
    },
    # Greeting for a freshly created account.
    "welcome": {
        "subject": "Welcome to {{app_name}}",
        "body_text": (
            "Welcome to {{app_name}}, {{username}}.\n\n"
            "Your account is ready.\n"
            "Open: {{app_url}}\n"
            "How it works: {{how_it_works_url}}\n"
            "Role: {{role}}\n\n"
            "{{message}}\n"
        ),
        "body_html": (
            "<h1>Welcome</h1>"
            "<p>Your {{app_name}} account is ready, <strong>{{username}}</strong>.</p>"
            "<p><strong>Role:</strong> {{role}}</p>"
            "<p><a href=\"{{app_url}}\">Open {{app_name}}</a><br />"
            "<a href=\"{{how_it_works_url}}\">Read how it works</a></p>"
            "<p>{{message}}</p>"
        ),
    },
    # Account warning carrying a reason and an optional free-form message.
    "warning": {
        "subject": "{{app_name}} account warning",
        "body_text": (
            "Hello {{username}},\n\n"
            "This is a warning regarding your {{app_name}} account.\n\n"
            "Reason: {{reason}}\n\n"
            "{{message}}\n\n"
            "If you need help, contact the admin.\n"
        ),
        "body_html": (
            "<h1>Account warning</h1>"
            "<p>Hello <strong>{{username}}</strong>,</p>"
            "<p>This is a warning regarding your {{app_name}} account.</p>"
            "<p><strong>Reason:</strong> {{reason}}</p>"
            "<p>{{message}}</p>"
            "<p>If you need help, contact the admin.</p>"
        ),
    },
    # Notice that the account was banned or removed.
    "banned": {
        "subject": "{{app_name}} account status changed",
        "body_text": (
            "Hello {{username}},\n\n"
            "Your {{app_name}} account has been banned or removed.\n\n"
            "Reason: {{reason}}\n\n"
            "{{message}}\n"
        ),
        "body_html": (
            "<h1>Account status changed</h1>"
            "<p>Hello <strong>{{username}}</strong>,</p>"
            "<p>Your {{app_name}} account has been banned or removed.</p>"
            "<p><strong>Reason:</strong> {{reason}}</p>"
            "<p>{{message}}</p>"
        ),
    },
}
|
|
|
|
|
|
def _template_setting_key(template_key: str) -> str:
    """Return the settings-store name under which *template_key* is persisted."""
    return "{}{}".format(TEMPLATE_SETTING_PREFIX, template_key)
|
|
|
|
|
|
def _is_valid_email(value: object) -> bool:
    """Tell whether *value* is a string that looks like an email address.

    Surrounding whitespace is ignored; non-strings and blank strings are
    rejected.  The check is purely syntactic (see ``EMAIL_PATTERN``).
    """
    if isinstance(value, str):
        stripped = value.strip()
        return bool(stripped and EMAIL_PATTERN.match(stripped))
    return False
|
|
|
|
|
|
def _normalize_email(value: object) -> Optional[str]:
    """Return *value* trimmed when it passes ``_is_valid_email``, else ``None``."""
    return str(value).strip() if _is_valid_email(value) else None
|
|
|
|
|
|
def _normalize_display_text(value: object, fallback: str = "") -> str:
|
|
if value is None:
|
|
return fallback
|
|
if isinstance(value, str):
|
|
trimmed = value.strip()
|
|
return trimmed if trimmed else fallback
|
|
return str(value)
|
|
|
|
|
|
def _template_context_value(value: object, fallback: str = "") -> str:
|
|
if value is None:
|
|
return fallback
|
|
if isinstance(value, str):
|
|
return value.strip()
|
|
return str(value)
|
|
|
|
|
|
def _safe_template_context(context: Dict[str, object]) -> Dict[str, str]:
    """Project *context* onto the known placeholder names as plain strings.

    Every name in ``TEMPLATE_PLACEHOLDERS`` appears in the result (missing
    entries become ``""``); keys outside that list are discarded.
    """
    return {
        name: _template_context_value(context.get(name), "")
        for name in TEMPLATE_PLACEHOLDERS
    }
|
|
|
|
|
|
def _render_template_string(template: str, context: Dict[str, str], *, escape_html: bool = False) -> str:
    """Substitute ``{{name}}`` placeholders in *template* from *context*.

    Args:
        template: Text containing placeholders; a non-string yields ``""``.
        context: Placeholder values; names that are absent render as ``""``.
        escape_html: When true, each substituted value is passed through
            ``html.escape`` (the template text itself is left untouched).

    Returns:
        The rendered text.
    """
    if not isinstance(template, str):
        return ""

    # Pick the per-value transform once instead of branching on every match.
    transform = html.escape if escape_html else (lambda text: text)
    return PLACEHOLDER_PATTERN.sub(
        lambda found: transform(context.get(found.group(1), "")),
        template,
    )
|
|
|
|
|
|
def _strip_html_for_text(value: str) -> str:
|
|
text = re.sub(r"<br\s*/?>", "\n", value, flags=re.IGNORECASE)
|
|
text = re.sub(r"</p>", "\n\n", text, flags=re.IGNORECASE)
|
|
text = re.sub(r"<[^>]+>", "", text)
|
|
return html.unescape(text).strip()
|
|
|
|
|
|
def _build_default_base_url() -> str:
    """Return the base URL used to build links in outgoing emails.

    The first usable value among the runtime application URL, the runtime
    proxy base URL and the configured CORS origin wins, with trailing slashes
    removed.  When none is usable the result is ``http://localhost:<port>``,
    where the port comes from the runtime settings (default 3000).

    Returns:
        The base URL without a trailing slash.
    """
    runtime = get_runtime_settings()
    for candidate in (
        runtime.magent_application_url,
        runtime.magent_proxy_base_url,
        env_settings.cors_allow_origin,
    ):
        base = _normalize_display_text(candidate).rstrip("/")
        # "*" is the CORS wildcard rather than an address; accepting it (or a
        # value that is empty once slashes are removed) would yield links such
        # as "*/signup", so treat both as "not configured".
        if base and base != "*":
            return base
    port = int(getattr(runtime, "magent_application_port", 3000) or 3000)
    return f"http://localhost:{port}"
|
|
|
|
|
|
def build_invite_email_context(
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Assemble the placeholder values used to render an email template.

    Args:
        invite: Invite record; the keys read are ``code``, ``description``,
            ``expires_at``, ``label``, ``remaining_uses``, ``created_by`` and
            ``recipient_email``.
        user: User record; the keys read are ``username`` and ``role``.
        recipient_email: Preferred recipient address.  When it is not a valid
            address the invite's ``recipient_email`` and then the user's
            delivery address are tried.
        message: Free-form text for the ``message`` placeholder.
        reason: Text for the ``reason`` placeholder.
        overrides: Values that replace the computed ones key by key.

    Returns:
        A dict holding exactly the names in ``TEMPLATE_PLACEHOLDERS``, each
        mapped to a string.
    """
    # Function-scope import so the block does not depend on the module header.
    from urllib.parse import quote

    app_url = _build_default_base_url()

    raw_invite_code = _normalize_display_text(invite.get("code") if invite else None)
    invite_code = raw_invite_code or "Not set"
    # Decide on the raw value (not on the "Not set" display text) and
    # percent-encode it: characters such as "&", "#", "+" or spaces would
    # otherwise truncate or corrupt the ``code`` query parameter.
    if raw_invite_code:
        invite_link = f"{app_url}/signup?code={quote(raw_invite_code, safe='')}"
    else:
        invite_link = f"{app_url}/signup"

    remaining_uses = invite.get("remaining_uses") if invite else None

    # Recipient precedence: explicit argument, then invite, then user.
    resolved_recipient = _normalize_email(recipient_email)
    if not resolved_recipient and invite:
        resolved_recipient = _normalize_email(invite.get("recipient_email"))
    if not resolved_recipient and user:
        resolved_recipient = resolve_user_delivery_email(user)

    context: Dict[str, object] = {
        "app_name": env_settings.app_name,
        "app_url": app_url,
        "build_number": BUILD_NUMBER,
        "how_it_works_url": f"{app_url}/how-it-works",
        "invite_code": invite_code,
        "invite_description": _normalize_display_text(
            invite.get("description") if invite else None, "No extra details."
        ),
        "invite_expires_at": _normalize_display_text(
            invite.get("expires_at") if invite else None, "Never"
        ),
        "invite_label": _normalize_display_text(
            invite.get("label") if invite else None, "No label"
        ),
        "invite_link": invite_link,
        "invite_remaining_uses": (
            "Unlimited" if remaining_uses in (None, "") else _normalize_display_text(remaining_uses)
        ),
        # With an invite present its creator is used; the user's own name is
        # only consulted when there is no invite at all.
        "inviter_username": _normalize_display_text(
            invite.get("created_by") if invite else (user.get("username") if user else None),
            "Admin",
        ),
        "message": _normalize_display_text(message, ""),
        "reason": _normalize_display_text(reason, "Not specified"),
        "recipient_email": _normalize_display_text(resolved_recipient, "No email supplied"),
        "role": _normalize_display_text(user.get("role") if user else None, "user"),
        "username": _normalize_display_text(user.get("username") if user else None, "there"),
    }
    if isinstance(overrides, dict):
        context.update(overrides)
    # Restrict to known placeholders and coerce every value to ``str``.
    return _safe_template_context(context)
|
|
|
|
|
|
def get_invite_email_templates() -> Dict[str, Dict[str, Any]]:
    """Return every email template, merged with any stored customisation.

    For each key in ``TEMPLATE_KEYS`` the built-in defaults are taken first;
    if a JSON object is stored for the key, its string-valued ``subject``,
    ``body_text`` and ``body_html`` fields replace the defaults.  Stored data
    that is not valid JSON or not an object is ignored.

    Returns:
        Mapping of template key to a dict with ``key``, ``label``,
        ``description``, ``placeholders``, ``subject``, ``body_text`` and
        ``body_html``.
    """
    editable_fields = ("subject", "body_text", "body_html")
    result: Dict[str, Dict[str, Any]] = {}
    for key in TEMPLATE_KEYS:
        content = dict(DEFAULT_TEMPLATES[key])
        serialized = get_setting(_template_setting_key(key))
        if serialized:
            try:
                customised = json.loads(serialized)
            except (TypeError, json.JSONDecodeError):
                customised = {}
            if isinstance(customised, dict):
                content.update(
                    {
                        name: customised[name]
                        for name in editable_fields
                        if isinstance(customised.get(name), str)
                    }
                )
        meta = TEMPLATE_METADATA[key]
        result[key] = {
            "key": key,
            "label": meta["label"],
            "description": meta["description"],
            "placeholders": TEMPLATE_PLACEHOLDERS,
            **content,
        }
    return result
|
|
|
|
|
|
def get_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Return the merged template for *template_key*.

    Raises:
        ValueError: If *template_key* is not one of ``TEMPLATE_KEYS``.
    """
    if template_key in TEMPLATE_KEYS:
        return get_invite_email_templates()[template_key]
    raise ValueError(f"Unknown email template: {template_key}")
|
|
|
|
|
|
def save_invite_email_template(
    template_key: str,
    *,
    subject: str,
    body_text: str,
    body_html: str,
) -> Dict[str, Any]:
    """Persist a customised template and return the resulting merged template.

    The three fields are stored together as one JSON object under the
    template's settings key.

    Raises:
        ValueError: If *template_key* is not one of ``TEMPLATE_KEYS``.
    """
    if template_key not in TEMPLATE_KEYS:
        raise ValueError(f"Unknown email template: {template_key}")
    serialized = json.dumps(
        {"subject": subject, "body_text": body_text, "body_html": body_html}
    )
    set_setting(_template_setting_key(template_key), serialized)
    # Read back through the normal path so callers see what will be rendered.
    return get_invite_email_template(template_key)
|
|
|
|
|
|
def reset_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Delete the stored customisation for *template_key* and return the template.

    Raises:
        ValueError: If *template_key* is not one of ``TEMPLATE_KEYS``.
    """
    is_known = template_key in TEMPLATE_KEYS
    if not is_known:
        raise ValueError(f"Unknown email template: {template_key}")
    storage_key = _template_setting_key(template_key)
    delete_setting(storage_key)
    return get_invite_email_template(template_key)
|
|
|
|
|
|
def render_invite_email_template(
    template_key: str,
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Render one template into its final subject, text body and HTML body.

    Values substituted into the HTML body are HTML-escaped; the subject and
    the text body receive them verbatim.  If the text body renders blank
    while the HTML body does not, the text body is derived from the HTML.

    Returns:
        Dict with trimmed ``subject``, ``body_text`` and ``body_html``.

    Raises:
        ValueError: If *template_key* is unknown (raised by the lookup).
    """
    template = get_invite_email_template(template_key)
    context = build_invite_email_context(
        invite=invite,
        user=user,
        recipient_email=recipient_email,
        message=message,
        reason=reason,
        overrides=overrides,
    )

    def _render(field: str, *, escape: bool) -> str:
        return _render_template_string(template[field], context, escape_html=escape)

    html_body = _render("body_html", escape=True)
    text_body = _render("body_text", escape=False)
    if html_body.strip() and not text_body.strip():
        # No usable text part: fall back to a tag-stripped copy of the HTML.
        text_body = _strip_html_for_text(html_body)
    subject_line = _render("subject", escape=False)

    return {
        "subject": subject_line.strip(),
        "body_text": text_body.strip(),
        "body_html": html_body.strip(),
    }
|
|
|
|
|
|
def resolve_user_delivery_email(user: Optional[Dict[str, Any]], invite: Optional[Dict[str, Any]] = None) -> Optional[str]:
    """Pick the address an email for *user* should be delivered to.

    A username that is itself a valid email address takes precedence;
    otherwise the invite's ``recipient_email`` is used when valid.

    Returns:
        The chosen address, or ``None`` when neither source is usable.
    """
    if isinstance(user, dict):
        from_username = _normalize_email(user.get("username"))
        if from_username:
            return from_username
    invite_address = invite.get("recipient_email") if isinstance(invite, dict) else None
    return _normalize_email(invite_address)
|
|
|
|
|
|
def smtp_email_config_ready() -> tuple[bool, str]:
    """Report whether the runtime settings allow sending email.

    Returns:
        ``(True, "ok")`` when sending is possible, otherwise ``(False, why)``
        where *why* names the first failing requirement.
    """
    runtime = get_runtime_settings()
    problem: Optional[str] = None
    # Checks run in order and stop at the first failure.
    if not runtime.magent_notify_enabled:
        problem = "Notifications are disabled."
    elif not runtime.magent_notify_email_enabled:
        problem = "Email notifications are disabled."
    elif not _normalize_display_text(runtime.magent_notify_email_smtp_host):
        problem = "SMTP host is not configured."
    elif not _normalize_email(runtime.magent_notify_email_from_address):
        problem = "From email address is not configured."
    return (True, "ok") if problem is None else (False, problem)
|
|
|
|
|
|
def smtp_email_delivery_warning() -> Optional[str]:
    """Return an advisory text for a known-unreliable SMTP setup, if any.

    The only case detected is a host ending in
    ``.mail.protection.outlook.com`` used without both a username and a
    password; every other configuration yields ``None``.
    """
    runtime = get_runtime_settings()
    smtp_host = _normalize_display_text(runtime.magent_notify_email_smtp_host).lower()
    smtp_user = _normalize_display_text(runtime.magent_notify_email_smtp_username)
    smtp_password = _normalize_display_text(runtime.magent_notify_email_smtp_password)

    has_credentials = bool(smtp_user and smtp_password)
    if has_credentials or not smtp_host.endswith(".mail.protection.outlook.com"):
        return None
    return (
        "Unauthenticated Microsoft 365 relay mode is configured. SMTP acceptance does not "
        "confirm mailbox delivery. For reliable delivery, use smtp.office365.com:587 with "
        "SMTP credentials or configure a verified Exchange relay connector."
    )
|
|
|
|
|
|
def _single_line_header(value: str) -> str:
    """Collapse embedded line breaks in *value* into single spaces.

    ``EmailMessage`` rejects header values that span several lines, so a
    subject rendered from a multi-line placeholder would otherwise abort the
    send with ``ValueError``.  Values without interior line breaks are
    returned unchanged.
    """
    if not isinstance(value, str):
        return value
    segments = value.splitlines()
    if len(segments) <= 1:
        return value
    return " ".join(segment.strip() for segment in segments if segment.strip())


def _send_email_sync(*, recipient_email: str, subject: str, body_text: str, body_html: str) -> None:
    """Send one email over SMTP using the runtime notification settings.

    Blocking; the async callers in this module run it via
    ``asyncio.to_thread``.

    Args:
        recipient_email: Address placed in the ``To`` header.
        subject: Subject line; interior line breaks are collapsed to spaces.
        body_text: Plain-text part.  When empty, a tag-stripped copy of
            *body_html* is used instead.
        body_html: HTML part, attached as an alternative when non-blank.

    Raises:
        RuntimeError: If the SMTP host or the from address is not configured.

    Errors raised by ``smtplib`` while connecting, authenticating or sending
    are not caught here and propagate to the caller.
    """
    runtime = get_runtime_settings()
    host = _normalize_display_text(runtime.magent_notify_email_smtp_host)
    port = int(runtime.magent_notify_email_smtp_port or 587)
    username = _normalize_display_text(runtime.magent_notify_email_smtp_username)
    password = _normalize_display_text(runtime.magent_notify_email_smtp_password)
    from_address = _normalize_email(runtime.magent_notify_email_from_address)
    from_name = _normalize_display_text(runtime.magent_notify_email_from_name, env_settings.app_name)
    use_tls = bool(runtime.magent_notify_email_use_tls)
    use_ssl = bool(runtime.magent_notify_email_use_ssl)
    delivery_warning = smtp_email_delivery_warning()
    if not host or not from_address:
        raise RuntimeError("SMTP email settings are incomplete.")

    # Header values must be a single line; sanitise before logging and sending.
    subject = _single_line_header(subject)

    logger.info(
        "smtp send started recipient=%s from=%s host=%s port=%s tls=%s ssl=%s auth=%s subject=%s",
        recipient_email,
        from_address,
        host,
        port,
        use_tls,
        use_ssl,
        bool(username and password),
        subject,
    )
    if delivery_warning:
        logger.warning("smtp delivery warning host=%s detail=%s", host, delivery_warning)

    message = EmailMessage()
    message["Subject"] = subject
    message["From"] = formataddr((from_name, from_address))
    message["To"] = recipient_email
    message.set_content(body_text or _strip_html_for_text(body_html))
    if body_html.strip():
        message.add_alternative(body_html, subtype="html")

    # NOTE(review): no explicit SSLContext is passed to SMTP_SSL or starttls()
    # below; confirm the library default performs the certificate checks this
    # deployment expects (ssl.create_default_context() is the usual choice).
    if use_ssl:
        # Implicit TLS: the connection is encrypted from the first byte.
        with smtplib.SMTP_SSL(host, port, timeout=20) as smtp:
            logger.debug("smtp ssl connection opened host=%s port=%s", host, port)
            if username and password:
                smtp.login(username, password)
                logger.debug("smtp login succeeded host=%s username=%s", host, username)
            smtp.send_message(message)
            logger.info("smtp send accepted recipient=%s host=%s mode=ssl", recipient_email, host)
            return

    with smtplib.SMTP(host, port, timeout=20) as smtp:
        logger.debug("smtp connection opened host=%s port=%s", host, port)
        smtp.ehlo()
        if use_tls:
            smtp.starttls()
            # Re-identify after the upgrade, as the session state is reset.
            smtp.ehlo()
            logger.debug("smtp starttls negotiated host=%s port=%s", host, port)
        # Authentication is attempted only when both credentials are present.
        if username and password:
            smtp.login(username, password)
            logger.debug("smtp login succeeded host=%s username=%s", host, username)
        smtp.send_message(message)
        logger.info("smtp send accepted recipient=%s host=%s mode=plain", recipient_email, host)
|
|
|
|
|
|
async def send_templated_email(
    template_key: str,
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Render *template_key* and send it to the resolved recipient.

    The recipient is *recipient_email* when valid, otherwise the address
    resolved from *user* and *invite*.  The blocking SMTP call runs in a
    worker thread.

    Returns:
        Dict with the ``recipient_email`` used and the rendered ``subject``.

    Raises:
        RuntimeError: If email sending is not configured or no valid
            recipient address can be determined.
        ValueError: If *template_key* is unknown (raised while rendering).
    """
    config_ok, config_detail = smtp_email_config_ready()
    if not config_ok:
        raise RuntimeError(config_detail)

    target = _normalize_email(recipient_email) or resolve_user_delivery_email(user, invite)
    if not target:
        raise RuntimeError("No valid recipient email is available for this action.")

    rendered = render_invite_email_template(
        template_key,
        invite=invite,
        user=user,
        recipient_email=target,
        message=message,
        reason=reason,
        overrides=overrides,
    )
    delivery = {
        "recipient_email": target,
        "subject": rendered["subject"],
        "body_text": rendered["body_text"],
        "body_html": rendered["body_html"],
    }
    await asyncio.to_thread(_send_email_sync, **delivery)
    logger.info("Email template sent: template=%s recipient=%s", template_key, target)
    return {"recipient_email": target, "subject": rendered["subject"]}
|
|
|
|
|
|
async def send_test_email(recipient_email: Optional[str] = None) -> Dict[str, str]:
    """Send a fixed diagnostic email to verify the SMTP configuration.

    The message goes to *recipient_email* when valid, otherwise to the
    configured from address.

    Returns:
        Dict with ``recipient_email`` and ``subject``, plus ``warning`` when
        ``smtp_email_delivery_warning`` reports one.

    Raises:
        RuntimeError: If email sending is not configured or no valid
            recipient address is available.
    """
    config_ok, config_detail = smtp_email_config_ready()
    if not config_ok:
        raise RuntimeError(config_detail)

    runtime = get_runtime_settings()
    target = _normalize_email(recipient_email) or _normalize_email(
        runtime.magent_notify_email_from_address
    )
    if not target:
        raise RuntimeError("No valid recipient email is configured for the test message.")

    application_url = _normalize_display_text(runtime.magent_application_url, "Not configured")
    app_name = env_settings.app_name
    subject = f"{app_name} email test"

    text_lines = (
        f"This is a test email from {app_name}.\n\n",
        f"Build: {BUILD_NUMBER}\n",
        f"Application URL: {application_url}\n",
    )
    body_text = "".join(text_lines)

    # Values are escaped individually; the surrounding markup is trusted.
    safe_name = html.escape(app_name)
    html_parts = (
        f"<h1>{safe_name} email test</h1>",
        f"<p>This is a test email from <strong>{safe_name}</strong>.</p>",
        f"<p><strong>Build:</strong> {html.escape(BUILD_NUMBER)}<br />",
        f"<strong>Application URL:</strong> {html.escape(application_url)}</p>",
    )
    body_html = "".join(html_parts)

    await asyncio.to_thread(
        _send_email_sync,
        recipient_email=target,
        subject=subject,
        body_text=body_text,
        body_html=body_html,
    )
    logger.info("SMTP test email sent: recipient=%s", target)

    outcome = {"recipient_email": target, "subject": subject}
    advisory = smtp_email_delivery_warning()
    if advisory:
        outcome["warning"] = advisory
    return outcome
|