from __future__ import annotations
import asyncio
import html
import json
import logging
import re
import smtplib
from email.generator import BytesGenerator
from email.message import EmailMessage
from email.utils import formataddr
from io import BytesIO
from typing import Any, Dict, Optional
from ..build_info import BUILD_NUMBER
from ..config import settings as env_settings
from ..db import delete_setting, get_setting, set_setting
from ..runtime import get_runtime_settings
logger = logging.getLogger(__name__)
TEMPLATE_SETTING_PREFIX = "invite_email_template_"
TEMPLATE_KEYS = ("invited", "welcome", "warning", "banned")
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PLACEHOLDER_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_]+)\s*}}")
EXCHANGE_MESSAGE_ID_PATTERN = re.compile(r"<([^>]+)>")
EXCHANGE_INTERNAL_ID_PATTERN = re.compile(r"\[InternalId=([^\],]+)")
TEMPLATE_METADATA: Dict[str, Dict[str, Any]] = {
"invited": {
"label": "You have been invited",
"description": "Sent when an invite link is created and emailed to a recipient.",
},
"welcome": {
"label": "Welcome / How it works",
"description": "Sent after an invited user completes signup.",
},
"warning": {
"label": "Warning",
"description": "Manual warning template for account or behavior notices.",
},
"banned": {
"label": "Banned",
"description": "Sent when an account is banned or removed.",
},
}
TEMPLATE_PLACEHOLDERS = [
"app_name",
"app_url",
"build_number",
"how_it_works_url",
"invite_code",
"invite_description",
"invite_expires_at",
"invite_label",
"invite_link",
"invite_remaining_uses",
"inviter_username",
"message",
"reason",
"recipient_email",
"role",
"username",
]
EMAIL_TAGLINE = "Find and fix media requests fast."
EMAIL_TONE_STYLES: Dict[str, Dict[str, str]] = {
"brand": {
"chip_bg": "rgba(255, 107, 43, 0.16)",
"chip_border": "rgba(255, 107, 43, 0.38)",
"chip_text": "#ffd2bf",
"accent_a": "#ff6b2b",
"accent_b": "#1c6bff",
},
"success": {
"chip_bg": "rgba(34, 197, 94, 0.16)",
"chip_border": "rgba(34, 197, 94, 0.38)",
"chip_text": "#c7f9d7",
"accent_a": "#22c55e",
"accent_b": "#1c6bff",
},
"warning": {
"chip_bg": "rgba(251, 146, 60, 0.16)",
"chip_border": "rgba(251, 146, 60, 0.38)",
"chip_text": "#ffe0ba",
"accent_a": "#fb923c",
"accent_b": "#ff6b2b",
},
"danger": {
"chip_bg": "rgba(248, 113, 113, 0.16)",
"chip_border": "rgba(248, 113, 113, 0.38)",
"chip_text": "#ffd0d0",
"accent_a": "#ef4444",
"accent_b": "#ff6b2b",
},
}
TEMPLATE_PRESENTATION: Dict[str, Dict[str, str]] = {
"invited": {
"tone": "brand",
"title": "You have been invited",
"subtitle": "A new account invitation is ready for you.",
"primary_label": "Accept invite",
"primary_url_key": "invite_link",
"secondary_label": "How it works",
"secondary_url_key": "how_it_works_url",
},
"welcome": {
"tone": "success",
"title": "Welcome to Magent",
"subtitle": "Your account is ready and synced.",
"primary_label": "Open Magent",
"primary_url_key": "app_url",
"secondary_label": "How it works",
"secondary_url_key": "how_it_works_url",
},
"warning": {
"tone": "warning",
"title": "Account warning",
"subtitle": "Please review the note below.",
"primary_label": "Open Magent",
"primary_url_key": "app_url",
"secondary_label": "How it works",
"secondary_url_key": "how_it_works_url",
},
"banned": {
"tone": "danger",
"title": "Account status changed",
"subtitle": "Your account has been restricted or removed.",
"primary_label": "How it works",
"primary_url_key": "how_it_works_url",
"secondary_label": "",
"secondary_url_key": "",
},
}
DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
"invited": {
"subject": "{{app_name}} invite for {{recipient_email}}",
"body_text": (
"You have been invited to {{app_name}}.\n\n"
"Invite code: {{invite_code}}\n"
"Signup link: {{invite_link}}\n"
"Invited by: {{inviter_username}}\n"
"Invite label: {{invite_label}}\n"
"Expires: {{invite_expires_at}}\n"
"Remaining uses: {{invite_remaining_uses}}\n\n"
"{{invite_description}}\n\n"
"{{message}}\n\n"
"How it works: {{how_it_works_url}}\n"
"Build: {{build_number}}\n"
),
"body_html": (
"
"
"A new invitation has been prepared for {{recipient_email}}. Use the details below to sign up."
"
"
""
""
"| "
" Invite code "
"{{invite_code}} "
" | "
""
" Invited by "
"{{inviter_username}} "
" | "
"
"
""
"| "
" Invite label "
"{{invite_label}} "
" | "
""
" Access window "
"{{invite_expires_at}} "
"Remaining uses: {{invite_remaining_uses}} "
" | "
"
"
"
"
"{{invite_description}}
"
"{{message}}
"
),
},
"welcome": {
"subject": "Welcome to {{app_name}}",
"body_text": (
"Welcome to {{app_name}}, {{username}}.\n\n"
"Your account is ready.\n"
"Open: {{app_url}}\n"
"How it works: {{how_it_works_url}}\n"
"Role: {{role}}\n\n"
"{{message}}\n"
),
"body_html": (
""
"Your account is live and ready to use. Everything below mirrors the current site behavior."
"
"
""
""
"| "
" Username "
"{{username}} "
" | "
""
" Role "
"{{role}} "
" | "
"
"
"
"
""
"
What to do next
"
"
"
"- Open Magent and sign in using your shared credentials.
"
"- Search or review requests without refreshing every page.
"
"- Use the invite tools in your profile if your account allows it.
"
"
"
"
"
"{{message}}
"
),
},
"warning": {
"subject": "{{app_name}} account warning",
"body_text": (
"Hello {{username}},\n\n"
"This is a warning regarding your {{app_name}} account.\n\n"
"Reason: {{reason}}\n\n"
"{{message}}\n\n"
"If you need help, contact the admin.\n"
),
"body_html": (
""
"Please review this account notice carefully. This message was sent by an administrator."
"
"
""
"
Reason
"
"
{{reason}}
"
"
"
"{{message}}
"
"If you need help or think this was sent in error, contact the site administrator.
"
),
},
"banned": {
"subject": "{{app_name}} account status changed",
"body_text": (
"Hello {{username}},\n\n"
"Your {{app_name}} account has been banned or removed.\n\n"
"Reason: {{reason}}\n\n"
"{{message}}\n"
),
"body_html": (
""
"Your account access has changed. Review the details below."
"
"
""
""
"| "
" Reason "
"{{reason}} "
" | "
"
"
"
"
"{{message}}
"
),
},
}
def _template_setting_key(template_key: str) -> str:
return f"{TEMPLATE_SETTING_PREFIX}{template_key}"
def _is_valid_email(value: object) -> bool:
if not isinstance(value, str):
return False
candidate = value.strip()
if not candidate:
return False
return bool(EMAIL_PATTERN.match(candidate))
def _normalize_email(value: object) -> Optional[str]:
if not _is_valid_email(value):
return None
return str(value).strip()
def _normalize_display_text(value: object, fallback: str = "") -> str:
if value is None:
return fallback
if isinstance(value, str):
trimmed = value.strip()
return trimmed if trimmed else fallback
return str(value)
def _template_context_value(value: object, fallback: str = "") -> str:
if value is None:
return fallback
if isinstance(value, str):
return value.strip()
return str(value)
def _safe_template_context(context: Dict[str, object]) -> Dict[str, str]:
safe: Dict[str, str] = {}
for key in TEMPLATE_PLACEHOLDERS:
safe[key] = _template_context_value(context.get(key), "")
return safe
def _render_template_string(template: str, context: Dict[str, str], *, escape_html: bool = False) -> str:
if not isinstance(template, str):
return ""
def _replace(match: re.Match[str]) -> str:
key = match.group(1)
value = context.get(key, "")
return html.escape(value) if escape_html else value
return PLACEHOLDER_PATTERN.sub(_replace, template)
def _strip_html_for_text(value: str) -> str:
text = re.sub(r"
", "\n", value, flags=re.IGNORECASE)
text = re.sub(r"", "\n\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text).strip()
def _build_default_base_url() -> str:
runtime = get_runtime_settings()
for candidate in (
runtime.magent_application_url,
runtime.magent_proxy_base_url,
env_settings.cors_allow_origin,
):
normalized = _normalize_display_text(candidate)
if normalized:
return normalized.rstrip("/")
port = int(getattr(runtime, "magent_application_port", 3000) or 3000)
return f"http://localhost:{port}"
def _looks_like_full_html_document(value: str) -> bool:
probe = value.lstrip().lower()
return probe.startswith(" str:
background = "linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%)" if primary else "#151c2d"
border = "1px solid rgba(59, 130, 246, 0.32)" if primary else "1px solid rgba(255, 255, 255, 0.12)"
color = "#ffffff"
return (
f"{html.escape(label)}"
)
def _wrap_email_html(
*,
app_name: str,
app_url: str,
build_number: str,
title: str,
subtitle: str,
tone: str,
body_html: str,
primary_label: str = "",
primary_url: str = "",
secondary_label: str = "",
secondary_url: str = "",
footer_note: str = "",
) -> str:
styles = EMAIL_TONE_STYLES.get(tone, EMAIL_TONE_STYLES["brand"])
logo_url = ""
if app_url.lower().startswith("http://") or app_url.lower().startswith("https://"):
logo_url = f"{app_url.rstrip('/')}/branding/logo.png"
actions = []
if primary_label and primary_url:
actions.append(_build_email_action_button(primary_label, primary_url, primary=True))
if secondary_label and secondary_url:
actions.append(_build_email_action_button(secondary_label, secondary_url, primary=False))
actions_html = "".join(actions)
footer = footer_note or "This email was generated automatically by Magent."
logo_block = (
f"
"
if logo_url
else (
"M
"
)
)
return (
""
""
""
f"{html.escape(title)} - {html.escape(subtitle)}"
"
"
""
""
""
""
f""
" "
""
f"| {logo_block} | "
""
f" {html.escape(app_name)} "
f"{html.escape(title)} "
f"{html.escape(subtitle or EMAIL_TAGLINE)} "
" | "
" "
" "
f" "
f" "
f"{html.escape(EMAIL_TAGLINE)} "
f" {body_html} "
f" {actions_html} "
" "
" "
""
f"| {html.escape(footer)} | "
f"Build {html.escape(build_number)} | "
" "
" "
" "
" "
" | "
" |
"
""
)
def build_invite_email_context(
*,
invite: Optional[Dict[str, Any]] = None,
user: Optional[Dict[str, Any]] = None,
recipient_email: Optional[str] = None,
message: Optional[str] = None,
reason: Optional[str] = None,
overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
app_url = _build_default_base_url()
invite_code = _normalize_display_text(invite.get("code") if invite else None, "Not set")
invite_link = f"{app_url}/signup?code={invite_code}" if invite_code != "Not set" else f"{app_url}/signup"
remaining_uses = invite.get("remaining_uses") if invite else None
resolved_recipient = _normalize_email(recipient_email)
if not resolved_recipient and invite:
resolved_recipient = _normalize_email(invite.get("recipient_email"))
if not resolved_recipient and user:
resolved_recipient = resolve_user_delivery_email(user)
context: Dict[str, object] = {
"app_name": env_settings.app_name,
"app_url": app_url,
"build_number": BUILD_NUMBER,
"how_it_works_url": f"{app_url}/how-it-works",
"invite_code": invite_code,
"invite_description": _normalize_display_text(invite.get("description") if invite else None, "No extra details."),
"invite_expires_at": _normalize_display_text(invite.get("expires_at") if invite else None, "Never"),
"invite_label": _normalize_display_text(invite.get("label") if invite else None, "No label"),
"invite_link": invite_link,
"invite_remaining_uses": (
"Unlimited" if remaining_uses in (None, "") else _normalize_display_text(remaining_uses)
),
"inviter_username": _normalize_display_text(
invite.get("created_by") if invite else (user.get("username") if user else None),
"Admin",
),
"message": _normalize_display_text(message, ""),
"reason": _normalize_display_text(reason, "Not specified"),
"recipient_email": _normalize_display_text(resolved_recipient, "No email supplied"),
"role": _normalize_display_text(user.get("role") if user else None, "user"),
"username": _normalize_display_text(user.get("username") if user else None, "there"),
}
if isinstance(overrides, dict):
context.update(overrides)
return _safe_template_context(context)
def get_invite_email_templates() -> Dict[str, Dict[str, Any]]:
templates: Dict[str, Dict[str, Any]] = {}
for template_key in TEMPLATE_KEYS:
template = dict(DEFAULT_TEMPLATES[template_key])
raw_value = get_setting(_template_setting_key(template_key))
if raw_value:
try:
stored = json.loads(raw_value)
except (TypeError, json.JSONDecodeError):
stored = {}
if isinstance(stored, dict):
for field in ("subject", "body_text", "body_html"):
if isinstance(stored.get(field), str):
template[field] = stored[field]
templates[template_key] = {
"key": template_key,
"label": TEMPLATE_METADATA[template_key]["label"],
"description": TEMPLATE_METADATA[template_key]["description"],
"placeholders": TEMPLATE_PLACEHOLDERS,
**template,
}
return templates
def get_invite_email_template(template_key: str) -> Dict[str, Any]:
if template_key not in TEMPLATE_KEYS:
raise ValueError(f"Unknown email template: {template_key}")
return get_invite_email_templates()[template_key]
def save_invite_email_template(
template_key: str,
*,
subject: str,
body_text: str,
body_html: str,
) -> Dict[str, Any]:
if template_key not in TEMPLATE_KEYS:
raise ValueError(f"Unknown email template: {template_key}")
payload = {
"subject": subject,
"body_text": body_text,
"body_html": body_html,
}
set_setting(_template_setting_key(template_key), json.dumps(payload))
return get_invite_email_template(template_key)
def reset_invite_email_template(template_key: str) -> Dict[str, Any]:
if template_key not in TEMPLATE_KEYS:
raise ValueError(f"Unknown email template: {template_key}")
delete_setting(_template_setting_key(template_key))
return get_invite_email_template(template_key)
def render_invite_email_template(
template_key: str,
*,
invite: Optional[Dict[str, Any]] = None,
user: Optional[Dict[str, Any]] = None,
recipient_email: Optional[str] = None,
message: Optional[str] = None,
reason: Optional[str] = None,
overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
template = get_invite_email_template(template_key)
context = build_invite_email_context(
invite=invite,
user=user,
recipient_email=recipient_email,
message=message,
reason=reason,
overrides=overrides,
)
raw_body_html = _render_template_string(template["body_html"], context, escape_html=True)
body_text = _render_template_string(template["body_text"], context, escape_html=False)
if not body_text.strip() and raw_body_html.strip():
body_text = _strip_html_for_text(raw_body_html)
subject = _render_template_string(template["subject"], context, escape_html=False)
presentation = TEMPLATE_PRESENTATION.get(template_key, TEMPLATE_PRESENTATION["invited"])
primary_url = _normalize_display_text(context.get(presentation["primary_url_key"], ""))
secondary_url = _normalize_display_text(context.get(presentation["secondary_url_key"], ""))
if _looks_like_full_html_document(raw_body_html):
body_html = raw_body_html.strip()
else:
body_html = _wrap_email_html(
app_name=_normalize_display_text(context.get("app_name"), env_settings.app_name),
app_url=_normalize_display_text(context.get("app_url"), _build_default_base_url()),
build_number=_normalize_display_text(context.get("build_number"), BUILD_NUMBER),
title=_normalize_display_text(context.get("title"), presentation["title"]),
subtitle=_normalize_display_text(context.get("subtitle"), presentation["subtitle"]),
tone=_normalize_display_text(context.get("tone"), presentation["tone"]),
body_html=raw_body_html.strip(),
primary_label=_normalize_display_text(
context.get("primary_label"), presentation["primary_label"]
),
primary_url=primary_url,
secondary_label=_normalize_display_text(
context.get("secondary_label"), presentation["secondary_label"]
),
secondary_url=secondary_url,
footer_note=_normalize_display_text(context.get("footer_note"), ""),
).strip()
return {
"subject": subject.strip(),
"body_text": body_text.strip(),
"body_html": body_html.strip(),
}
def resolve_user_delivery_email(user: Optional[Dict[str, Any]], invite: Optional[Dict[str, Any]] = None) -> Optional[str]:
if not isinstance(user, dict):
return _normalize_email(invite.get("recipient_email") if isinstance(invite, dict) else None)
username_email = _normalize_email(user.get("username"))
if username_email:
return username_email
if isinstance(invite, dict):
invite_email = _normalize_email(invite.get("recipient_email"))
if invite_email:
return invite_email
return None
def smtp_email_config_ready() -> tuple[bool, str]:
runtime = get_runtime_settings()
if not runtime.magent_notify_enabled:
return False, "Notifications are disabled."
if not runtime.magent_notify_email_enabled:
return False, "Email notifications are disabled."
if not _normalize_display_text(runtime.magent_notify_email_smtp_host):
return False, "SMTP host is not configured."
if not _normalize_email(runtime.magent_notify_email_from_address):
return False, "From email address is not configured."
return True, "ok"
def smtp_email_delivery_warning() -> Optional[str]:
runtime = get_runtime_settings()
host = _normalize_display_text(runtime.magent_notify_email_smtp_host).lower()
username = _normalize_display_text(runtime.magent_notify_email_smtp_username)
password = _normalize_display_text(runtime.magent_notify_email_smtp_password)
if host.endswith(".mail.protection.outlook.com") and not (username and password):
return (
"Unauthenticated Microsoft 365 relay mode is configured. SMTP acceptance does not "
"confirm mailbox delivery. For reliable delivery, use smtp.office365.com:587 with "
"SMTP credentials or configure a verified Exchange relay connector."
)
return None
def _flatten_message(message: EmailMessage) -> bytes:
buffer = BytesIO()
BytesGenerator(buffer).flatten(message)
return buffer.getvalue()
def _decode_smtp_message(value: bytes | str | None) -> str:
if value is None:
return ""
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
return str(value)
def _parse_exchange_receipt(value: bytes | str | None) -> Dict[str, str]:
message = _decode_smtp_message(value)
receipt: Dict[str, str] = {"raw": message}
message_id_match = EXCHANGE_MESSAGE_ID_PATTERN.search(message)
internal_id_match = EXCHANGE_INTERNAL_ID_PATTERN.search(message)
if message_id_match:
receipt["provider_message_id"] = message_id_match.group(1)
if internal_id_match:
receipt["provider_internal_id"] = internal_id_match.group(1)
return receipt
def _send_via_smtp_session(
smtp: smtplib.SMTP,
*,
from_address: str,
recipient_email: str,
message: EmailMessage,
) -> Dict[str, str]:
mail_code, mail_message = smtp.mail(from_address)
if mail_code >= 400:
raise smtplib.SMTPResponseException(mail_code, mail_message)
rcpt_code, rcpt_message = smtp.rcpt(recipient_email)
if rcpt_code >= 400:
raise smtplib.SMTPRecipientsRefused({recipient_email: (rcpt_code, rcpt_message)})
data_code, data_message = smtp.data(_flatten_message(message))
if data_code >= 400:
raise smtplib.SMTPDataError(data_code, data_message)
receipt = _parse_exchange_receipt(data_message)
receipt["mail_response"] = _decode_smtp_message(mail_message)
receipt["rcpt_response"] = _decode_smtp_message(rcpt_message)
receipt["data_response"] = _decode_smtp_message(data_message)
return receipt
def _send_email_sync(*, recipient_email: str, subject: str, body_text: str, body_html: str) -> Dict[str, str]:
runtime = get_runtime_settings()
host = _normalize_display_text(runtime.magent_notify_email_smtp_host)
port = int(runtime.magent_notify_email_smtp_port or 587)
username = _normalize_display_text(runtime.magent_notify_email_smtp_username)
password = _normalize_display_text(runtime.magent_notify_email_smtp_password)
from_address = _normalize_email(runtime.magent_notify_email_from_address)
from_name = _normalize_display_text(runtime.magent_notify_email_from_name, env_settings.app_name)
use_tls = bool(runtime.magent_notify_email_use_tls)
use_ssl = bool(runtime.magent_notify_email_use_ssl)
delivery_warning = smtp_email_delivery_warning()
if not host or not from_address:
raise RuntimeError("SMTP email settings are incomplete.")
logger.info(
"smtp send started recipient=%s from=%s host=%s port=%s tls=%s ssl=%s auth=%s subject=%s",
recipient_email,
from_address,
host,
port,
use_tls,
use_ssl,
bool(username and password),
subject,
)
if delivery_warning:
logger.warning("smtp delivery warning host=%s detail=%s", host, delivery_warning)
message = EmailMessage()
message["Subject"] = subject
message["From"] = formataddr((from_name, from_address))
message["To"] = recipient_email
message.set_content(body_text or _strip_html_for_text(body_html))
if body_html.strip():
message.add_alternative(body_html, subtype="html")
if use_ssl:
with smtplib.SMTP_SSL(host, port, timeout=20) as smtp:
logger.debug("smtp ssl connection opened host=%s port=%s", host, port)
if username and password:
smtp.login(username, password)
logger.debug("smtp login succeeded host=%s username=%s", host, username)
receipt = _send_via_smtp_session(
smtp,
from_address=from_address,
recipient_email=recipient_email,
message=message,
)
logger.info(
"smtp send accepted recipient=%s host=%s mode=ssl provider_message_id=%s provider_internal_id=%s",
recipient_email,
host,
receipt.get("provider_message_id"),
receipt.get("provider_internal_id"),
)
return receipt
with smtplib.SMTP(host, port, timeout=20) as smtp:
logger.debug("smtp connection opened host=%s port=%s", host, port)
smtp.ehlo()
if use_tls:
smtp.starttls()
smtp.ehlo()
logger.debug("smtp starttls negotiated host=%s port=%s", host, port)
if username and password:
smtp.login(username, password)
logger.debug("smtp login succeeded host=%s username=%s", host, username)
receipt = _send_via_smtp_session(
smtp,
from_address=from_address,
recipient_email=recipient_email,
message=message,
)
logger.info(
"smtp send accepted recipient=%s host=%s mode=plain provider_message_id=%s provider_internal_id=%s",
recipient_email,
host,
receipt.get("provider_message_id"),
receipt.get("provider_internal_id"),
)
return receipt
async def send_templated_email(
template_key: str,
*,
invite: Optional[Dict[str, Any]] = None,
user: Optional[Dict[str, Any]] = None,
recipient_email: Optional[str] = None,
message: Optional[str] = None,
reason: Optional[str] = None,
overrides: Optional[Dict[str, object]] = None,
) -> Dict[str, str]:
ready, detail = smtp_email_config_ready()
if not ready:
raise RuntimeError(detail)
resolved_email = _normalize_email(recipient_email)
if not resolved_email:
resolved_email = resolve_user_delivery_email(user, invite)
if not resolved_email:
raise RuntimeError("No valid recipient email is available for this action.")
rendered = render_invite_email_template(
template_key,
invite=invite,
user=user,
recipient_email=resolved_email,
message=message,
reason=reason,
overrides=overrides,
)
receipt = await asyncio.to_thread(
_send_email_sync,
recipient_email=resolved_email,
subject=rendered["subject"],
body_text=rendered["body_text"],
body_html=rendered["body_html"],
)
logger.info("Email template sent: template=%s recipient=%s", template_key, resolved_email)
return {
"recipient_email": resolved_email,
"subject": rendered["subject"],
**{
key: value
for key, value in receipt.items()
if key in {"provider_message_id", "provider_internal_id", "data_response"}
},
}
async def send_test_email(recipient_email: Optional[str] = None) -> Dict[str, str]:
ready, detail = smtp_email_config_ready()
if not ready:
raise RuntimeError(detail)
runtime = get_runtime_settings()
resolved_email = _normalize_email(recipient_email) or _normalize_email(
runtime.magent_notify_email_from_address
)
if not resolved_email:
raise RuntimeError("No valid recipient email is configured for the test message.")
application_url = _normalize_display_text(runtime.magent_application_url, "Not configured")
primary_url = application_url if application_url.lower().startswith(("http://", "https://")) else ""
subject = f"{env_settings.app_name} email test"
body_text = (
f"This is a test email from {env_settings.app_name}.\n\n"
f"Build: {BUILD_NUMBER}\n"
f"Application URL: {application_url}\n"
)
body_html = _wrap_email_html(
app_name=env_settings.app_name,
app_url=_build_default_base_url(),
build_number=BUILD_NUMBER,
title="Email delivery test",
subtitle="This confirms Magent can generate and hand off branded mail.",
tone="brand",
body_html=(
""
"This is a live test email from Magent. If this renders correctly, the HTML template shell and SMTP handoff are both working."
"
"
""
""
"| "
" Build "
f"{html.escape(BUILD_NUMBER)} "
" | "
""
" Application URL "
f"{html.escape(application_url)} "
" | "
"
"
"
"
""
"Use this test when changing SMTP settings, relay targets, or branding."
"
"
),
primary_label="Open Magent" if primary_url else "",
primary_url=primary_url,
footer_note="SMTP test email generated by Magent.",
)
receipt = await asyncio.to_thread(
_send_email_sync,
recipient_email=resolved_email,
subject=subject,
body_text=body_text,
body_html=body_html,
)
logger.info("SMTP test email sent: recipient=%s", resolved_email)
result = {"recipient_email": resolved_email, "subject": subject}
result.update(
{
key: value
for key, value in receipt.items()
if key in {"provider_message_id", "provider_internal_id", "data_response"}
}
)
warning = smtp_email_delivery_warning()
if warning:
result["warning"] = warning
return result
async def send_password_reset_email(
*,
recipient_email: str,
username: str,
token: str,
expires_at: str,
auth_provider: str,
) -> Dict[str, str]:
ready, detail = smtp_email_config_ready()
if not ready:
raise RuntimeError(detail)
resolved_email = _normalize_email(recipient_email)
if not resolved_email:
raise RuntimeError("No valid recipient email is available for password reset.")
app_url = _build_default_base_url()
reset_url = f"{app_url}/reset-password?token={token}"
provider_label = "Jellyfin, Seerr, and Magent" if auth_provider == "jellyfin" else "Magent"
subject = f"{env_settings.app_name} password reset"
body_text = (
f"A password reset was requested for {username}.\n\n"
f"This link will reset the password used for {provider_label}.\n"
f"Reset link: {reset_url}\n"
f"Expires: {expires_at}\n\n"
"If you did not request this reset, you can ignore this email.\n"
)
body_html = _wrap_email_html(
app_name=env_settings.app_name,
app_url=app_url,
build_number=BUILD_NUMBER,
title="Reset your password",
subtitle=f"This will update the credentials used for {provider_label}.",
tone="brand",
body_html=(
f""
f"A password reset was requested for {html.escape(username)}."
"
"
""
""
"| "
" Account "
f"{html.escape(username)} "
" | "
""
" Expires "
f"{html.escape(expires_at)} "
" | "
"
"
"
"
f""
f"This reset will update the password used for {html.escape(provider_label)}."
"
"
""
"If you did not request this reset, ignore this email. No changes will be applied until the reset link is opened and completed."
"
"
),
primary_label="Reset password",
primary_url=reset_url,
secondary_label="Open Magent",
secondary_url=app_url,
footer_note="Password reset email generated by Magent.",
)
receipt = await asyncio.to_thread(
_send_email_sync,
recipient_email=resolved_email,
subject=subject,
body_text=body_text,
body_html=body_html,
)
logger.info(
"Password reset email sent: username=%s recipient=%s provider=%s",
username,
resolved_email,
auth_provider,
)
result = {
"recipient_email": resolved_email,
"subject": subject,
"reset_url": reset_url,
**{
key: value
for key, value in receipt.items()
if key in {"provider_message_id", "provider_internal_id", "data_response"}
},
}
warning = smtp_email_delivery_warning()
if warning:
result["warning"] = warning
return result