1373 lines
54 KiB
Python
1373 lines
54 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import html
|
|
import json
|
|
import logging
|
|
import re
|
|
import smtplib
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
from email.generator import BytesGenerator
|
|
from email.message import EmailMessage
|
|
from email.policy import SMTP as SMTP_POLICY
|
|
from email.utils import formataddr, formatdate, make_msgid
|
|
from io import BytesIO
|
|
from typing import Any, Dict, Optional
|
|
from urllib.parse import urlparse
|
|
|
|
from ..build_info import BUILD_NUMBER
|
|
from ..config import settings as env_settings
|
|
from ..db import delete_setting, get_setting, set_setting
|
|
from ..runtime import get_runtime_settings
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# A stored template's settings key is this prefix followed by the template key.
TEMPLATE_SETTING_PREFIX = "invite_email_template_"
# The closed set of template identifiers this module knows about.
TEMPLATE_KEYS = ("invited", "welcome", "warning", "banned")
# Loose shape check: exactly one "@", no whitespace, and a dot in the domain part.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Matches ``{{ name }}`` placeholders; group 1 is the placeholder name.
PLACEHOLDER_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_]+)\s*}}")
# Captures whatever sits between a pair of angle brackets.
EXCHANGE_MESSAGE_ID_PATTERN = re.compile(r"<([^>]+)>")
# Captures the value after ``[InternalId=`` up to the next "]" or ",".
EXCHANGE_INTERNAL_ID_PATTERN = re.compile(r"\[InternalId=([^\],]+)")
# Content-ID the HTML bodies use (as a ``cid:`` URL) to reference the logo image.
EMAIL_LOGO_CID = "magent-logo"
|
|
|
|
# Human-readable label and description for each template key.
TEMPLATE_METADATA: Dict[str, Dict[str, Any]] = {
    "invited": {
        "label": "You have been invited",
        "description": "Sent when an invite link is created and emailed to a recipient.",
    },
    "welcome": {
        "label": "Welcome / How it works",
        "description": "Sent after an invited user completes signup.",
    },
    "warning": {
        "label": "Warning",
        "description": "Manual warning template for account or behavior notices.",
    },
    "banned": {
        "label": "Banned",
        "description": "Sent when an account is banned or removed.",
    },
}
|
|
|
|
# Every placeholder name a template may reference as ``{{name}}``.
# The sanitized render context is built from exactly these keys.
TEMPLATE_PLACEHOLDERS = [
    # Application-wide values.
    "app_name",
    "app_url",
    "build_number",
    "how_it_works_url",
    # Invite details.
    "invite_code",
    "invite_description",
    "invite_expires_at",
    "invite_label",
    "invite_link",
    "invite_remaining_uses",
    "inviter_username",
    # Free-form notes.
    "message",
    "reason",
    # Recipient / account details.
    "recipient_email",
    "role",
    "username",
]
|
|
|
|
# Short product tagline shown in the wrapped email layout.
EMAIL_TAGLINE = "Find and fix media requests fast."

# Colour sets per tone: "chip_*" style the tagline chip, "accent_*" the gradient bar.
EMAIL_TONE_STYLES: Dict[str, Dict[str, str]] = {
    # Orange/blue house style; also the fallback for unknown tones.
    "brand": {
        "chip_bg": "rgba(255, 107, 43, 0.16)",
        "chip_border": "rgba(255, 107, 43, 0.38)",
        "chip_text": "#ffd2bf",
        "accent_a": "#ff6b2b",
        "accent_b": "#1c6bff",
    },
    # Green.
    "success": {
        "chip_bg": "rgba(34, 197, 94, 0.16)",
        "chip_border": "rgba(34, 197, 94, 0.38)",
        "chip_text": "#c7f9d7",
        "accent_a": "#22c55e",
        "accent_b": "#1c6bff",
    },
    # Amber.
    "warning": {
        "chip_bg": "rgba(251, 146, 60, 0.16)",
        "chip_border": "rgba(251, 146, 60, 0.38)",
        "chip_text": "#ffe0ba",
        "accent_a": "#fb923c",
        "accent_b": "#ff6b2b",
    },
    # Red.
    "danger": {
        "chip_bg": "rgba(248, 113, 113, 0.16)",
        "chip_border": "rgba(248, 113, 113, 0.38)",
        "chip_text": "#ffd0d0",
        "accent_a": "#ef4444",
        "accent_b": "#ff6b2b",
    },
}
|
|
|
|
# Layout defaults per template: tone, heading text and the two action buttons.
# "*_url_key" entries name the render-context value that supplies the button URL;
# an empty label/key pair means that button is not shown.
TEMPLATE_PRESENTATION: Dict[str, Dict[str, str]] = {
    "invited": {
        "tone": "brand",
        "title": "You have been invited",
        "subtitle": "A new account invitation is ready for you.",
        "primary_label": "Accept invite",
        "primary_url_key": "invite_link",
        "secondary_label": "How it works",
        "secondary_url_key": "how_it_works_url",
    },
    "welcome": {
        "tone": "success",
        "title": "Welcome to Magent",
        "subtitle": "Your account is ready and synced.",
        "primary_label": "Open Magent",
        "primary_url_key": "app_url",
        "secondary_label": "How it works",
        "secondary_url_key": "how_it_works_url",
    },
    "warning": {
        "tone": "warning",
        "title": "Account warning",
        "subtitle": "Please review the note below.",
        "primary_label": "Open Magent",
        "primary_url_key": "app_url",
        "secondary_label": "How it works",
        "secondary_url_key": "how_it_works_url",
    },
    "banned": {
        "tone": "danger",
        "title": "Account status changed",
        "subtitle": "Your account has been restricted or removed.",
        "primary_label": "How it works",
        "primary_url_key": "how_it_works_url",
        "secondary_label": "",
        "secondary_url_key": "",
    },
}
|
|
|
|
|
|
def _build_email_stat_card(label: str, value: str, detail: str = "") -> str:
|
|
detail_html = (
|
|
f"<div style=\"margin-top:8px; font-size:13px; line-height:1.6; color:#5c687d; word-break:break-word;\">"
|
|
f"{html.escape(detail)}</div>"
|
|
if detail
|
|
else ""
|
|
)
|
|
return (
|
|
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
|
|
"style=\"border-collapse:separate; background:#f8fafc; border:1px solid #d9e2ef; border-radius:16px;\">"
|
|
"<tr><td style=\"padding:16px; font-family:Segoe UI, Arial, sans-serif; color:#132033;\">"
|
|
f"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#6b778c; margin-bottom:8px;\">"
|
|
f"{html.escape(label)}</div>"
|
|
f"<div style=\"font-size:20px; font-weight:800; line-height:1.45; word-break:break-word; color:#132033;\">"
|
|
f"{html.escape(value)}</div>"
|
|
f"{detail_html}"
|
|
"</td></tr></table>"
|
|
)
|
|
|
|
|
|
def _build_email_stat_grid(cards: list[str]) -> str:
|
|
if not cards:
|
|
return ""
|
|
rows: list[str] = []
|
|
for index in range(0, len(cards), 2):
|
|
left = cards[index]
|
|
right = cards[index + 1] if index + 1 < len(cards) else ""
|
|
rows.append(
|
|
"<tr>"
|
|
f"<td width=\"50%\" style=\"vertical-align:top; padding:0 5px 10px 0;\">{left}</td>"
|
|
f"<td width=\"50%\" style=\"vertical-align:top; padding:0 0 10px 5px;\">{right}</td>"
|
|
"</tr>"
|
|
)
|
|
return (
|
|
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
|
|
"style=\"border-collapse:collapse; margin:0 0 18px;\">"
|
|
f"{''.join(rows)}"
|
|
"</table>"
|
|
)
|
|
|
|
|
|
def _build_email_list(items: list[str], *, ordered: bool = False) -> str:
|
|
tag = "ol" if ordered else "ul"
|
|
marker = "padding-left:20px;" if ordered else "padding-left:18px;"
|
|
rendered_items = "".join(
|
|
f"<li style=\"margin:0 0 8px;\">{html.escape(item)}</li>" for item in items if item
|
|
)
|
|
return (
|
|
f"<{tag} style=\"margin:0; {marker} color:#132033; line-height:1.8; font-size:14px;\">"
|
|
f"{rendered_items}"
|
|
f"</{tag}>"
|
|
)
|
|
|
|
|
|
def _build_email_panel(title: str, body_html: str, *, variant: str = "neutral") -> str:
|
|
styles = {
|
|
"neutral": {
|
|
"background": "#f8fafc",
|
|
"border": "#d9e2ef",
|
|
"eyebrow": "#6b778c",
|
|
"text": "#132033",
|
|
},
|
|
"brand": {
|
|
"background": "#eef4ff",
|
|
"border": "#bfd2ff",
|
|
"eyebrow": "#2754b6",
|
|
"text": "#132033",
|
|
},
|
|
"success": {
|
|
"background": "#edf9f0",
|
|
"border": "#bfe4c6",
|
|
"eyebrow": "#1f7a3f",
|
|
"text": "#132033",
|
|
},
|
|
"warning": {
|
|
"background": "#fff5ea",
|
|
"border": "#ffd5a8",
|
|
"eyebrow": "#c46a10",
|
|
"text": "#132033",
|
|
},
|
|
"danger": {
|
|
"background": "#fff0f0",
|
|
"border": "#f3c1c1",
|
|
"eyebrow": "#bb2d2d",
|
|
"text": "#132033",
|
|
},
|
|
}.get(variant, {
|
|
"background": "#f8fafc",
|
|
"border": "#d9e2ef",
|
|
"eyebrow": "#6b778c",
|
|
"text": "#132033",
|
|
})
|
|
return (
|
|
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
|
|
f"style=\"border-collapse:separate; margin:0 0 18px; background:{styles['background']}; "
|
|
f"border:1px solid {styles['border']}; border-radius:18px;\">"
|
|
f"<tr><td style=\"padding:18px; font-family:Segoe UI, Arial, sans-serif; color:{styles['text']};\">"
|
|
f"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:{styles['eyebrow']}; margin-bottom:10px;\">"
|
|
f"{html.escape(title)}</div>"
|
|
f"<div style=\"font-size:14px; line-height:1.8; color:{styles['text']};\">{body_html}</div>"
|
|
"</td></tr></table>"
|
|
)
|
|
|
|
|
|
# Built-in subject / plain-text / HTML bodies for every template key.
# The HTML bodies are assembled once, at import time, from the card, grid,
# panel and list builders; ``{{name}}`` placeholders pass through those
# builders unchanged and are substituted later, when a template is rendered.
DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
    # --- Invitation email -------------------------------------------------
    "invited": {
        "subject": "{{app_name}} invite for {{recipient_email}}",
        "body_text": (
            "You have been invited to {{app_name}}.\n\n"
            "Invite code: {{invite_code}}\n"
            "Signup link: {{invite_link}}\n"
            "Invited by: {{inviter_username}}\n"
            "Invite label: {{invite_label}}\n"
            "Expires: {{invite_expires_at}}\n"
            "Remaining uses: {{invite_remaining_uses}}\n\n"
            "{{invite_description}}\n\n"
            "{{message}}\n\n"
            "How it works: {{how_it_works_url}}\n"
            "Build: {{build_number}}\n"
        ),
        "body_html": "".join(
            (
                '<div style="margin:0 0 20px; color:#132033; font-size:15px; line-height:1.7;">'
                "A new invitation has been prepared for <strong>{{recipient_email}}</strong>. Use the details below to sign up."
                "</div>",
                _build_email_stat_grid(
                    [
                        _build_email_stat_card("Invite code", "{{invite_code}}"),
                        _build_email_stat_card("Invited by", "{{inviter_username}}"),
                        _build_email_stat_card("Invite label", "{{invite_label}}"),
                        _build_email_stat_card(
                            "Access window",
                            "{{invite_expires_at}}",
                            "Remaining uses: {{invite_remaining_uses}}",
                        ),
                    ]
                ),
                _build_email_panel(
                    "Invitation details",
                    '<div style="white-space:pre-line;">{{invite_description}}</div>',
                    variant="brand",
                ),
                _build_email_panel(
                    "Message from admin",
                    '<div style="white-space:pre-line;">{{message}}</div>',
                    variant="neutral",
                ),
                _build_email_panel(
                    "What happens next",
                    _build_email_list(
                        [
                            "Open the invite link and complete the signup flow.",
                            "Sign in using the shared credentials for Magent and Seerr.",
                            "Use the How it works page if you want a quick overview first.",
                        ],
                        ordered=True,
                    ),
                    variant="neutral",
                ),
            )
        ),
    },
    # --- Welcome email ----------------------------------------------------
    "welcome": {
        "subject": "Welcome to {{app_name}}",
        "body_text": (
            "Welcome to {{app_name}}, {{username}}.\n\n"
            "Your account is ready.\n"
            "Open: {{app_url}}\n"
            "How it works: {{how_it_works_url}}\n"
            "Role: {{role}}\n\n"
            "{{message}}\n"
        ),
        "body_html": "".join(
            (
                '<div style="margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;">'
                "Your account is live and ready to use. Everything below mirrors the current site behavior."
                "</div>",
                _build_email_stat_grid(
                    [
                        _build_email_stat_card("Username", "{{username}}"),
                        _build_email_stat_card("Role", "{{role}}"),
                        _build_email_stat_card("Magent", "{{app_url}}"),
                        _build_email_stat_card("Guides", "{{how_it_works_url}}"),
                    ]
                ),
                _build_email_panel(
                    "What to do next",
                    _build_email_list(
                        [
                            "Open Magent and sign in using your shared credentials.",
                            "Search all requests or review your own activity without refreshing the page.",
                            "Use the invite tools in your profile if your account allows it.",
                        ],
                        ordered=True,
                    ),
                    variant="success",
                ),
                _build_email_panel(
                    "Additional notes",
                    '<div style="white-space:pre-line;">{{message}}</div>',
                    variant="neutral",
                ),
            )
        ),
    },
    # --- Warning email ----------------------------------------------------
    "warning": {
        "subject": "{{app_name}} account warning",
        "body_text": (
            "Hello {{username}},\n\n"
            "This is a warning regarding your {{app_name}} account.\n\n"
            "Reason: {{reason}}\n\n"
            "{{message}}\n\n"
            "If you need help, contact the admin.\n"
        ),
        "body_html": "".join(
            (
                '<div style="margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;">'
                "Please review this account notice carefully. This message was sent by an administrator."
                "</div>",
                _build_email_stat_grid(
                    [
                        _build_email_stat_card("Account", "{{username}}"),
                        _build_email_stat_card("Role", "{{role}}"),
                        _build_email_stat_card("Application", "{{app_name}}"),
                        _build_email_stat_card("Support", "{{how_it_works_url}}"),
                    ]
                ),
                _build_email_panel(
                    "Reason",
                    '<div style="font-size:18px; font-weight:800; line-height:1.6; white-space:pre-line;">{{reason}}</div>',
                    variant="warning",
                ),
                _build_email_panel(
                    "Administrator note",
                    '<div style="white-space:pre-line;">{{message}}</div>',
                    variant="neutral",
                ),
                _build_email_panel(
                    "What to do next",
                    _build_email_list(
                        [
                            "Review the note above and confirm you understand what needs to change.",
                            "If you need help, reply through your usual support path or contact an administrator.",
                            "Keep this email for reference until the matter is resolved.",
                        ]
                    ),
                    variant="neutral",
                ),
            )
        ),
    },
    # --- Ban / removal email ----------------------------------------------
    "banned": {
        "subject": "{{app_name}} account status changed",
        "body_text": (
            "Hello {{username}},\n\n"
            "Your {{app_name}} account has been banned or removed.\n\n"
            "Reason: {{reason}}\n\n"
            "{{message}}\n"
        ),
        "body_html": "".join(
            (
                '<div style="margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;">'
                "Your account access has changed. Review the details below."
                "</div>",
                _build_email_stat_grid(
                    [
                        _build_email_stat_card("Account", "{{username}}"),
                        _build_email_stat_card("Status", "Restricted"),
                        _build_email_stat_card("Application", "{{app_name}}"),
                        _build_email_stat_card("Guidance", "{{how_it_works_url}}"),
                    ]
                ),
                _build_email_panel(
                    "Reason",
                    '<div style="font-size:18px; font-weight:800; line-height:1.6; white-space:pre-line;">{{reason}}</div>',
                    variant="danger",
                ),
                _build_email_panel(
                    "Administrator note",
                    '<div style="white-space:pre-line;">{{message}}</div>',
                    variant="neutral",
                ),
                _build_email_panel(
                    "What this means",
                    _build_email_list(
                        [
                            "Your access has been removed or restricted across the linked services.",
                            "If you believe this is incorrect, contact the site administrator directly.",
                            "Do not rely on old links or cached sessions after this change.",
                        ]
                    ),
                    variant="neutral",
                ),
            )
        ),
    },
}
|
|
|
|
|
|
def _template_setting_key(template_key: str) -> str:
    """Return the settings-store key under which ``template_key`` is saved."""
    return "{}{}".format(TEMPLATE_SETTING_PREFIX, template_key)
|
|
|
|
|
|
def _is_valid_email(value: object) -> bool:
    """Tell whether ``value`` is a string that looks like an email address.

    Surrounding whitespace is ignored. Non-strings and blank strings are
    rejected; everything else is checked against ``EMAIL_PATTERN``.
    """
    if isinstance(value, str):
        trimmed = value.strip()
        return bool(trimmed) and EMAIL_PATTERN.match(trimmed) is not None
    return False
|
|
|
|
|
|
def _normalize_email(value: object) -> Optional[str]:
    """Return ``value`` stripped of surrounding whitespace, or ``None`` if it is not a valid email."""
    return str(value).strip() if _is_valid_email(value) else None
|
|
|
|
|
|
def normalize_delivery_email(value: object) -> Optional[str]:
    """Public entry point for email normalization.

    Returns the trimmed address when ``value`` is a valid email string and
    ``None`` otherwise.
    """
    normalized = _normalize_email(value)
    return normalized
|
|
|
|
|
|
def _normalize_display_text(value: object, fallback: str = "") -> str:
|
|
if value is None:
|
|
return fallback
|
|
if isinstance(value, str):
|
|
trimmed = value.strip()
|
|
return trimmed if trimmed else fallback
|
|
return str(value)
|
|
|
|
|
|
def _template_context_value(value: object, fallback: str = "") -> str:
|
|
if value is None:
|
|
return fallback
|
|
if isinstance(value, str):
|
|
return value.strip()
|
|
return str(value)
|
|
|
|
|
|
def _safe_template_context(context: Dict[str, object]) -> Dict[str, str]:
    """Reduce ``context`` to the known placeholders, with every value as a string.

    Keys outside ``TEMPLATE_PLACEHOLDERS`` are dropped; placeholders missing
    from ``context`` (or set to ``None``) map to an empty string.
    """
    return {
        placeholder: _template_context_value(context.get(placeholder), "")
        for placeholder in TEMPLATE_PLACEHOLDERS
    }
|
|
|
|
|
|
def _render_template_string(template: str, context: Dict[str, str], *, escape_html: bool = False) -> str:
    """Substitute ``{{name}}`` placeholders in ``template`` from ``context``.

    Placeholders with no entry in ``context`` are replaced by an empty
    string. With ``escape_html`` set, each substituted value is HTML-escaped
    (the surrounding template text is left alone). A non-string ``template``
    renders as an empty string.
    """
    if not isinstance(template, str):
        return ""

    def _lookup(found: re.Match[str]) -> str:
        return context.get(found.group(1), "")

    def _lookup_escaped(found: re.Match[str]) -> str:
        return html.escape(_lookup(found))

    substitute = _lookup_escaped if escape_html else _lookup
    return PLACEHOLDER_PATTERN.sub(substitute, template)
|
|
|
|
|
|
def _strip_html_for_text(value: str) -> str:
|
|
text = re.sub(r"<br\s*/?>", "\n", value, flags=re.IGNORECASE)
|
|
text = re.sub(r"</p>", "\n\n", text, flags=re.IGNORECASE)
|
|
text = re.sub(r"<[^>]+>", "", text)
|
|
return html.unescape(text).strip()
|
|
|
|
|
|
def _build_default_base_url() -> str:
    """Pick the base URL used when building links in emails.

    The first non-blank value among the runtime application URL, the runtime
    proxy base URL and the configured CORS origin wins, with trailing slashes
    removed. When none is set, falls back to ``http://localhost:<port>`` using
    the runtime application port (3000 when missing or falsy).
    """
    runtime = get_runtime_settings()
    configured = (
        runtime.magent_application_url,
        runtime.magent_proxy_base_url,
        env_settings.cors_allow_origin,
    )
    cleaned = (_normalize_display_text(raw) for raw in configured)
    chosen = next((entry for entry in cleaned if entry), None)
    if chosen is not None:
        return chosen.rstrip("/")

    port = int(getattr(runtime, "magent_application_port", 3000) or 3000)
    return f"http://localhost:{port}"
|
|
|
|
|
|
def _derive_mail_hostname(*, from_address: str) -> str:
    """Work out a dotted hostname to associate with outgoing mail.

    Candidates are tried in order: the runtime application URL, the runtime
    proxy base URL and the configured CORS origin. The first one whose parsed
    hostname contains a dot is returned. Values without a scheme are parsed
    as if prefixed with ``https://``. If no candidate qualifies, the domain
    part of ``from_address`` is used when it contains a dot; otherwise the
    result is ``"localhost"``.

    A candidate that ``urlparse`` rejects (it raises ``ValueError`` for a
    malformed netloc such as an unmatched IPv6 bracket) is skipped rather
    than aborting the lookup.
    """
    runtime = get_runtime_settings()
    candidates = (
        runtime.magent_application_url,
        runtime.magent_proxy_base_url,
        env_settings.cors_allow_origin,
    )
    for candidate in candidates:
        normalized = _normalize_display_text(candidate)
        if not normalized:
            continue
        try:
            parsed = urlparse(normalized if "://" in normalized else f"https://{normalized}")
            raw_hostname = parsed.hostname
        except ValueError:
            # One unusable setting must not stop the remaining fallbacks.
            continue
        hostname = _normalize_display_text(raw_hostname)
        if hostname and "." in hostname:
            return hostname

    domain = _normalize_display_text(from_address.split("@", 1)[1] if "@" in from_address else None)
    if domain and "." in domain:
        return domain
    return "localhost"
|
|
|
|
|
|
def _add_transactional_headers(
    message: EmailMessage,
    *,
    from_name: str,
    from_address: str,
) -> None:
    """Set the standard headers for automated mail on ``message`` (in place).

    Adds ``Reply-To`` (formatted from ``from_name`` / ``from_address``),
    ``Organization``, ``X-Mailer`` (app name and build number) and the two
    headers that mark the message as auto-generated.
    """
    app_name = env_settings.app_name
    headers = (
        ("Reply-To", formataddr((from_name, from_address))),
        ("Organization", app_name),
        ("X-Mailer", f"{app_name}/{BUILD_NUMBER}"),
        ("Auto-Submitted", "auto-generated"),
        ("X-Auto-Response-Suppress", "All"),
    )
    for header_name, header_value in headers:
        message[header_name] = header_value
|
|
|
|
|
|
def _looks_like_full_html_document(value: str) -> bool:
|
|
probe = value.lstrip().lower()
|
|
return probe.startswith("<!doctype") or probe.startswith("<html") or "<body" in probe[:300]
|
|
|
|
|
|
def _build_email_action_button(label: str, url: str, *, primary: bool) -> str:
|
|
background = "linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%)" if primary else "#ffffff"
|
|
fallback = "#1c6bff" if primary else "#ffffff"
|
|
border = "1px solid rgba(28, 107, 255, 0.28)" if primary else "1px solid #d5deed"
|
|
color = "#ffffff" if primary else "#132033"
|
|
return (
|
|
f"<a href=\"{html.escape(url)}\" "
|
|
f"style=\"display:inline-block; padding:12px 20px; margin:0 12px 12px 0; border-radius:999px; "
|
|
f"background-color:{fallback}; background:{background}; border:{border}; color:{color}; text-decoration:none; font-size:14px; "
|
|
f"font-weight:800; letter-spacing:0.01em;\">{html.escape(label)}</a>"
|
|
)
|
|
|
|
|
|
@lru_cache(maxsize=1)
|
|
def _get_email_logo_bytes() -> bytes:
|
|
logo_path = Path(__file__).resolve().parents[1] / "assets" / "branding" / "logo.png"
|
|
try:
|
|
return logo_path.read_bytes()
|
|
except OSError:
|
|
return b""
|
|
|
|
|
|
def _build_email_logo_block(app_name: str) -> str:
    """Return the 52x52 logo markup for the email header.

    When logo bytes are available this is an ``<img>`` pointing at the inline
    ``cid:`` attachment, with the escaped ``app_name`` as alt text. Otherwise
    it is a gradient tile showing the letter "M".
    """
    if not _get_email_logo_bytes():
        # No image to embed: draw a lettered placeholder instead.
        return (
            '<div style="width:52px; height:52px; border-radius:14px; border:1px solid rgba(255,255,255,0.08); '
            "background:linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%); color:#ffffff; font-size:24px; "
            'font-weight:900; text-align:center; line-height:52px;">M</div>'
        )

    alt_text = html.escape(app_name)
    return (
        f'<img src="cid:{EMAIL_LOGO_CID}" alt="{alt_text}" width="52" height="52" '
        'style="display:block; width:52px; height:52px; border-radius:14px; border:1px solid rgba(255,255,255,0.08); '
        'background:#0f1522; padding:6px;" />'
    )
|
|
|
|
|
|
def _build_outlook_safe_test_email_html(
    *,
    app_name: str,
    application_url: str,
    build_number: str,
    smtp_target: str,
    security_mode: str,
    auth_mode: str,
    warning: str,
    primary_url: str = "",
) -> str:
    """Build the complete HTML document for the SMTP test email.

    The layout is table-only with solid colours. It shows a header with the
    logo and ``app_name``, four stat cards (build, application URL, SMTP
    target, security mode with ``auth_mode`` as detail), a "What this
    verifies" panel, an optional "Delivery notes" panel when ``warning`` is
    non-empty, and an "Open Magent" button when ``primary_url`` is given.
    ``app_name`` and ``warning`` are HTML-escaped here; the card values are
    escaped by the card builder.
    """
    logo_block = _build_email_logo_block(app_name)
    action_html = ""
    if primary_url:
        action_html = _build_email_action_button("Open Magent", primary_url, primary=True)

    warning_block = ""
    if warning:
        warning_block = (
            "<tr>"
            '<td style="padding:0 32px 24px 32px; font-family:Segoe UI, Arial, sans-serif;">'
            '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" '
            'style="border-collapse:separate; background:#fff5ea; border:1px solid #ffd5a8; border-radius:14px;">'
            '<tr><td style="padding:16px; color:#132033; font-size:14px; line-height:1.7;">'
            '<div style="font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#c46a10; margin-bottom:8px;">'
            "Delivery notes</div>"
            + html.escape(warning)
            + "</td></tr></table>"
            "</td>"
            "</tr>"
        )

    # The four cards of the two-by-two stats grid.
    build_card = _build_email_stat_card("Build", build_number)
    url_card = _build_email_stat_card("Application URL", application_url)
    smtp_card = _build_email_stat_card("SMTP target", smtp_target)
    security_card = _build_email_stat_card("Security", security_mode, auth_mode)

    left_cell_open = '<td width="50%" valign="top" style="padding:0 8px 12px 0;">'
    right_cell_open = '<td width="50%" valign="top" style="padding:0 0 12px 8px;">'

    header_section = (
        "<!doctype html>"
        "<html>"
        '<body style="margin:0; padding:0; background:#eef2f7;" bgcolor="#eef2f7">'
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" '
        'style="border-collapse:collapse; background:#eef2f7;" bgcolor="#eef2f7">'
        '<tr><td align="center" style="padding:32px 16px;">'
        '<table role="presentation" width="680" cellspacing="0" cellpadding="0" '
        'style="width:680px; max-width:680px; border-collapse:collapse; background:#ffffff; border:1px solid #d5deed;">'
        '<tr><td style="padding:24px 32px; background:#0f172a;" bgcolor="#0f172a">'
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" style="border-collapse:collapse;">'
        "<tr>"
        f'<td width="56" valign="middle">{logo_block}</td>'
        '<td valign="middle" style="padding-left:16px; font-family:Segoe UI, Arial, sans-serif; color:#ffffff;">'
        f'<div style="font-size:28px; line-height:1.1; font-weight:800; color:#ffffff;">{html.escape(app_name)} email test</div>'
        '<div style="margin-top:6px; font-size:15px; line-height:1.5; color:#d5deed;">This confirms Magent can generate and hand off branded mail.</div>'
        "</td>"
        "</tr>"
        "</table>"
        "</td></tr>"
        # Thin orange divider row under the header.
        '<tr><td height="6" style="background:#ff6b2b; font-size:0; line-height:0;"> </td></tr>'
        '<tr><td style="padding:28px 32px 18px 32px; font-family:Segoe UI, Arial, sans-serif; color:#132033;">'
        '<div style="font-size:18px; line-height:1.6; color:#132033;">This is a test email from <strong>Magent</strong>.</div>'
        "</td></tr>"
    )

    stats_section = (
        "<tr>"
        '<td style="padding:0 32px 18px 32px; font-family:Segoe UI, Arial, sans-serif;">'
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" style="border-collapse:collapse;">'
        "<tr>"
        + left_cell_open
        + build_card
        + "</td>"
        + right_cell_open
        + url_card
        + "</td>"
        "</tr>"
        "<tr>"
        + left_cell_open
        + smtp_card
        + "</td>"
        + right_cell_open
        + security_card
        + "</td>"
        "</tr>"
        "</table>"
        "</td>"
        "</tr>"
    )

    verifies_section = (
        "<tr>"
        '<td style="padding:0 32px 24px 32px; font-family:Segoe UI, Arial, sans-serif;">'
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" '
        'style="border-collapse:separate; background:#eef4ff; border:1px solid #bfd2ff; border-radius:14px;">'
        '<tr><td style="padding:16px; color:#132033; font-size:14px; line-height:1.8;">'
        '<div style="font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#2754b6; margin-bottom:8px;">'
        "What this verifies</div>"
        "<div>Magent can build the HTML template shell correctly.</div>"
        "<div>The configured SMTP route accepts and relays the message.</div>"
        "<div>Branding, links, and build metadata are rendering consistently.</div>"
        "</td></tr></table>"
        "</td>"
        "</tr>"
    )

    footer_section = (
        "<tr>"
        '<td style="padding:0 32px 32px 32px; font-family:Segoe UI, Arial, sans-serif;">'
        + action_html
        + "</td>"
        "</tr>"
        "</table>"
        "</td></tr></table>"
        "</body>"
        "</html>"
    )

    return "".join(
        (header_section, stats_section, verifies_section, warning_block, footer_section)
    )
|
|
|
|
|
|
def _wrap_email_html(
    *,
    app_name: str,
    app_url: str,
    build_number: str,
    title: str,
    subtitle: str,
    tone: str,
    body_html: str,
    primary_label: str = "",
    primary_url: str = "",
    secondary_label: str = "",
    secondary_url: str = "",
    footer_note: str = "",
) -> str:
    """Wrap a rendered body fragment in the branded email document shell.

    The shell contributes a hidden preview line (title and subtitle), the
    header with logo, app name, title and subtitle (the tagline stands in for
    an empty subtitle), a tone-coloured accent bar and tagline chip, the
    body, up to two action buttons and a footer with the build number.

    ``tone`` selects an entry of ``EMAIL_TONE_STYLES``; unknown tones use
    "brand". A button is rendered only when both its label and its URL are
    non-empty. ``body_html`` is inserted verbatim; all other text arguments
    are HTML-escaped. ``app_url`` is accepted but not used by the layout.
    """
    palette = EMAIL_TONE_STYLES.get(tone, EMAIL_TONE_STYLES["brand"])
    accent_a = palette["accent_a"]
    accent_b = palette["accent_b"]
    chip_bg = palette["chip_bg"]
    chip_border = palette["chip_border"]
    chip_text = palette["chip_text"]

    button_specs = (
        (primary_label, primary_url, True),
        (secondary_label, secondary_url, False),
    )
    actions_html = "".join(
        _build_email_action_button(caption, target, primary=is_primary)
        for caption, target, is_primary in button_specs
        if caption and target
    )

    footer = footer_note if footer_note else "This email was generated automatically by Magent."
    logo_block = _build_email_logo_block(app_name)

    safe_app_name = html.escape(app_name)
    safe_title = html.escape(title)
    safe_subtitle = html.escape(subtitle)
    safe_heading_subtitle = html.escape(subtitle or EMAIL_TAGLINE)
    safe_tagline = html.escape(EMAIL_TAGLINE)
    safe_footer = html.escape(footer)
    safe_build = html.escape(build_number)

    inner_table_open = (
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" style="border-collapse:collapse;">'
    )

    sections = [
        # Document opening and hidden preview text.
        "<!doctype html>",
        '<html><body style="margin:0; padding:0; background:#eef2f7;" bgcolor="#eef2f7">',
        '<div style="display:none; max-height:0; overflow:hidden; opacity:0;">',
        f"{safe_title} - {safe_subtitle}",
        "</div>",
        # Outer background table and centred content column.
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" '
        'style="width:100%; border-collapse:collapse; background:#eef2f7;" bgcolor="#eef2f7">',
        '<tr><td style="padding:32px 18px;" bgcolor="#eef2f7">',
        '<table role="presentation" width="100%" cellspacing="0" cellpadding="0" '
        'style="max-width:680px; margin:0 auto; border-collapse:collapse;">',
        '<tr><td style="padding:0 0 18px;">',
        '<div style="padding:24px 28px; background:#ffffff; border:1px solid #d5deed; border-radius:28px; box-shadow:0 18px 48px rgba(15,23,42,0.08);">',
        # Header: logo plus app name / title / subtitle.
        inner_table_open,
        "<tr>",
        f'<td style="vertical-align:middle; width:64px; padding:0 18px 0 0;">{logo_block}</td>',
        '<td style="vertical-align:middle;">',
        f'<div style="font-size:11px; letter-spacing:0.18em; text-transform:uppercase; color:#6b778c; margin-bottom:6px;">{safe_app_name}</div>',
        f'<div style="font-size:30px; line-height:1.1; font-weight:900; color:#132033; margin:0 0 6px;">{safe_title}</div>',
        f'<div style="font-size:15px; line-height:1.6; color:#5c687d;">{safe_heading_subtitle}</div>',
        "</td>",
        "</tr>",
        "</table>",
        # Accent bar (solid colour declared first as a fallback) and tagline chip.
        f'<div style="height:6px; margin:22px 0 22px; border-radius:999px; background-color:{accent_b}; background:linear-gradient(90deg, {accent_a} 0%, {accent_b} 100%);"></div>',
        f'<div style="display:inline-block; padding:7px 12px; margin:0 0 16px; background:{chip_bg}; '
        f"border:1px solid {chip_border}; border-radius:999px; color:{chip_text}; "
        'font-size:11px; font-weight:800; letter-spacing:0.14em; text-transform:uppercase;">',
        f"{safe_tagline}</div>",
        # Body and action buttons.
        f'<div style="color:#132033;">{body_html}</div>',
        f'<div style="margin:24px 0 0;">{actions_html}</div>',
        # Footer with note and build number.
        '<div style="margin:28px 0 0; padding:18px 0 0; border-top:1px solid #e2e8f0;">',
        inner_table_open,
        "<tr>",
        f'<td style="font-size:12px; line-height:1.7; color:#6b778c;">{safe_footer}</td>',
        f'<td style="font-size:12px; line-height:1.7; color:#6b778c; text-align:right;">Build {safe_build}</td>',
        "</tr>",
        "</table>",
        "</div>",
        "</div>",
        "</td></tr></table>",
        "</td></tr></table>",
        "</body></html>",
    ]
    return "".join(sections)
|
|
|
|
|
|
def build_invite_email_context(
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Assemble the placeholder values used to render an email template.

    Args:
        invite: Invite record; the keys read here are ``code``,
            ``remaining_uses``, ``recipient_email``, ``description``,
            ``expires_at``, ``label`` and ``created_by``.
        user: User record; ``role`` and ``username`` are read, and the record
            is handed to ``resolve_user_delivery_email`` as a last resort for
            the recipient address.
        recipient_email: Explicit recipient; takes precedence over the
            invite's and the user's address when it is a valid email.
        message: Free-form note ("No additional note." when blank).
        reason: Reason text ("Not specified" when blank).
        overrides: Values applied on top of the computed context before it is
            sanitized. Ignored unless it is a dict.

    Returns:
        A dict holding exactly the keys in ``TEMPLATE_PLACEHOLDERS``, all with
        string values. Missing invite/user fields are replaced with readable
        defaults such as "Not set", "Never", "Unlimited" or "Admin".
    """
    # Function-scope import: the module itself only imports ``urlparse``.
    from urllib.parse import quote

    app_url = _build_default_base_url()
    invite_code = _normalize_display_text(invite.get("code") if invite else None, "Not set")
    if invite_code != "Not set":
        # Percent-encode the code so characters such as "&", "#", "+" or
        # spaces cannot truncate or alter the query string of the link.
        encoded_code = quote(invite_code, safe="")
        invite_link = f"{app_url}/signup?code={encoded_code}"
    else:
        invite_link = f"{app_url}/signup"
    remaining_uses = invite.get("remaining_uses") if invite else None

    # Recipient precedence: explicit argument, then the invite, then the user.
    resolved_recipient = _normalize_email(recipient_email)
    if not resolved_recipient and invite:
        resolved_recipient = _normalize_email(invite.get("recipient_email"))
    if not resolved_recipient and user:
        resolved_recipient = resolve_user_delivery_email(user)

    context: Dict[str, object] = {
        "app_name": env_settings.app_name,
        "app_url": app_url,
        "build_number": BUILD_NUMBER,
        "how_it_works_url": f"{app_url}/how-it-works",
        "invite_code": invite_code,
        "invite_description": _normalize_display_text(
            invite.get("description") if invite else None, "No extra details."
        ),
        "invite_expires_at": _normalize_display_text(invite.get("expires_at") if invite else None, "Never"),
        "invite_label": _normalize_display_text(invite.get("label") if invite else None, "No label"),
        "invite_link": invite_link,
        "invite_remaining_uses": (
            "Unlimited" if remaining_uses in (None, "") else _normalize_display_text(remaining_uses)
        ),
        # With an invite, its creator is the inviter; otherwise fall back to the user.
        "inviter_username": _normalize_display_text(
            invite.get("created_by") if invite else (user.get("username") if user else None),
            "Admin",
        ),
        "message": _normalize_display_text(message, "No additional note."),
        "reason": _normalize_display_text(reason, "Not specified"),
        "recipient_email": _normalize_display_text(resolved_recipient, "No email supplied"),
        "role": _normalize_display_text(user.get("role") if user else None, "user"),
        "username": _normalize_display_text(user.get("username") if user else None, "there"),
    }
    if isinstance(overrides, dict):
        context.update(overrides)
    return _safe_template_context(context)
|
|
|
|
|
|
def _load_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Build the record for one known template key.

    Starts from a copy of the built-in default and overlays any ``subject``,
    ``body_text`` or ``body_html`` string found in the stored setting for
    this key. A stored value that is not valid JSON, is not a JSON object, or
    whose fields are not strings is ignored (field by field).

    The caller must pass a key from ``TEMPLATE_KEYS``.
    """
    template = dict(DEFAULT_TEMPLATES[template_key])
    raw_value = get_setting(_template_setting_key(template_key))
    if raw_value:
        try:
            stored = json.loads(raw_value)
        except (TypeError, json.JSONDecodeError):
            stored = {}
        if isinstance(stored, dict):
            for field in ("subject", "body_text", "body_html"):
                if isinstance(stored.get(field), str):
                    template[field] = stored[field]
    metadata = TEMPLATE_METADATA[template_key]
    return {
        "key": template_key,
        "label": metadata["label"],
        "description": metadata["description"],
        # Copy, so a caller mutating the result cannot alter the module-level list.
        "placeholders": list(TEMPLATE_PLACEHOLDERS),
        **template,
    }


def get_invite_email_templates() -> Dict[str, Dict[str, Any]]:
    """Return every email template, keyed by template key.

    Each record contains ``key``, ``label``, ``description``,
    ``placeholders``, ``subject``, ``body_text`` and ``body_html``, with
    stored overrides applied on top of the built-in defaults.
    """
    return {
        template_key: _load_invite_email_template(template_key)
        for template_key in TEMPLATE_KEYS
    }


def get_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Return the record for a single template.

    Only the requested template's stored setting is read.

    Raises:
        ValueError: If ``template_key`` is not one of ``TEMPLATE_KEYS``.
    """
    if template_key not in TEMPLATE_KEYS:
        raise ValueError(f"Unknown email template: {template_key}")
    return _load_invite_email_template(template_key)
|
|
|
|
|
|
def save_invite_email_template(
    template_key: str,
    *,
    subject: str,
    body_text: str,
    body_html: str,
) -> Dict[str, Any]:
    """Store a customized template and return the resulting template record.

    The three fields are saved together as one JSON object under the
    template's settings key.

    Raises:
        ValueError: If ``template_key`` is not one of ``TEMPLATE_KEYS``.
    """
    if template_key not in TEMPLATE_KEYS:
        raise ValueError(f"Unknown email template: {template_key}")

    setting_key = _template_setting_key(template_key)
    serialized = json.dumps(
        {"subject": subject, "body_text": body_text, "body_html": body_html}
    )
    set_setting(setting_key, serialized)
    return get_invite_email_template(template_key)
|
|
|
|
|
|
def reset_invite_email_template(template_key: str) -> Dict[str, Any]:
    """Delete the stored customization for a template and return its record afterwards.

    Raises:
        ValueError: If ``template_key`` is not one of ``TEMPLATE_KEYS``.
    """
    if template_key not in TEMPLATE_KEYS:
        raise ValueError(f"Unknown email template: {template_key}")

    setting_key = _template_setting_key(template_key)
    delete_setting(setting_key)
    return get_invite_email_template(template_key)
|
|
|
|
|
|
def render_invite_email_template(
    template_key: str,
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Render a template into a final subject, plain-text body and HTML body.

    The HTML body is rendered with ``escape_html=True``; the subject and the
    text body are rendered with ``escape_html=False``. When the text body
    renders blank but the HTML body does not, the text body is derived from
    the HTML via ``_strip_html_for_text``. HTML recognised by
    ``_looks_like_full_html_document`` is used as-is; anything else is wrapped
    with ``_wrap_email_html``, taking presentation values from the context and
    falling back to the ``TEMPLATE_PRESENTATION`` entry for the template.

    Args:
        template_key: Template to render; validated by
            ``get_invite_email_template``.
        invite, user, recipient_email, message, reason, overrides: Forwarded
            unchanged to ``build_invite_email_context``.

    Returns:
        A dict with whitespace-stripped ``subject``, ``body_text`` and
        ``body_html`` values.
    """
    stored_template = get_invite_email_template(template_key)
    values = build_invite_email_context(
        invite=invite,
        user=user,
        recipient_email=recipient_email,
        message=message,
        reason=reason,
        overrides=overrides,
    )

    def from_context(name, fallback):
        # Context value for ``name``, normalised, with ``fallback`` as default.
        return _normalize_display_text(values.get(name), fallback)

    html_fragment = _render_template_string(
        stored_template["body_html"], values, escape_html=True
    )
    plain_body = _render_template_string(
        stored_template["body_text"], values, escape_html=False
    )
    if html_fragment.strip() and not plain_body.strip():
        # No usable text alternative was authored: derive one from the HTML.
        plain_body = _strip_html_for_text(html_fragment)
    rendered_subject = _render_template_string(
        stored_template["subject"], values, escape_html=False
    )

    look = TEMPLATE_PRESENTATION.get(template_key, TEMPLATE_PRESENTATION["invited"])
    primary_link = _normalize_display_text(values.get(look["primary_url_key"], ""))
    secondary_link = _normalize_display_text(values.get(look["secondary_url_key"], ""))

    if _looks_like_full_html_document(html_fragment):
        final_html = html_fragment.strip()
    else:
        wrapped = _wrap_email_html(
            app_name=from_context("app_name", env_settings.app_name),
            app_url=from_context("app_url", _build_default_base_url()),
            build_number=from_context("build_number", BUILD_NUMBER),
            title=from_context("title", look["title"]),
            subtitle=from_context("subtitle", look["subtitle"]),
            tone=from_context("tone", look["tone"]),
            body_html=html_fragment.strip(),
            primary_label=from_context("primary_label", look["primary_label"]),
            primary_url=primary_link,
            secondary_label=from_context("secondary_label", look["secondary_label"]),
            secondary_url=secondary_link,
            footer_note=from_context("footer_note", ""),
        )
        final_html = wrapped.strip()

    result: Dict[str, str] = {}
    result["subject"] = rendered_subject.strip()
    result["body_text"] = plain_body.strip()
    result["body_html"] = final_html
    return result
|
|
|
|
|
|
def resolve_user_delivery_email(user: Optional[Dict[str, Any]], invite: Optional[Dict[str, Any]] = None) -> Optional[str]:
    """Pick the address to deliver mail to for a user and/or invite.

    Candidates are tried in order and the first one that
    ``_normalize_email`` accepts wins:

    1. ``user["email"]``
    2. ``user["username"]``
    3. ``invite["recipient_email"]``

    When ``user`` is not a dict, only the invite recipient is considered.

    Args:
        user: User record, or ``None``.
        invite: Invite record, or ``None``.

    Returns:
        The normalised address, or ``None`` when the user is a dict and no
        candidate is usable.
    """
    invite_is_dict = isinstance(invite, dict)

    if not isinstance(user, dict):
        fallback = invite.get("recipient_email") if invite_is_dict else None
        return _normalize_email(fallback)

    for field in ("email", "username"):
        candidate = _normalize_email(user.get(field))
        if candidate:
            return candidate

    if invite_is_dict:
        from_invite = _normalize_email(invite.get("recipient_email"))
        if from_invite:
            return from_invite
    return None
|
|
|
|
|
|
def smtp_email_config_ready() -> tuple[bool, str]:
    """Report whether the runtime settings allow sending email.

    Checks, in order: the global notification switch, the email notification
    switch, a non-empty SMTP host and a valid from-address. The first failing
    check determines the message.

    Returns:
        ``(True, "ok")`` when every check passes, otherwise ``(False, reason)``
        with a human-readable reason.
    """
    cfg = get_runtime_settings()
    problem: Optional[str] = None
    if not cfg.magent_notify_enabled:
        problem = "Notifications are disabled."
    elif not cfg.magent_notify_email_enabled:
        problem = "Email notifications are disabled."
    elif not _normalize_display_text(cfg.magent_notify_email_smtp_host):
        problem = "SMTP host is not configured."
    elif not _normalize_email(cfg.magent_notify_email_from_address):
        problem = "From email address is not configured."

    if problem is None:
        return True, "ok"
    return False, problem
|
|
|
|
|
|
def smtp_email_delivery_warning() -> Optional[str]:
    """Return an advisory message for a known-risky SMTP setup, if any.

    The only case handled is a host ending in
    ``.mail.protection.outlook.com`` configured without both a username and
    a password.

    Returns:
        The warning text, or ``None`` when the configuration does not match
        that case.
    """
    cfg = get_runtime_settings()
    smtp_host = _normalize_display_text(cfg.magent_notify_email_smtp_host).lower()
    smtp_user = _normalize_display_text(cfg.magent_notify_email_smtp_username)
    smtp_secret = _normalize_display_text(cfg.magent_notify_email_smtp_password)

    targets_m365_relay = smtp_host.endswith(".mail.protection.outlook.com")
    if not targets_m365_relay or (smtp_user and smtp_secret):
        return None
    return (
        "Unauthenticated Microsoft 365 relay mode is configured. SMTP acceptance does not "
        "confirm mailbox delivery, and suspicious messages can still be filtered. For reliable "
        "delivery, use smtp.office365.com:587 with SMTP credentials or configure a verified "
        "Exchange relay connector and make sure SPF, DKIM, and DMARC are healthy for the "
        "sender domain."
    )
|
|
|
|
|
|
def _flatten_message(message: EmailMessage) -> bytes:
    """Serialise ``message`` to bytes using the ``email.policy.SMTP`` policy.

    Args:
        message: The message to serialise.

    Returns:
        The complete message (headers and body) as bytes.
    """
    with BytesIO() as sink:
        writer = BytesGenerator(sink, policy=SMTP_POLICY)
        writer.flatten(message)
        return sink.getvalue()
|
|
|
|
|
|
def _decode_smtp_message(value: bytes | str | None) -> str:
|
|
if value is None:
|
|
return ""
|
|
if isinstance(value, bytes):
|
|
return value.decode("utf-8", errors="replace")
|
|
return str(value)
|
|
|
|
|
|
def _parse_exchange_receipt(value: bytes | str | None) -> Dict[str, str]:
    """Extract identifiers from an SMTP reply string.

    Args:
        value: The server reply, as accepted by ``_decode_smtp_message``.

    Returns:
        A dict that always contains ``raw`` (the decoded reply). It also
        contains ``provider_message_id`` and/or ``provider_internal_id`` when
        ``EXCHANGE_MESSAGE_ID_PATTERN`` / ``EXCHANGE_INTERNAL_ID_PATTERN``
        match, each holding the first capture group.
    """
    decoded = _decode_smtp_message(value)
    parsed: Dict[str, str] = {"raw": decoded}
    extractors = (
        ("provider_message_id", EXCHANGE_MESSAGE_ID_PATTERN),
        ("provider_internal_id", EXCHANGE_INTERNAL_ID_PATTERN),
    )
    for field_name, pattern in extractors:
        found = pattern.search(decoded)
        if found:
            parsed[field_name] = found.group(1)
    return parsed
|
|
|
|
|
|
def _send_via_smtp_session(
    smtp: smtplib.SMTP,
    *,
    from_address: str,
    recipient_email: str,
    message: EmailMessage,
) -> Dict[str, str]:
    """Run the MAIL / RCPT / DATA exchange on an already-open SMTP session.

    Each step's reply code is checked; any code of 400 or above aborts the
    exchange with the matching ``smtplib`` exception.

    Args:
        smtp: Connected (and, if required, authenticated) SMTP client.
        from_address: Envelope sender.
        recipient_email: Envelope recipient.
        message: The message to transmit; serialised via ``_flatten_message``.

    Returns:
        The dict produced by ``_parse_exchange_receipt`` for the DATA reply,
        extended with the decoded ``mail_response``, ``rcpt_response`` and
        ``data_response`` texts.

    Raises:
        smtplib.SMTPResponseException: MAIL FROM was answered with >= 400.
        smtplib.SMTPRecipientsRefused: RCPT TO was answered with >= 400.
        smtplib.SMTPDataError: DATA was answered with >= 400.
    """
    sender_code, sender_reply = smtp.mail(from_address)
    if not sender_code < 400:
        raise smtplib.SMTPResponseException(sender_code, sender_reply)

    recipient_code, recipient_reply = smtp.rcpt(recipient_email)
    if not recipient_code < 400:
        refused = {recipient_email: (recipient_code, recipient_reply)}
        raise smtplib.SMTPRecipientsRefused(refused)

    payload_code, payload_reply = smtp.data(_flatten_message(message))
    if not payload_code < 400:
        raise smtplib.SMTPDataError(payload_code, payload_reply)

    outcome = _parse_exchange_receipt(payload_reply)
    outcome.update(
        mail_response=_decode_smtp_message(sender_reply),
        rcpt_response=_decode_smtp_message(recipient_reply),
        data_response=_decode_smtp_message(payload_reply),
    )
    return outcome
|
|
|
|
|
|
def _send_email_sync(*, recipient_email: str, subject: str, body_text: str, body_html: str) -> Dict[str, str]:
    """Build one email and deliver it over a blocking SMTP connection.

    Connection parameters are read from the runtime settings. The message
    always has a plain-text part; an HTML alternative is attached when
    ``body_html`` is non-blank, and the logo is embedded as a related inline
    image when the HTML references ``cid:<EMAIL_LOGO_CID>`` and logo bytes
    are available. When the SSL flag is set an ``SMTP_SSL`` connection is
    used and the TLS flag is ignored; otherwise a plain ``SMTP`` connection
    is opened and upgraded with STARTTLS when the TLS flag is set.

    Args:
        recipient_email: Envelope and header recipient.
        subject: Subject header value.
        body_text: Plain-text body; when empty, text is derived from
            ``body_html``.
        body_html: HTML body; may be blank.

    Returns:
        The receipt dict produced by ``_send_via_smtp_session``.

    Raises:
        RuntimeError: If the SMTP host or from-address is missing.
    """
    cfg = get_runtime_settings()
    smtp_host = _normalize_display_text(cfg.magent_notify_email_smtp_host)
    smtp_port = int(cfg.magent_notify_email_smtp_port or 587)
    smtp_user = _normalize_display_text(cfg.magent_notify_email_smtp_username)
    smtp_secret = _normalize_display_text(cfg.magent_notify_email_smtp_password)
    sender_address = _normalize_email(cfg.magent_notify_email_from_address)
    sender_name = _normalize_display_text(cfg.magent_notify_email_from_name, env_settings.app_name)
    starttls_enabled = bool(cfg.magent_notify_email_use_tls)
    implicit_tls = bool(cfg.magent_notify_email_use_ssl)
    advisory = smtp_email_delivery_warning()
    if not (smtp_host and sender_address):
        raise RuntimeError("SMTP email settings are incomplete.")

    ehlo_name = _derive_mail_hostname(from_address=sender_address)
    has_auth = bool(smtp_user and smtp_secret)
    logger.info(
        "smtp send started recipient=%s from=%s host=%s port=%s tls=%s ssl=%s auth=%s subject=%s ehlo=%s",
        recipient_email,
        sender_address,
        smtp_host,
        smtp_port,
        starttls_enabled,
        implicit_tls,
        has_auth,
        subject,
        ehlo_name,
    )
    if advisory:
        logger.warning("smtp delivery warning host=%s detail=%s", smtp_host, advisory)

    # --- Build the message; headers are set in this exact order. ---
    outgoing = EmailMessage()
    outgoing["Subject"] = subject
    outgoing["From"] = formataddr((sender_name, sender_address))
    outgoing["To"] = recipient_email
    outgoing["Date"] = formatdate(localtime=True)
    # Use the sender's domain for the Message-ID when the address has one.
    _, at_sign, sender_domain = sender_address.partition("@")
    outgoing["Message-ID"] = make_msgid(domain=sender_domain) if at_sign else make_msgid()
    _add_transactional_headers(
        outgoing,
        from_name=sender_name,
        from_address=sender_address,
    )
    outgoing.set_content(body_text or _strip_html_for_text(body_html))
    if body_html.strip():
        outgoing.add_alternative(body_html, subtype="html")
        # Only load and attach the logo when the HTML actually references it.
        references_logo = f"cid:{EMAIL_LOGO_CID}" in body_html
        logo_bytes = _get_email_logo_bytes() if references_logo else None
        html_part = outgoing.get_body(preferencelist=("html",)) if logo_bytes else None
        if html_part is not None:
            html_part.add_related(
                logo_bytes,
                maintype="image",
                subtype="png",
                cid=f"<{EMAIL_LOGO_CID}>",
                filename="logo.png",
                disposition="inline",
            )

    # --- Deliver. ---
    # NOTE(review): no ``context`` is passed to SMTP_SSL / starttls, so
    # smtplib's default TLS context applies — confirm that its certificate
    # verification behaviour is acceptable for the configured relay.
    if implicit_tls:
        connection_cls = smtplib.SMTP_SSL
        accepted_format = (
            "smtp send accepted recipient=%s host=%s mode=ssl provider_message_id=%s provider_internal_id=%s"
        )
    else:
        connection_cls = smtplib.SMTP
        accepted_format = (
            "smtp send accepted recipient=%s host=%s mode=plain provider_message_id=%s provider_internal_id=%s"
        )

    with connection_cls(smtp_host, smtp_port, timeout=20, local_hostname=ehlo_name) as session:
        if implicit_tls:
            logger.debug("smtp ssl connection opened host=%s port=%s", smtp_host, smtp_port)
        else:
            logger.debug("smtp connection opened host=%s port=%s", smtp_host, smtp_port)
            session.ehlo()
            if starttls_enabled:
                session.starttls()
                # EHLO again after the TLS upgrade, as the original flow does.
                session.ehlo()
                logger.debug("smtp starttls negotiated host=%s port=%s", smtp_host, smtp_port)
        if has_auth:
            session.login(smtp_user, smtp_secret)
            logger.debug("smtp login succeeded host=%s username=%s", smtp_host, smtp_user)
        receipt = _send_via_smtp_session(
            session,
            from_address=sender_address,
            recipient_email=recipient_email,
            message=outgoing,
        )
        logger.info(
            accepted_format,
            recipient_email,
            smtp_host,
            receipt.get("provider_message_id"),
            receipt.get("provider_internal_id"),
        )
        return receipt
|
|
|
|
|
|
async def send_templated_email(
    template_key: str,
    *,
    invite: Optional[Dict[str, Any]] = None,
    user: Optional[Dict[str, Any]] = None,
    recipient_email: Optional[str] = None,
    message: Optional[str] = None,
    reason: Optional[str] = None,
    overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
    """Render a stored template and send it to the resolved recipient.

    The recipient is ``recipient_email`` when it normalises to a valid
    address, otherwise whatever ``resolve_user_delivery_email(user, invite)``
    yields. The blocking SMTP send runs in a worker thread.

    Args:
        template_key: Template to render.
        invite, user, message, reason, overrides: Forwarded to
            ``render_invite_email_template``.
        recipient_email: Preferred recipient address, if any.

    Returns:
        A dict with ``recipient_email`` and ``subject``, plus whichever of
        ``provider_message_id``, ``provider_internal_id`` and
        ``data_response`` the SMTP receipt contains.

    Raises:
        RuntimeError: If email sending is not configured, or no valid
            recipient address can be determined.
    """
    is_ready, not_ready_reason = smtp_email_config_ready()
    if not is_ready:
        raise RuntimeError(not_ready_reason)

    resolved_email = _normalize_email(recipient_email) or resolve_user_delivery_email(user, invite)
    if not resolved_email:
        raise RuntimeError("No valid recipient email is available for this action.")

    rendered = render_invite_email_template(
        template_key,
        invite=invite,
        user=user,
        recipient_email=resolved_email,
        message=message,
        reason=reason,
        overrides=overrides,
    )
    receipt = await asyncio.to_thread(
        _send_email_sync,
        recipient_email=resolved_email,
        subject=rendered["subject"],
        body_text=rendered["body_text"],
        body_html=rendered["body_html"],
    )
    logger.info("Email template sent: template=%s recipient=%s", template_key, resolved_email)

    # Only a subset of the receipt is exposed to callers.
    exposed_fields = ("provider_message_id", "provider_internal_id", "data_response")
    outcome = {"recipient_email": resolved_email, "subject": rendered["subject"]}
    for field_name, field_value in receipt.items():
        if field_name in exposed_fields:
            outcome[field_name] = field_value
    return outcome
|
|
|
|
|
|
async def send_test_email(recipient_email: Optional[str] = None) -> Dict[str, str]:
    """Send a diagnostic email describing the current SMTP configuration.

    The message goes to ``recipient_email`` when it is a valid address and
    otherwise to the configured from-address. The blocking SMTP send runs in
    a worker thread.

    Args:
        recipient_email: Optional explicit recipient.

    Returns:
        A dict with ``recipient_email`` and ``subject``, plus whichever of
        ``provider_message_id``, ``provider_internal_id`` and
        ``data_response`` the SMTP receipt contains, and ``warning`` when
        ``smtp_email_delivery_warning()`` reports one after the send.

    Raises:
        RuntimeError: If email sending is not configured, or no valid
            recipient address is available.
    """
    is_ready, not_ready_reason = smtp_email_config_ready()
    if not is_ready:
        raise RuntimeError(not_ready_reason)

    cfg = get_runtime_settings()
    resolved_email = _normalize_email(recipient_email)
    if not resolved_email:
        resolved_email = _normalize_email(cfg.magent_notify_email_from_address)
    if not resolved_email:
        raise RuntimeError("No valid recipient email is configured for the test message.")

    # --- Facts shown in both the text and the HTML body. ---
    application_url = _normalize_display_text(cfg.magent_application_url, "Not configured")
    # Only offer an action button when the value is an http(s) URL.
    is_http_url = application_url.lower().startswith(("http://", "https://"))
    primary_url = application_url if is_http_url else ""

    smtp_host_label = _normalize_display_text(cfg.magent_notify_email_smtp_host, "Not configured")
    smtp_port = int(cfg.magent_notify_email_smtp_port or 587)
    smtp_target = f"{smtp_host_label}:{smtp_port}"

    if cfg.magent_notify_email_use_ssl:
        security_mode = "SSL"
    elif cfg.magent_notify_email_use_tls:
        security_mode = "STARTTLS"
    else:
        security_mode = "Plain SMTP"

    has_credentials = _normalize_display_text(
        cfg.magent_notify_email_smtp_username
    ) and _normalize_display_text(cfg.magent_notify_email_smtp_password)
    auth_mode = "Authenticated" if has_credentials else "No SMTP auth"

    delivery_warning = smtp_email_delivery_warning()
    subject = f"{env_settings.app_name} email test"

    verified_points = [
        "Magent can build the HTML template shell correctly.",
        "The configured SMTP route accepts and relays the message.",
        "Branding, links, and build metadata are rendering consistently.",
    ]

    # --- Plain-text body. ---
    text_lines = [
        f"This is a test email from {env_settings.app_name}.",
        "",
        f"Build: {BUILD_NUMBER}",
        f"Application URL: {application_url}",
        f"SMTP target: {smtp_target}",
        f"Security: {security_mode} ({auth_mode})",
        "",
        "What this verifies:",
    ]
    text_lines.extend(f"- {point}" for point in verified_points)
    body_text = "\n".join(text_lines) + "\n"

    # --- HTML body, assembled section by section. ---
    app_name = env_settings.app_name
    base_url = _build_default_base_url()
    intro_html = (
        '<div style="margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;">'
        "This is a live test email from Magent. If this renders correctly, the HTML template shell and SMTP handoff are both working."
        "</div>"
    )
    stat_cards = [
        _build_email_stat_card("Recipient", resolved_email),
        _build_email_stat_card("Build", BUILD_NUMBER),
        _build_email_stat_card("SMTP target", smtp_target),
        _build_email_stat_card("Security", security_mode, auth_mode),
        _build_email_stat_card("Application URL", application_url),
        _build_email_stat_card("Template shell", "Branded HTML", "Logo, gradient, action buttons"),
    ]
    stats_html = _build_email_stat_grid(stat_cards)
    checklist_html = _build_email_panel(
        "What this verifies",
        _build_email_list(list(verified_points)),
        variant="brand",
    )
    if delivery_warning:
        notes_content = f'<div style="white-space:pre-line;">{html.escape(delivery_warning)}</div>'
        notes_variant = "warning"
    else:
        notes_content = "Use this test when changing SMTP settings, relay targets, or branding."
        notes_variant = "neutral"
    notes_html = _build_email_panel("Delivery notes", notes_content, variant=notes_variant)

    body_html = _wrap_email_html(
        app_name=app_name,
        app_url=base_url,
        build_number=BUILD_NUMBER,
        title="Email delivery test",
        subtitle="This confirms Magent can generate and hand off branded mail.",
        tone="brand",
        body_html=intro_html + stats_html + checklist_html + notes_html,
        primary_label="Open Magent" if primary_url else "",
        primary_url=primary_url,
        footer_note="SMTP test email generated by Magent.",
    )

    receipt = await asyncio.to_thread(
        _send_email_sync,
        recipient_email=resolved_email,
        subject=subject,
        body_text=body_text,
        body_html=body_html,
    )
    logger.info("SMTP test email sent: recipient=%s", resolved_email)

    # Only a subset of the receipt is exposed to callers.
    exposed_fields = ("provider_message_id", "provider_internal_id", "data_response")
    result = {"recipient_email": resolved_email, "subject": subject}
    for field_name, field_value in receipt.items():
        if field_name in exposed_fields:
            result[field_name] = field_value

    warning = smtp_email_delivery_warning()
    if warning:
        result["warning"] = warning
    return result
|
|
|
|
|
|
async def send_password_reset_email(
    *,
    recipient_email: str,
    username: str,
    token: str,
    expires_at: str,
    auth_provider: str,
) -> Dict[str, str]:
    """Send a password-reset email containing a tokenised reset link.

    The reset link is ``<base url>/reset-password?token=<token>``, where the
    base URL comes from ``_build_default_base_url()``. The blocking SMTP send
    runs in a worker thread.

    Args:
        recipient_email: Address to deliver to; must normalise to a valid
            email address.
        username: Account name shown in the message.
        token: Reset token placed in the link's query string.
        expires_at: Expiry text shown verbatim in the message.
        auth_provider: ``"jellyfin"`` selects the
            "Jellyfin, Seerr, and Magent" wording; any other value selects
            "Magent".

    Returns:
        A dict with ``recipient_email``, ``subject`` and ``reset_url``, plus
        whichever of ``provider_message_id``, ``provider_internal_id`` and
        ``data_response`` the SMTP receipt contains, and ``warning`` when
        ``smtp_email_delivery_warning()`` reports one.

    Raises:
        RuntimeError: If email sending is not configured, or
            ``recipient_email`` is not a valid address.
    """
    from urllib.parse import quote

    ready, detail = smtp_email_config_ready()
    if not ready:
        raise RuntimeError(detail)

    resolved_email = _normalize_email(recipient_email)
    if not resolved_email:
        raise RuntimeError("No valid recipient email is available for password reset.")

    app_url = _build_default_base_url()
    # Percent-encode the token so characters that are special in a query
    # string (e.g. "+", "/", "=", "&", "#") cannot corrupt the link. Tokens
    # made only of URL-safe characters are left unchanged.
    reset_url = f"{app_url}/reset-password?token={quote(token, safe='')}"
    provider_label = "Jellyfin, Seerr, and Magent" if auth_provider == "jellyfin" else "Magent"
    subject = f"{env_settings.app_name} password reset"
    body_text = (
        f"A password reset was requested for {username}.\n\n"
        f"This link will reset the password used for {provider_label}.\n"
        f"Reset link: {reset_url}\n"
        f"Expires: {expires_at}\n\n"
        "If you did not request this reset, you can ignore this email.\n"
    )
    body_html = _wrap_email_html(
        app_name=env_settings.app_name,
        app_url=app_url,
        build_number=BUILD_NUMBER,
        title="Reset your password",
        subtitle=f"This will update the credentials used for {provider_label}.",
        tone="brand",
        body_html=(
            "<div style=\"margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;\">"
            # The username is caller-supplied, so it is escaped before being
            # embedded in markup.
            f"A password reset was requested for <strong>{html.escape(username)}</strong>."
            "</div>"
            + _build_email_stat_grid(
                [
                    _build_email_stat_card("Account", username),
                    _build_email_stat_card("Expires", expires_at),
                    _build_email_stat_card("Credentials updated", provider_label),
                    _build_email_stat_card("Delivery target", resolved_email),
                ]
            )
            + _build_email_panel(
                "What will be updated",
                f"This reset will update the password used for <strong>{html.escape(provider_label)}</strong>.",
                variant="brand",
            )
            + _build_email_panel(
                "What happens next",
                _build_email_list(
                    [
                        "Open the reset link and choose a new password.",
                        "Complete the form before the expiry time shown above.",
                        "Use the new password the next time you sign in.",
                    ],
                    ordered=True,
                ),
                variant="neutral",
            )
            + _build_email_panel(
                "Safety note",
                "If you did not request this reset, ignore this email. No changes will be applied until the reset link is opened and completed.",
                variant="warning",
            )
        ),
        primary_label="Reset password",
        primary_url=reset_url,
        secondary_label="Open Magent",
        secondary_url=app_url,
        footer_note="Password reset email generated by Magent.",
    )

    receipt = await asyncio.to_thread(
        _send_email_sync,
        recipient_email=resolved_email,
        subject=subject,
        body_text=body_text,
        body_html=body_html,
    )
    logger.info(
        "Password reset email sent: username=%s recipient=%s provider=%s",
        username,
        resolved_email,
        auth_provider,
    )
    # Only a subset of the receipt is exposed to callers.
    result = {
        "recipient_email": resolved_email,
        "subject": subject,
        "reset_url": reset_url,
        **{
            key: value
            for key, value in receipt.items()
            if key in {"provider_message_id", "provider_internal_id", "data_response"}
        },
    }
    warning = smtp_email_delivery_warning()
    if warning:
        result["warning"] = warning
    return result
|