Compare commits

..

5 Commits

27 changed files with 1297 additions and 279 deletions

View File

@@ -1 +1 @@
0303261629 0403261321

View File

@@ -108,6 +108,7 @@ def _load_current_user_from_token(
return { return {
"username": user["username"], "username": user["username"],
"email": user.get("email"),
"role": user["role"], "role": user["role"],
"auth_provider": user.get("auth_provider", "local"), "auth_provider": user.get("auth_provider", "local"),
"jellyseerr_user_id": user.get("jellyseerr_user_id"), "jellyseerr_user_id": user.get("jellyseerr_user_id"),

File diff suppressed because one or more lines are too long

View File

@@ -9,6 +9,9 @@ class Settings(BaseSettings):
app_name: str = "Magent" app_name: str = "Magent"
cors_allow_origin: str = "http://localhost:3000" cors_allow_origin: str = "http://localhost:3000"
sqlite_path: str = Field(default="data/magent.db", validation_alias=AliasChoices("SQLITE_PATH")) sqlite_path: str = Field(default="data/magent.db", validation_alias=AliasChoices("SQLITE_PATH"))
sqlite_journal_mode: str = Field(
default="DELETE", validation_alias=AliasChoices("SQLITE_JOURNAL_MODE")
)
jwt_secret: str = Field(default="change-me", validation_alias=AliasChoices("JWT_SECRET")) jwt_secret: str = Field(default="change-me", validation_alias=AliasChoices("JWT_SECRET"))
jwt_exp_minutes: int = Field(default=720, validation_alias=AliasChoices("JWT_EXP_MINUTES")) jwt_exp_minutes: int = Field(default=720, validation_alias=AliasChoices("JWT_EXP_MINUTES"))
api_docs_enabled: bool = Field(default=False, validation_alias=AliasChoices("API_DOCS_ENABLED")) api_docs_enabled: bool = Field(default=False, validation_alias=AliasChoices("API_DOCS_ENABLED"))
@@ -21,6 +24,15 @@ class Settings(BaseSettings):
auth_rate_limit_max_attempts_user: int = Field( auth_rate_limit_max_attempts_user: int = Field(
default=5, validation_alias=AliasChoices("AUTH_RATE_LIMIT_MAX_ATTEMPTS_USER") default=5, validation_alias=AliasChoices("AUTH_RATE_LIMIT_MAX_ATTEMPTS_USER")
) )
password_reset_rate_limit_window_seconds: int = Field(
default=300, validation_alias=AliasChoices("PASSWORD_RESET_RATE_LIMIT_WINDOW_SECONDS")
)
password_reset_rate_limit_max_attempts_ip: int = Field(
default=6, validation_alias=AliasChoices("PASSWORD_RESET_RATE_LIMIT_MAX_ATTEMPTS_IP")
)
password_reset_rate_limit_max_attempts_identifier: int = Field(
default=3, validation_alias=AliasChoices("PASSWORD_RESET_RATE_LIMIT_MAX_ATTEMPTS_IDENTIFIER")
)
admin_username: str = Field(default="admin", validation_alias=AliasChoices("ADMIN_USERNAME")) admin_username: str = Field(default="admin", validation_alias=AliasChoices("ADMIN_USERNAME"))
admin_password: str = Field(default="adminadmin", validation_alias=AliasChoices("ADMIN_PASSWORD")) admin_password: str = Field(default="adminadmin", validation_alias=AliasChoices("ADMIN_PASSWORD"))
log_level: str = Field(default="INFO", validation_alias=AliasChoices("LOG_LEVEL")) log_level: str = Field(default="INFO", validation_alias=AliasChoices("LOG_LEVEL"))

View File

@@ -32,8 +32,11 @@ def _db_path() -> str:
def _apply_connection_pragmas(conn: sqlite3.Connection) -> None: def _apply_connection_pragmas(conn: sqlite3.Connection) -> None:
journal_mode = str(getattr(settings, "sqlite_journal_mode", "DELETE") or "DELETE").strip().upper()
if journal_mode not in {"DELETE", "WAL", "TRUNCATE", "PERSIST", "MEMORY", "OFF"}:
journal_mode = "DELETE"
pragmas = ( pragmas = (
("journal_mode", "WAL"), ("journal_mode", journal_mode),
("synchronous", "NORMAL"), ("synchronous", "NORMAL"),
("temp_store", "MEMORY"), ("temp_store", "MEMORY"),
("cache_size", -SQLITE_CACHE_SIZE_KIB), ("cache_size", -SQLITE_CACHE_SIZE_KIB),
@@ -165,6 +168,15 @@ def _extract_tmdb_from_payload(payload_json: Optional[str]) -> tuple[Optional[in
return tmdb_id, media_type return tmdb_id, media_type
def _normalize_stored_email(value: Optional[Any]) -> Optional[str]:
if not isinstance(value, str):
return None
candidate = value.strip()
if not candidate or "@" not in candidate:
return None
return candidate
def init_db() -> None: def init_db() -> None:
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
@@ -197,6 +209,7 @@ def init_db() -> None:
CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE, username TEXT NOT NULL UNIQUE,
email TEXT,
password_hash TEXT NOT NULL, password_hash TEXT NOT NULL,
role TEXT NOT NULL, role TEXT NOT NULL,
auth_provider TEXT NOT NULL DEFAULT 'local', auth_provider TEXT NOT NULL DEFAULT 'local',
@@ -422,6 +435,10 @@ def init_db() -> None:
ON user_activity (last_seen_at) ON user_activity (last_seen_at)
""" """
) )
try:
conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
except sqlite3.OperationalError:
pass
try: try:
conn.execute("ALTER TABLE users ADD COLUMN last_login_at TEXT") conn.execute("ALTER TABLE users ADD COLUMN last_login_at TEXT")
except sqlite3.OperationalError: except sqlite3.OperationalError:
@@ -501,6 +518,15 @@ def init_db() -> None:
) )
except sqlite3.OperationalError: except sqlite3.OperationalError:
pass pass
try:
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_users_email_nocase
ON users (email COLLATE NOCASE)
"""
)
except sqlite3.OperationalError:
pass
try: try:
conn.execute("ALTER TABLE requests_cache ADD COLUMN requested_by_id INTEGER") conn.execute("ALTER TABLE requests_cache ADD COLUMN requested_by_id INTEGER")
except sqlite3.OperationalError: except sqlite3.OperationalError:
@@ -625,6 +651,7 @@ def create_user(
username: str, username: str,
password: str, password: str,
role: str = "user", role: str = "user",
email: Optional[str] = None,
auth_provider: str = "local", auth_provider: str = "local",
jellyseerr_user_id: Optional[int] = None, jellyseerr_user_id: Optional[int] = None,
auto_search_enabled: bool = True, auto_search_enabled: bool = True,
@@ -635,11 +662,13 @@ def create_user(
) -> None: ) -> None:
created_at = datetime.now(timezone.utc).isoformat() created_at = datetime.now(timezone.utc).isoformat()
password_hash = hash_password(password) password_hash = hash_password(password)
normalized_email = _normalize_stored_email(email)
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
INSERT INTO users ( INSERT INTO users (
username, username,
email,
password_hash, password_hash,
role, role,
auth_provider, auth_provider,
@@ -652,10 +681,11 @@ def create_user(
invited_by_code, invited_by_code,
invited_at invited_at
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
( (
username, username,
normalized_email,
password_hash, password_hash,
role, role,
auth_provider, auth_provider,
@@ -675,6 +705,7 @@ def create_user_if_missing(
username: str, username: str,
password: str, password: str,
role: str = "user", role: str = "user",
email: Optional[str] = None,
auth_provider: str = "local", auth_provider: str = "local",
jellyseerr_user_id: Optional[int] = None, jellyseerr_user_id: Optional[int] = None,
auto_search_enabled: bool = True, auto_search_enabled: bool = True,
@@ -685,11 +716,13 @@ def create_user_if_missing(
) -> bool: ) -> bool:
created_at = datetime.now(timezone.utc).isoformat() created_at = datetime.now(timezone.utc).isoformat()
password_hash = hash_password(password) password_hash = hash_password(password)
normalized_email = _normalize_stored_email(email)
with _connect() as conn: with _connect() as conn:
cursor = conn.execute( cursor = conn.execute(
""" """
INSERT OR IGNORE INTO users ( INSERT OR IGNORE INTO users (
username, username,
email,
password_hash, password_hash,
role, role,
auth_provider, auth_provider,
@@ -702,10 +735,11 @@ def create_user_if_missing(
invited_by_code, invited_by_code,
invited_at invited_at
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
( (
username, username,
normalized_email,
password_hash, password_hash,
role, role,
auth_provider, auth_provider,
@@ -739,7 +773,7 @@ def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
with _connect() as conn: with _connect() as conn:
row = conn.execute( row = conn.execute(
""" """
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, SELECT id, username, email, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled, created_at, last_login_at, is_blocked, auto_search_enabled,
invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
jellyfin_password_hash, last_jellyfin_auth_at jellyfin_password_hash, last_jellyfin_auth_at
@@ -753,22 +787,23 @@ def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
return { return {
"id": row[0], "id": row[0],
"username": row[1], "username": row[1],
"password_hash": row[2], "email": row[2],
"role": row[3], "password_hash": row[3],
"auth_provider": row[4], "role": row[4],
"jellyseerr_user_id": row[5], "auth_provider": row[5],
"created_at": row[6], "jellyseerr_user_id": row[6],
"last_login_at": row[7], "created_at": row[7],
"is_blocked": bool(row[8]), "last_login_at": row[8],
"auto_search_enabled": bool(row[9]), "is_blocked": bool(row[9]),
"invite_management_enabled": bool(row[10]), "auto_search_enabled": bool(row[10]),
"profile_id": row[11], "invite_management_enabled": bool(row[11]),
"expires_at": row[12], "profile_id": row[12],
"invited_by_code": row[13], "expires_at": row[13],
"invited_at": row[14], "invited_by_code": row[14],
"is_expired": _is_datetime_in_past(row[12]), "invited_at": row[15],
"jellyfin_password_hash": row[15], "is_expired": _is_datetime_in_past(row[13]),
"last_jellyfin_auth_at": row[16], "jellyfin_password_hash": row[16],
"last_jellyfin_auth_at": row[17],
} }
@@ -776,7 +811,7 @@ def get_user_by_jellyseerr_id(jellyseerr_user_id: int) -> Optional[Dict[str, Any
with _connect() as conn: with _connect() as conn:
row = conn.execute( row = conn.execute(
""" """
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, SELECT id, username, email, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled, created_at, last_login_at, is_blocked, auto_search_enabled,
invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
jellyfin_password_hash, last_jellyfin_auth_at jellyfin_password_hash, last_jellyfin_auth_at
@@ -792,22 +827,23 @@ def get_user_by_jellyseerr_id(jellyseerr_user_id: int) -> Optional[Dict[str, Any
return { return {
"id": row[0], "id": row[0],
"username": row[1], "username": row[1],
"password_hash": row[2], "email": row[2],
"role": row[3], "password_hash": row[3],
"auth_provider": row[4], "role": row[4],
"jellyseerr_user_id": row[5], "auth_provider": row[5],
"created_at": row[6], "jellyseerr_user_id": row[6],
"last_login_at": row[7], "created_at": row[7],
"is_blocked": bool(row[8]), "last_login_at": row[8],
"auto_search_enabled": bool(row[9]), "is_blocked": bool(row[9]),
"invite_management_enabled": bool(row[10]), "auto_search_enabled": bool(row[10]),
"profile_id": row[11], "invite_management_enabled": bool(row[11]),
"expires_at": row[12], "profile_id": row[12],
"invited_by_code": row[13], "expires_at": row[13],
"invited_at": row[14], "invited_by_code": row[14],
"is_expired": _is_datetime_in_past(row[12]), "invited_at": row[15],
"jellyfin_password_hash": row[15], "is_expired": _is_datetime_in_past(row[13]),
"last_jellyfin_auth_at": row[16], "jellyfin_password_hash": row[16],
"last_jellyfin_auth_at": row[17],
} }
@@ -815,7 +851,7 @@ def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
with _connect() as conn: with _connect() as conn:
row = conn.execute( row = conn.execute(
""" """
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, SELECT id, username, email, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled, created_at, last_login_at, is_blocked, auto_search_enabled,
invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
jellyfin_password_hash, last_jellyfin_auth_at jellyfin_password_hash, last_jellyfin_auth_at
@@ -829,29 +865,30 @@ def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
return { return {
"id": row[0], "id": row[0],
"username": row[1], "username": row[1],
"password_hash": row[2], "email": row[2],
"role": row[3], "password_hash": row[3],
"auth_provider": row[4], "role": row[4],
"jellyseerr_user_id": row[5], "auth_provider": row[5],
"created_at": row[6], "jellyseerr_user_id": row[6],
"last_login_at": row[7], "created_at": row[7],
"is_blocked": bool(row[8]), "last_login_at": row[8],
"auto_search_enabled": bool(row[9]), "is_blocked": bool(row[9]),
"invite_management_enabled": bool(row[10]), "auto_search_enabled": bool(row[10]),
"profile_id": row[11], "invite_management_enabled": bool(row[11]),
"expires_at": row[12], "profile_id": row[12],
"invited_by_code": row[13], "expires_at": row[13],
"invited_at": row[14], "invited_by_code": row[14],
"is_expired": _is_datetime_in_past(row[12]), "invited_at": row[15],
"jellyfin_password_hash": row[15], "is_expired": _is_datetime_in_past(row[13]),
"last_jellyfin_auth_at": row[16], "jellyfin_password_hash": row[16],
"last_jellyfin_auth_at": row[17],
} }
def get_all_users() -> list[Dict[str, Any]]: def get_all_users() -> list[Dict[str, Any]]:
with _connect() as conn: with _connect() as conn:
rows = conn.execute( rows = conn.execute(
""" """
SELECT id, username, role, auth_provider, jellyseerr_user_id, created_at, SELECT id, username, email, role, auth_provider, jellyseerr_user_id, created_at,
last_login_at, is_blocked, auto_search_enabled, invite_management_enabled, last_login_at, is_blocked, auto_search_enabled, invite_management_enabled,
profile_id, expires_at, invited_by_code, invited_at profile_id, expires_at, invited_by_code, invited_at
FROM users FROM users
@@ -864,19 +901,20 @@ def get_all_users() -> list[Dict[str, Any]]:
{ {
"id": row[0], "id": row[0],
"username": row[1], "username": row[1],
"role": row[2], "email": row[2],
"auth_provider": row[3], "role": row[3],
"jellyseerr_user_id": row[4], "auth_provider": row[4],
"created_at": row[5], "jellyseerr_user_id": row[5],
"last_login_at": row[6], "created_at": row[6],
"is_blocked": bool(row[7]), "last_login_at": row[7],
"auto_search_enabled": bool(row[8]), "is_blocked": bool(row[8]),
"invite_management_enabled": bool(row[9]), "auto_search_enabled": bool(row[9]),
"profile_id": row[10], "invite_management_enabled": bool(row[10]),
"expires_at": row[11], "profile_id": row[11],
"invited_by_code": row[12], "expires_at": row[12],
"invited_at": row[13], "invited_by_code": row[13],
"is_expired": _is_datetime_in_past(row[11]), "invited_at": row[14],
"is_expired": _is_datetime_in_past(row[12]),
} }
) )
# Admin user management uses Jellyfin as the source of truth for non-admin # Admin user management uses Jellyfin as the source of truth for non-admin
@@ -945,7 +983,7 @@ def set_user_jellyseerr_id(username: str, jellyseerr_user_id: Optional[int]) ->
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
UPDATE users SET jellyseerr_user_id = ? WHERE username = ? UPDATE users SET jellyseerr_user_id = ? WHERE username = ? COLLATE NOCASE
""", """,
(jellyseerr_user_id, username), (jellyseerr_user_id, username),
) )
@@ -956,7 +994,7 @@ def set_user_auth_provider(username: str, auth_provider: str) -> None:
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
UPDATE users SET auth_provider = ? WHERE username = ? UPDATE users SET auth_provider = ? WHERE username = ? COLLATE NOCASE
""", """,
(provider, username), (provider, username),
) )
@@ -967,7 +1005,7 @@ def set_last_login(username: str) -> None:
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
UPDATE users SET last_login_at = ? WHERE username = ? UPDATE users SET last_login_at = ? WHERE username = ? COLLATE NOCASE
""", """,
(timestamp, username), (timestamp, username),
) )
@@ -1026,7 +1064,7 @@ def set_user_role(username: str, role: str) -> None:
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
UPDATE users SET role = ? WHERE username = ? UPDATE users SET role = ? WHERE username = ? COLLATE NOCASE
""", """,
(role, username), (role, username),
) )
@@ -1037,7 +1075,7 @@ def set_user_auto_search_enabled(username: str, enabled: bool) -> None:
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
UPDATE users SET auto_search_enabled = ? WHERE username = ? UPDATE users SET auto_search_enabled = ? WHERE username = ? COLLATE NOCASE
""", """,
(1 if enabled else 0, username), (1 if enabled else 0, username),
) )
@@ -1480,7 +1518,7 @@ def get_users_by_username_ci(username: str) -> list[Dict[str, Any]]:
with _connect() as conn: with _connect() as conn:
rows = conn.execute( rows = conn.execute(
""" """
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, SELECT id, username, email, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled, created_at, last_login_at, is_blocked, auto_search_enabled,
invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at, invite_management_enabled, profile_id, expires_at, invited_by_code, invited_at,
jellyfin_password_hash, last_jellyfin_auth_at jellyfin_password_hash, last_jellyfin_auth_at
@@ -1498,33 +1536,53 @@ def get_users_by_username_ci(username: str) -> list[Dict[str, Any]]:
{ {
"id": row[0], "id": row[0],
"username": row[1], "username": row[1],
"password_hash": row[2], "email": row[2],
"role": row[3], "password_hash": row[3],
"auth_provider": row[4], "role": row[4],
"jellyseerr_user_id": row[5], "auth_provider": row[5],
"created_at": row[6], "jellyseerr_user_id": row[6],
"last_login_at": row[7], "created_at": row[7],
"is_blocked": bool(row[8]), "last_login_at": row[8],
"auto_search_enabled": bool(row[9]), "is_blocked": bool(row[9]),
"invite_management_enabled": bool(row[10]), "auto_search_enabled": bool(row[10]),
"profile_id": row[11], "invite_management_enabled": bool(row[11]),
"expires_at": row[12], "profile_id": row[12],
"invited_by_code": row[13], "expires_at": row[13],
"invited_at": row[14], "invited_by_code": row[14],
"is_expired": _is_datetime_in_past(row[12]), "invited_at": row[15],
"jellyfin_password_hash": row[15], "is_expired": _is_datetime_in_past(row[13]),
"last_jellyfin_auth_at": row[16], "jellyfin_password_hash": row[16],
"last_jellyfin_auth_at": row[17],
} }
) )
return results return results
def set_user_email(username: str, email: Optional[str]) -> bool:
normalized_email = _normalize_stored_email(email)
with _connect() as conn:
cursor = conn.execute(
"""
UPDATE users
SET email = ?
WHERE username = ? COLLATE NOCASE
""",
(normalized_email, username),
)
updated = cursor.rowcount > 0
if updated:
logger.info("user email updated username=%s email=%s", username, normalized_email)
else:
logger.debug("user email update skipped username=%s", username)
return updated
def set_user_password(username: str, password: str) -> None: def set_user_password(username: str, password: str) -> None:
password_hash = hash_password(password) password_hash = hash_password(password)
with _connect() as conn: with _connect() as conn:
conn.execute( conn.execute(
""" """
UPDATE users SET password_hash = ? WHERE username = ? UPDATE users SET password_hash = ? WHERE username = ? COLLATE NOCASE
""", """,
(password_hash, username), (password_hash, username),
) )

View File

@@ -163,6 +163,21 @@ def _launch_background_task(name: str, coroutine_factory: Callable[[], Awaitable
_background_tasks.append(task) _background_tasks.append(task)
def _log_security_configuration_warnings() -> None:
if str(settings.jwt_secret or "").strip() == "change-me":
logger.warning(
"security configuration warning: JWT_SECRET is still set to the default value"
)
if str(settings.admin_password or "") == "adminadmin":
logger.warning(
"security configuration warning: ADMIN_PASSWORD is still set to the bootstrap default"
)
if bool(settings.api_docs_enabled):
logger.warning(
"security configuration warning: API docs are enabled; disable API_DOCS_ENABLED outside controlled environments"
)
@app.on_event("startup") @app.on_event("startup")
async def startup() -> None: async def startup() -> None:
configure_logging( configure_logging(
@@ -174,6 +189,7 @@ async def startup() -> None:
log_background_sync_level=settings.log_background_sync_level, log_background_sync_level=settings.log_background_sync_level,
) )
logger.info("startup begin app=%s build=%s", settings.app_name, settings.site_build_number) logger.info("startup begin app=%s build=%s", settings.app_name, settings.site_build_number)
_log_security_configuration_warnings()
init_db() init_db()
runtime = get_runtime_settings() runtime = get_runtime_settings()
configure_logging( configure_logging(

View File

@@ -41,6 +41,7 @@ from ..db import (
delete_user_activity_by_username, delete_user_activity_by_username,
set_user_auto_search_enabled, set_user_auto_search_enabled,
set_auto_search_enabled_for_non_admin_users, set_auto_search_enabled_for_non_admin_users,
set_user_email,
set_user_invite_management_enabled, set_user_invite_management_enabled,
set_invite_management_enabled_for_non_admin_users, set_invite_management_enabled_for_non_admin_users,
set_user_profile_id, set_user_profile_id,
@@ -78,6 +79,8 @@ from ..clients.jellyseerr import JellyseerrClient
from ..services.jellyfin_sync import sync_jellyfin_users from ..services.jellyfin_sync import sync_jellyfin_users
from ..services.user_cache import ( from ..services.user_cache import (
build_jellyseerr_candidate_map, build_jellyseerr_candidate_map,
extract_jellyseerr_user_email,
find_matching_jellyseerr_user,
get_cached_jellyfin_users, get_cached_jellyfin_users,
get_cached_jellyseerr_users, get_cached_jellyseerr_users,
match_jellyseerr_user_id, match_jellyseerr_user_id,
@@ -85,9 +88,11 @@ from ..services.user_cache import (
save_jellyseerr_users_cache, save_jellyseerr_users_cache,
clear_user_import_caches, clear_user_import_caches,
) )
from ..security import validate_password_policy
from ..services.invite_email import ( from ..services.invite_email import (
TEMPLATE_KEYS as INVITE_EMAIL_TEMPLATE_KEYS, TEMPLATE_KEYS as INVITE_EMAIL_TEMPLATE_KEYS,
get_invite_email_templates, get_invite_email_templates,
normalize_delivery_email,
reset_invite_email_template, reset_invite_email_template,
save_invite_email_template, save_invite_email_template,
send_test_email, send_test_email,
@@ -106,6 +111,16 @@ events_router = APIRouter(prefix="/admin/events", tags=["admin"])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SELF_SERVICE_INVITE_MASTER_ID_KEY = "self_service_invite_master_id" SELF_SERVICE_INVITE_MASTER_ID_KEY = "self_service_invite_master_id"
def _require_recipient_email(value: object) -> str:
normalized = normalize_delivery_email(value)
if normalized:
return normalized
raise HTTPException(
status_code=400,
detail="recipient_email is required and must be a valid email address",
)
SENSITIVE_KEYS = { SENSITIVE_KEYS = {
"magent_ssl_certificate_pem", "magent_ssl_certificate_pem",
"magent_ssl_private_key_pem", "magent_ssl_private_key_pem",
@@ -820,8 +835,12 @@ async def jellyseerr_users_sync() -> Dict[str, Any]:
continue continue
username = user.get("username") or "" username = user.get("username") or ""
matched_id = match_jellyseerr_user_id(username, candidate_to_id) matched_id = match_jellyseerr_user_id(username, candidate_to_id)
matched_seerr_user = find_matching_jellyseerr_user(username, jellyseerr_users)
matched_email = extract_jellyseerr_user_email(matched_seerr_user)
if matched_id is not None: if matched_id is not None:
set_user_jellyseerr_id(username, matched_id) set_user_jellyseerr_id(username, matched_id)
if matched_email:
set_user_email(username, matched_email)
updated += 1 updated += 1
else: else:
skipped += 1 skipped += 1
@@ -858,10 +877,12 @@ async def jellyseerr_users_resync() -> Dict[str, Any]:
username = _pick_jellyseerr_username(user) username = _pick_jellyseerr_username(user)
if not username: if not username:
continue continue
email = extract_jellyseerr_user_email(user)
created = create_user_if_missing( created = create_user_if_missing(
username, username,
"jellyseerr-user", "jellyseerr-user",
role="user", role="user",
email=email,
auth_provider="jellyseerr", auth_provider="jellyseerr",
jellyseerr_user_id=user_id, jellyseerr_user_id=user_id,
) )
@@ -869,6 +890,8 @@ async def jellyseerr_users_resync() -> Dict[str, Any]:
imported += 1 imported += 1
else: else:
set_user_jellyseerr_id(username, user_id) set_user_jellyseerr_id(username, user_id)
if email:
set_user_email(username, email)
return {"status": "ok", "imported": imported, "cleared": cleared} return {"status": "ok", "imported": imported, "cleared": cleared}
@router.post("/requests/sync") @router.post("/requests/sync")
@@ -1458,12 +1481,15 @@ async def update_users_expiry_bulk(payload: Dict[str, Any]) -> Dict[str, Any]:
@router.post("/users/{username}/password") @router.post("/users/{username}/password")
async def update_user_password(username: str, payload: Dict[str, Any]) -> Dict[str, Any]: async def update_user_password(username: str, payload: Dict[str, Any]) -> Dict[str, Any]:
new_password = payload.get("password") if isinstance(payload, dict) else None new_password = payload.get("password") if isinstance(payload, dict) else None
if not isinstance(new_password, str) or len(new_password.strip()) < 8: if not isinstance(new_password, str):
raise HTTPException(status_code=400, detail="Password must be at least 8 characters.") raise HTTPException(status_code=400, detail="Invalid payload")
try:
new_password_clean = validate_password_policy(new_password)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
user = get_user_by_username(username) user = get_user_by_username(username)
if not user: if not user:
raise HTTPException(status_code=404, detail="User not found") raise HTTPException(status_code=404, detail="User not found")
new_password_clean = new_password.strip()
user = normalize_user_auth_provider(user) user = normalize_user_auth_provider(user)
auth_provider = resolve_user_auth_provider(user) auth_provider = resolve_user_auth_provider(user)
if auth_provider == "local": if auth_provider == "local":
@@ -1775,7 +1801,7 @@ async def send_invite_email(payload: Dict[str, Any]) -> Dict[str, Any]:
if invite is None: if invite is None:
invite = _resolve_user_invite(user) invite = _resolve_user_invite(user)
recipient_email = _normalize_optional_text(payload.get("recipient_email")) recipient_email = _require_recipient_email(payload.get("recipient_email"))
message = _normalize_optional_text(payload.get("message")) message = _normalize_optional_text(payload.get("message"))
reason = _normalize_optional_text(payload.get("reason")) reason = _normalize_optional_text(payload.get("reason"))
@@ -1825,7 +1851,7 @@ async def create_invite(payload: Dict[str, Any], current_user: Dict[str, Any] =
role = _normalize_role_or_none(payload.get("role")) role = _normalize_role_or_none(payload.get("role"))
max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses") max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses")
expires_at = _parse_optional_expires_at(payload.get("expires_at")) expires_at = _parse_optional_expires_at(payload.get("expires_at"))
recipient_email = _normalize_optional_text(payload.get("recipient_email")) recipient_email = _require_recipient_email(payload.get("recipient_email"))
send_email = bool(payload.get("send_email")) send_email = bool(payload.get("send_email"))
delivery_message = _normalize_optional_text(payload.get("message")) delivery_message = _normalize_optional_text(payload.get("message"))
try: try:

View File

@@ -19,6 +19,7 @@ from ..db import (
get_users_by_username_ci, get_users_by_username_ci,
set_user_password, set_user_password,
set_user_jellyseerr_id, set_user_jellyseerr_id,
set_user_email,
set_user_auth_provider, set_user_auth_provider,
get_signup_invite_by_code, get_signup_invite_by_code,
get_signup_invite_by_id, get_signup_invite_by_id,
@@ -39,17 +40,28 @@ from ..db import (
from ..runtime import get_runtime_settings from ..runtime import get_runtime_settings
from ..clients.jellyfin import JellyfinClient from ..clients.jellyfin import JellyfinClient
from ..clients.jellyseerr import JellyseerrClient from ..clients.jellyseerr import JellyseerrClient
from ..security import create_access_token, verify_password from ..security import (
PASSWORD_POLICY_MESSAGE,
create_access_token,
validate_password_policy,
verify_password,
)
from ..security import create_stream_token from ..security import create_stream_token
from ..auth import get_current_user, normalize_user_auth_provider, resolve_user_auth_provider from ..auth import get_current_user, normalize_user_auth_provider, resolve_user_auth_provider
from ..config import settings from ..config import settings
from ..services.user_cache import ( from ..services.user_cache import (
build_jellyseerr_candidate_map, build_jellyseerr_candidate_map,
extract_jellyseerr_user_email,
find_matching_jellyseerr_user,
get_cached_jellyseerr_users, get_cached_jellyseerr_users,
match_jellyseerr_user_id, match_jellyseerr_user_id,
save_jellyfin_users_cache, save_jellyfin_users_cache,
) )
from ..services.invite_email import send_templated_email, smtp_email_config_ready from ..services.invite_email import (
normalize_delivery_email,
send_templated_email,
smtp_email_config_ready,
)
from ..services.password_reset import ( from ..services.password_reset import (
PasswordResetUnavailableError, PasswordResetUnavailableError,
apply_password_reset, apply_password_reset,
@@ -68,6 +80,19 @@ PASSWORD_RESET_GENERIC_MESSAGE = (
_LOGIN_RATE_LOCK = Lock() _LOGIN_RATE_LOCK = Lock()
_LOGIN_ATTEMPTS_BY_IP: dict[str, deque[float]] = defaultdict(deque) _LOGIN_ATTEMPTS_BY_IP: dict[str, deque[float]] = defaultdict(deque)
_LOGIN_ATTEMPTS_BY_USER: dict[str, deque[float]] = defaultdict(deque) _LOGIN_ATTEMPTS_BY_USER: dict[str, deque[float]] = defaultdict(deque)
_RESET_RATE_LOCK = Lock()
_RESET_ATTEMPTS_BY_IP: dict[str, deque[float]] = defaultdict(deque)
_RESET_ATTEMPTS_BY_IDENTIFIER: dict[str, deque[float]] = defaultdict(deque)
def _require_recipient_email(value: object) -> str:
normalized = normalize_delivery_email(value)
if normalized:
return normalized
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="recipient_email is required and must be a valid email address.",
)
def _auth_client_ip(request: Request) -> str: def _auth_client_ip(request: Request) -> str:
@@ -86,6 +111,10 @@ def _login_rate_key_user(username: str) -> str:
return (username or "").strip().lower()[:256] or "<empty>" return (username or "").strip().lower()[:256] or "<empty>"
def _password_reset_rate_key_identifier(identifier: str) -> str:
return (identifier or "").strip().lower()[:256] or "<empty>"
def _prune_attempts(bucket: deque[float], now: float, window_seconds: int) -> None: def _prune_attempts(bucket: deque[float], now: float, window_seconds: int) -> None:
cutoff = now - window_seconds cutoff = now - window_seconds
while bucket and bucket[0] < cutoff: while bucket and bucket[0] < cutoff:
@@ -171,6 +200,57 @@ def _enforce_login_rate_limit(request: Request, username: str) -> None:
) )
def _record_password_reset_attempt(request: Request, identifier: str) -> None:
now = time.monotonic()
window = max(int(settings.password_reset_rate_limit_window_seconds or 300), 1)
ip_key = _auth_client_ip(request)
identifier_key = _password_reset_rate_key_identifier(identifier)
with _RESET_RATE_LOCK:
ip_bucket = _RESET_ATTEMPTS_BY_IP[ip_key]
identifier_bucket = _RESET_ATTEMPTS_BY_IDENTIFIER[identifier_key]
_prune_attempts(ip_bucket, now, window)
_prune_attempts(identifier_bucket, now, window)
ip_bucket.append(now)
identifier_bucket.append(now)
logger.info("password reset rate event recorded identifier=%s client=%s", identifier_key, ip_key)
def _enforce_password_reset_rate_limit(request: Request, identifier: str) -> None:
now = time.monotonic()
window = max(int(settings.password_reset_rate_limit_window_seconds or 300), 1)
max_ip = max(int(settings.password_reset_rate_limit_max_attempts_ip or 6), 1)
max_identifier = max(int(settings.password_reset_rate_limit_max_attempts_identifier or 3), 1)
ip_key = _auth_client_ip(request)
identifier_key = _password_reset_rate_key_identifier(identifier)
with _RESET_RATE_LOCK:
ip_bucket = _RESET_ATTEMPTS_BY_IP[ip_key]
identifier_bucket = _RESET_ATTEMPTS_BY_IDENTIFIER[identifier_key]
_prune_attempts(ip_bucket, now, window)
_prune_attempts(identifier_bucket, now, window)
exceeded = len(ip_bucket) >= max_ip or len(identifier_bucket) >= max_identifier
retry_after = 1
if exceeded:
retry_candidates = []
if ip_bucket:
retry_candidates.append(max(1, int(window - (now - ip_bucket[0]))))
if identifier_bucket:
retry_candidates.append(max(1, int(window - (now - identifier_bucket[0]))))
if retry_candidates:
retry_after = max(retry_candidates)
if exceeded:
logger.warning(
"password reset rate limit exceeded identifier=%s client=%s retry_after=%s",
identifier_key,
ip_key,
retry_after,
)
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many password reset attempts. Try again shortly.",
headers={"Retry-After": str(retry_after)},
)
def _normalize_username(value: str) -> str: def _normalize_username(value: str) -> str:
normalized = value.strip().lower() normalized = value.strip().lower()
if "@" in normalized: if "@" in normalized:
@@ -219,6 +299,13 @@ def _extract_jellyseerr_user_id(response: dict) -> int | None:
return None return None
def _extract_jellyseerr_response_email(response: dict) -> str | None:
if not isinstance(response, dict):
return None
user_payload = response.get("user") if isinstance(response.get("user"), dict) else response
return extract_jellyseerr_user_email(user_payload)
def _extract_http_error_detail(exc: Exception) -> str: def _extract_http_error_detail(exc: Exception) -> str:
if isinstance(exc, httpx.HTTPStatusError): if isinstance(exc, httpx.HTTPStatusError):
response = exc.response response = exc.response
@@ -569,6 +656,8 @@ async def jellyfin_login(request: Request, form_data: OAuth2PasswordRequestForm
preferred_match = _pick_preferred_ci_user_match(ci_matches, username) preferred_match = _pick_preferred_ci_user_match(ci_matches, username)
canonical_username = str(preferred_match.get("username") or username) if preferred_match else username canonical_username = str(preferred_match.get("username") or username) if preferred_match else username
user = preferred_match or get_user_by_username(username) user = preferred_match or get_user_by_username(username)
matched_seerr_user = find_matching_jellyseerr_user(canonical_username, jellyseerr_users or [])
matched_email = extract_jellyseerr_user_email(matched_seerr_user)
_assert_user_can_login(user) _assert_user_can_login(user)
if user and _has_valid_jellyfin_cache(user, password): if user and _has_valid_jellyfin_cache(user, password):
token = create_access_token(canonical_username, "user") token = create_access_token(canonical_username, "user")
@@ -597,7 +686,13 @@ async def jellyfin_login(request: Request, form_data: OAuth2PasswordRequestForm
_record_login_failure(request, username) _record_login_failure(request, username)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Jellyfin credentials") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Jellyfin credentials")
if not preferred_match: if not preferred_match:
create_user_if_missing(canonical_username, "jellyfin-user", role="user", auth_provider="jellyfin") create_user_if_missing(
canonical_username,
"jellyfin-user",
role="user",
email=matched_email,
auth_provider="jellyfin",
)
elif ( elif (
user user
and str(user.get("role") or "user").strip().lower() != "admin" and str(user.get("role") or "user").strip().lower() != "admin"
@@ -605,6 +700,8 @@ async def jellyfin_login(request: Request, form_data: OAuth2PasswordRequestForm
): ):
set_user_auth_provider(canonical_username, "jellyfin") set_user_auth_provider(canonical_username, "jellyfin")
user = get_user_by_username(canonical_username) user = get_user_by_username(canonical_username)
if matched_email:
set_user_email(canonical_username, matched_email)
user = get_user_by_username(canonical_username) user = get_user_by_username(canonical_username)
_assert_user_can_login(user) _assert_user_can_login(user)
try: try:
@@ -660,6 +757,7 @@ async def jellyseerr_login(request: Request, form_data: OAuth2PasswordRequestFor
_record_login_failure(request, form_data.username) _record_login_failure(request, form_data.username)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Seerr credentials") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Seerr credentials")
jellyseerr_user_id = _extract_jellyseerr_user_id(response) jellyseerr_user_id = _extract_jellyseerr_user_id(response)
jellyseerr_email = _extract_jellyseerr_response_email(response)
ci_matches = get_users_by_username_ci(form_data.username) ci_matches = get_users_by_username_ci(form_data.username)
preferred_match = _pick_preferred_ci_user_match(ci_matches, form_data.username) preferred_match = _pick_preferred_ci_user_match(ci_matches, form_data.username)
canonical_username = str(preferred_match.get("username") or form_data.username) if preferred_match else form_data.username canonical_username = str(preferred_match.get("username") or form_data.username) if preferred_match else form_data.username
@@ -668,13 +766,22 @@ async def jellyseerr_login(request: Request, form_data: OAuth2PasswordRequestFor
canonical_username, canonical_username,
"jellyseerr-user", "jellyseerr-user",
role="user", role="user",
email=jellyseerr_email,
auth_provider="jellyseerr", auth_provider="jellyseerr",
jellyseerr_user_id=jellyseerr_user_id, jellyseerr_user_id=jellyseerr_user_id,
) )
elif (
preferred_match
and str(preferred_match.get("role") or "user").strip().lower() != "admin"
and str(preferred_match.get("auth_provider") or "local").strip().lower() not in {"jellyfin", "jellyseerr"}
):
set_user_auth_provider(canonical_username, "jellyseerr")
user = get_user_by_username(canonical_username) user = get_user_by_username(canonical_username)
_assert_user_can_login(user) _assert_user_can_login(user)
if jellyseerr_user_id is not None: if jellyseerr_user_id is not None:
set_user_jellyseerr_id(canonical_username, jellyseerr_user_id) set_user_jellyseerr_id(canonical_username, jellyseerr_user_id)
if jellyseerr_email:
set_user_email(canonical_username, jellyseerr_email)
token = create_access_token(canonical_username, "user") token = create_access_token(canonical_username, "user")
_clear_login_failures(request, form_data.username) _clear_login_failures(request, form_data.username)
set_last_login(canonical_username) set_last_login(canonical_username)
@@ -735,11 +842,10 @@ async def signup(payload: dict) -> dict:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invite code is required") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invite code is required")
if not username: if not username:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username is required") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username is required")
if len(password.strip()) < 8: try:
raise HTTPException( password_value = validate_password_policy(password)
status_code=status.HTTP_400_BAD_REQUEST, except ValueError as exc:
detail="Password must be at least 8 characters.", raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
)
if get_user_by_username(username): if get_user_by_username(username):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="User already exists") raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="User already exists")
logger.info( logger.info(
@@ -786,7 +892,6 @@ async def signup(payload: dict) -> dict:
expires_at = (datetime.now(timezone.utc) + timedelta(days=account_expires_days)).isoformat() expires_at = (datetime.now(timezone.utc) + timedelta(days=account_expires_days)).isoformat()
runtime = get_runtime_settings() runtime = get_runtime_settings()
password_value = password.strip()
auth_provider = "local" auth_provider = "local"
local_password_value = password_value local_password_value = password_value
matched_jellyseerr_user_id: int | None = None matched_jellyseerr_user_id: int | None = None
@@ -839,6 +944,7 @@ async def signup(payload: dict) -> dict:
username, username,
local_password_value, local_password_value,
role=role, role=role,
email=normalize_delivery_email(invite.get("recipient_email")) if isinstance(invite, dict) else None,
auth_provider=auth_provider, auth_provider=auth_provider,
jellyseerr_user_id=matched_jellyseerr_user_id, jellyseerr_user_id=matched_jellyseerr_user_id,
auto_search_enabled=auto_search_enabled, auto_search_enabled=auto_search_enabled,
@@ -901,6 +1007,8 @@ async def forgot_password(payload: dict, request: Request) -> dict:
identifier = payload.get("identifier") or payload.get("username") or payload.get("email") identifier = payload.get("identifier") or payload.get("username") or payload.get("email")
if not isinstance(identifier, str) or not identifier.strip(): if not isinstance(identifier, str) or not identifier.strip():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username or email is required.") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username or email is required.")
_enforce_password_reset_rate_limit(request, identifier)
_record_password_reset_attempt(request, identifier)
ready, detail = smtp_email_config_ready() ready, detail = smtp_email_config_ready()
if not ready: if not ready:
@@ -960,14 +1068,15 @@ async def password_reset(payload: dict) -> dict:
new_password = payload.get("new_password") new_password = payload.get("new_password")
if not isinstance(token, str) or not token.strip(): if not isinstance(token, str) or not token.strip():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token is required.") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token is required.")
if not isinstance(new_password, str) or len(new_password.strip()) < 8: if not isinstance(new_password, str):
raise HTTPException( raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=PASSWORD_POLICY_MESSAGE)
status_code=status.HTTP_400_BAD_REQUEST, try:
detail="Password must be at least 8 characters.", new_password_clean = validate_password_policy(new_password)
) except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
try: try:
result = await apply_password_reset(token.strip(), new_password.strip()) result = await apply_password_reset(token.strip(), new_password_clean)
except PasswordResetUnavailableError as exc: except PasswordResetUnavailableError as exc:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
except ValueError as exc: except ValueError as exc:
@@ -1065,8 +1174,7 @@ async def create_profile_invite(payload: dict, current_user: dict = Depends(get_
label = str(label).strip() or None label = str(label).strip() or None
if description is not None: if description is not None:
description = str(description).strip() or None description = str(description).strip() or None
if recipient_email is not None: recipient_email = _require_recipient_email(recipient_email)
recipient_email = str(recipient_email).strip() or None
send_email = bool(payload.get("send_email")) send_email = bool(payload.get("send_email"))
delivery_message = str(payload.get("message") or "").strip() or None delivery_message = str(payload.get("message") or "").strip() or None
@@ -1156,8 +1264,7 @@ async def update_profile_invite(
label = str(label).strip() or None label = str(label).strip() or None
if description is not None: if description is not None:
description = str(description).strip() or None description = str(description).strip() or None
if recipient_email is not None: recipient_email = _require_recipient_email(recipient_email)
recipient_email = str(recipient_email).strip() or None
send_email = bool(payload.get("send_email")) send_email = bool(payload.get("send_email"))
delivery_message = str(payload.get("message") or "").strip() or None delivery_message = str(payload.get("message") or "").strip() or None
@@ -1232,14 +1339,13 @@ async def change_password(payload: dict, current_user: dict = Depends(get_curren
new_password = payload.get("new_password") if isinstance(payload, dict) else None new_password = payload.get("new_password") if isinstance(payload, dict) else None
if not isinstance(current_password, str) or not isinstance(new_password, str): if not isinstance(current_password, str) or not isinstance(new_password, str):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid payload") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid payload")
if len(new_password.strip()) < 8: try:
raise HTTPException( new_password_clean = validate_password_policy(new_password)
status_code=status.HTTP_400_BAD_REQUEST, detail="Password must be at least 8 characters." except ValueError as exc:
) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
username = str(current_user.get("username") or "").strip() username = str(current_user.get("username") or "").strip()
if not username: if not username:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid user") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid user")
new_password_clean = new_password.strip()
stored_user = normalize_user_auth_provider(get_user_by_username(username)) stored_user = normalize_user_auth_provider(get_user_by_username(username))
auth_provider = resolve_user_auth_provider(stored_user or current_user) auth_provider = resolve_user_auth_provider(stored_user or current_user)
logger.info("password change requested username=%s provider=%s", username, auth_provider) logger.info("password change requested username=%s provider=%s", username, auth_provider)

View File

@@ -4,6 +4,12 @@ from .db import get_settings_overrides
_INT_FIELDS = { _INT_FIELDS = {
"magent_application_port", "magent_application_port",
"magent_api_port", "magent_api_port",
"auth_rate_limit_window_seconds",
"auth_rate_limit_max_attempts_ip",
"auth_rate_limit_max_attempts_user",
"password_reset_rate_limit_window_seconds",
"password_reset_rate_limit_max_attempts_ip",
"password_reset_rate_limit_max_attempts_identifier",
"sonarr_quality_profile_id", "sonarr_quality_profile_id",
"radarr_quality_profile_id", "radarr_quality_profile_id",
"jwt_exp_minutes", "jwt_exp_minutes",

View File

@@ -1,13 +1,16 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from jose import JWTError, jwt
from passlib.context import CryptContext from passlib.context import CryptContext
import jwt
from jwt import InvalidTokenError
from .config import settings from .config import settings
_pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") _pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
_ALGORITHM = "HS256" _ALGORITHM = "HS256"
MIN_PASSWORD_LENGTH = 8
PASSWORD_POLICY_MESSAGE = f"Password must be at least {MIN_PASSWORD_LENGTH} characters."
def hash_password(password: str) -> str: def hash_password(password: str) -> str:
@@ -18,6 +21,13 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
return _pwd_context.verify(plain_password, hashed_password) return _pwd_context.verify(plain_password, hashed_password)
def validate_password_policy(password: str) -> str:
candidate = password.strip()
if len(candidate) < MIN_PASSWORD_LENGTH:
raise ValueError(PASSWORD_POLICY_MESSAGE)
return candidate
def _create_token( def _create_token(
subject: str, subject: str,
role: str, role: str,
@@ -55,5 +65,5 @@ class TokenError(Exception):
def safe_decode_token(token: str) -> Dict[str, Any]: def safe_decode_token(token: str) -> Dict[str, Any]:
try: try:
return decode_token(token) return decode_token(token)
except JWTError as exc: except InvalidTokenError as exc:
raise TokenError("Invalid token") from exc raise TokenError("Invalid token") from exc

View File

@@ -6,9 +6,12 @@ import json
import logging import logging
import re import re
import smtplib import smtplib
from functools import lru_cache
from pathlib import Path
from email.generator import BytesGenerator from email.generator import BytesGenerator
from email.message import EmailMessage from email.message import EmailMessage
from email.utils import formataddr from email.policy import SMTP as SMTP_POLICY
from email.utils import formataddr, formatdate, make_msgid
from io import BytesIO from io import BytesIO
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
@@ -25,6 +28,7 @@ EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PLACEHOLDER_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_]+)\s*}}") PLACEHOLDER_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_]+)\s*}}")
EXCHANGE_MESSAGE_ID_PATTERN = re.compile(r"<([^>]+)>") EXCHANGE_MESSAGE_ID_PATTERN = re.compile(r"<([^>]+)>")
EXCHANGE_INTERNAL_ID_PATTERN = re.compile(r"\[InternalId=([^\],]+)") EXCHANGE_INTERNAL_ID_PATTERN = re.compile(r"\[InternalId=([^\],]+)")
EMAIL_LOGO_CID = "magent-logo"
TEMPLATE_METADATA: Dict[str, Dict[str, Any]] = { TEMPLATE_METADATA: Dict[str, Dict[str, Any]] = {
"invited": { "invited": {
@@ -136,6 +140,111 @@ TEMPLATE_PRESENTATION: Dict[str, Dict[str, str]] = {
}, },
} }
def _build_email_stat_card(label: str, value: str, detail: str = "") -> str:
detail_html = (
f"<div style=\"margin-top:8px; font-size:13px; line-height:1.6; color:#5c687d; word-break:break-word;\">"
f"{html.escape(detail)}</div>"
if detail
else ""
)
return (
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"border-collapse:separate; background:#f8fafc; border:1px solid #d9e2ef; border-radius:16px;\">"
"<tr><td style=\"padding:16px; font-family:Segoe UI, Arial, sans-serif; color:#132033;\">"
f"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#6b778c; margin-bottom:8px;\">"
f"{html.escape(label)}</div>"
f"<div style=\"font-size:20px; font-weight:800; line-height:1.45; word-break:break-word; color:#132033;\">"
f"{html.escape(value)}</div>"
f"{detail_html}"
"</td></tr></table>"
)
def _build_email_stat_grid(cards: list[str]) -> str:
if not cards:
return ""
rows: list[str] = []
for index in range(0, len(cards), 2):
left = cards[index]
right = cards[index + 1] if index + 1 < len(cards) else ""
rows.append(
"<tr>"
f"<td width=\"50%\" style=\"vertical-align:top; padding:0 5px 10px 0;\">{left}</td>"
f"<td width=\"50%\" style=\"vertical-align:top; padding:0 0 10px 5px;\">{right}</td>"
"</tr>"
)
return (
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"border-collapse:collapse; margin:0 0 18px;\">"
f"{''.join(rows)}"
"</table>"
)
def _build_email_list(items: list[str], *, ordered: bool = False) -> str:
tag = "ol" if ordered else "ul"
marker = "padding-left:20px;" if ordered else "padding-left:18px;"
rendered_items = "".join(
f"<li style=\"margin:0 0 8px;\">{html.escape(item)}</li>" for item in items if item
)
return (
f"<{tag} style=\"margin:0; {marker} color:#132033; line-height:1.8; font-size:14px;\">"
f"{rendered_items}"
f"</{tag}>"
)
def _build_email_panel(title: str, body_html: str, *, variant: str = "neutral") -> str:
styles = {
"neutral": {
"background": "#f8fafc",
"border": "#d9e2ef",
"eyebrow": "#6b778c",
"text": "#132033",
},
"brand": {
"background": "#eef4ff",
"border": "#bfd2ff",
"eyebrow": "#2754b6",
"text": "#132033",
},
"success": {
"background": "#edf9f0",
"border": "#bfe4c6",
"eyebrow": "#1f7a3f",
"text": "#132033",
},
"warning": {
"background": "#fff5ea",
"border": "#ffd5a8",
"eyebrow": "#c46a10",
"text": "#132033",
},
"danger": {
"background": "#fff0f0",
"border": "#f3c1c1",
"eyebrow": "#bb2d2d",
"text": "#132033",
},
}.get(variant, {
"background": "#f8fafc",
"border": "#d9e2ef",
"eyebrow": "#6b778c",
"text": "#132033",
})
return (
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
f"style=\"border-collapse:separate; margin:0 0 18px; background:{styles['background']}; "
f"border:1px solid {styles['border']}; border-radius:18px;\">"
f"<tr><td style=\"padding:18px; font-family:Segoe UI, Arial, sans-serif; color:{styles['text']};\">"
f"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:{styles['eyebrow']}; margin-bottom:10px;\">"
f"{html.escape(title)}</div>"
f"<div style=\"font-size:14px; line-height:1.8; color:{styles['text']};\">{body_html}</div>"
"</td></tr></table>"
)
DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = { DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
"invited": { "invited": {
"subject": "{{app_name}} invite for {{recipient_email}}", "subject": "{{app_name}} invite for {{recipient_email}}",
@@ -153,34 +262,43 @@ DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
"Build: {{build_number}}\n" "Build: {{build_number}}\n"
), ),
"body_html": ( "body_html": (
"<div style=\"margin:0 0 20px; color:#e9ecf5; font-size:15px; line-height:1.7;\">" "<div style=\"margin:0 0 20px; color:#132033; font-size:15px; line-height:1.7;\">"
"A new invitation has been prepared for <strong>{{recipient_email}}</strong>. Use the details below to sign up." "A new invitation has been prepared for <strong>{{recipient_email}}</strong>. Use the details below to sign up."
"</div>" "</div>"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:separate; border-spacing:10px 10px; margin:0 0 18px;\">" + _build_email_stat_grid(
"<tr>" [
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" _build_email_stat_card("Invite code", "{{invite_code}}"),
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Invite code</div>" _build_email_stat_card("Invited by", "{{inviter_username}}"),
"<div style=\"font-size:24px; font-weight:800; letter-spacing:0.06em;\">{{invite_code}}</div>" _build_email_stat_card("Invite label", "{{invite_label}}"),
"</td>" _build_email_stat_card(
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" "Access window",
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Invited by</div>" "{{invite_expires_at}}",
"<div style=\"font-size:20px; font-weight:700;\">{{inviter_username}}</div>" "Remaining uses: {{invite_remaining_uses}}",
"</td>" ),
"</tr>" ]
"<tr>" )
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" + _build_email_panel(
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Invite label</div>" "Invitation details",
"<div style=\"font-size:18px; font-weight:700;\">{{invite_label}}</div>" "<div style=\"white-space:pre-line;\">{{invite_description}}</div>",
"</td>" variant="brand",
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" )
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Access window</div>" + _build_email_panel(
"<div style=\"font-size:16px; font-weight:700;\">{{invite_expires_at}}</div>" "Message from admin",
"<div style=\"margin-top:6px; font-size:13px; color:#9aa3b8;\">Remaining uses: {{invite_remaining_uses}}</div>" "<div style=\"white-space:pre-line;\">{{message}}</div>",
"</td>" variant="neutral",
"</tr>" )
"</table>" + _build_email_panel(
"<div style=\"margin:0 0 18px; padding:18px; background:#101726; border:1px solid rgba(59,130,246,0.22); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.7; white-space:pre-line;\">{{invite_description}}</div>" "What happens next",
"<div style=\"margin:0; padding:18px; background:#101726; border:1px dashed rgba(255,107,43,0.38); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.7; white-space:pre-line;\">{{message}}</div>" _build_email_list(
[
"Open the invite link and complete the signup flow.",
"Sign in using the shared credentials for Magent and Seerr.",
"Use the How it works page if you want a quick overview first.",
],
ordered=True,
),
variant="neutral",
)
), ),
}, },
"welcome": { "welcome": {
@@ -194,30 +312,34 @@ DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
"{{message}}\n" "{{message}}\n"
), ),
"body_html": ( "body_html": (
"<div style=\"margin:0 0 18px; color:#e9ecf5; font-size:15px; line-height:1.7;\">" "<div style=\"margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;\">"
"Your account is live and ready to use. Everything below mirrors the current site behavior." "Your account is live and ready to use. Everything below mirrors the current site behavior."
"</div>" "</div>"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:separate; border-spacing:10px 10px; margin:0 0 18px;\">" + _build_email_stat_grid(
"<tr>" [
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" _build_email_stat_card("Username", "{{username}}"),
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Username</div>" _build_email_stat_card("Role", "{{role}}"),
"<div style=\"font-size:22px; font-weight:800;\">{{username}}</div>" _build_email_stat_card("Magent", "{{app_url}}"),
"</td>" _build_email_stat_card("Guides", "{{how_it_works_url}}"),
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" ]
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Role</div>" )
"<div style=\"font-size:22px; font-weight:800;\">{{role}}</div>" + _build_email_panel(
"</td>" "What to do next",
"</tr>" _build_email_list(
"</table>" [
"<div style=\"margin:0 0 18px; padding:18px; background:#101726; border:1px solid rgba(34,197,94,0.24); border-radius:18px; color:#dbe5ff;\">" "Open Magent and sign in using your shared credentials.",
"<div style=\"font-size:15px; font-weight:700; margin:0 0 10px;\">What to do next</div>" "Search all requests or review your own activity without refreshing the page.",
"<ol style=\"margin:0; padding-left:20px; color:#dbe5ff; line-height:1.8; font-size:14px;\">" "Use the invite tools in your profile if your account allows it.",
"<li>Open Magent and sign in using your shared credentials.</li>" ],
"<li>Search or review requests without refreshing every page.</li>" ordered=True,
"<li>Use the invite tools in your profile if your account allows it.</li>" ),
"</ol>" variant="success",
"</div>" )
"<div style=\"margin:0; padding:18px; background:#101726; border:1px dashed rgba(59,130,246,0.32); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.7; white-space:pre-line;\">{{message}}</div>" + _build_email_panel(
"Additional notes",
"<div style=\"white-space:pre-line;\">{{message}}</div>",
variant="neutral",
)
), ),
}, },
"warning": { "warning": {
@@ -230,15 +352,38 @@ DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
"If you need help, contact the admin.\n" "If you need help, contact the admin.\n"
), ),
"body_html": ( "body_html": (
"<div style=\"margin:0 0 18px; color:#e9ecf5; font-size:15px; line-height:1.7;\">" "<div style=\"margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;\">"
"Please review this account notice carefully. This message was sent by an administrator." "Please review this account notice carefully. This message was sent by an administrator."
"</div>" "</div>"
"<div style=\"margin:0 0 18px; padding:18px; background:#241814; border:1px solid rgba(251,146,60,0.34); border-radius:18px; color:#ffe0ba;\">" + _build_email_stat_grid(
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#fbbd7b; margin-bottom:8px;\">Reason</div>" [
"<div style=\"font-size:18px; font-weight:800; line-height:1.5; white-space:pre-line;\">{{reason}}</div>" _build_email_stat_card("Account", "{{username}}"),
"</div>" _build_email_stat_card("Role", "{{role}}"),
"<div style=\"margin:0 0 18px; padding:18px; background:#101726; border:1px solid rgba(255,255,255,0.08); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.8; white-space:pre-line;\">{{message}}</div>" _build_email_stat_card("Application", "{{app_name}}"),
"<div style=\"margin:0; color:#9aa3b8; font-size:13px; line-height:1.7;\">If you need help or think this was sent in error, contact the site administrator.</div>" _build_email_stat_card("Support", "{{how_it_works_url}}"),
]
)
+ _build_email_panel(
"Reason",
"<div style=\"font-size:18px; font-weight:800; line-height:1.6; white-space:pre-line;\">{{reason}}</div>",
variant="warning",
)
+ _build_email_panel(
"Administrator note",
"<div style=\"white-space:pre-line;\">{{message}}</div>",
variant="neutral",
)
+ _build_email_panel(
"What to do next",
_build_email_list(
[
"Review the note above and confirm you understand what needs to change.",
"If you need help, reply through your usual support path or contact an administrator.",
"Keep this email for reference until the matter is resolved.",
]
),
variant="neutral",
)
), ),
}, },
"banned": { "banned": {
@@ -250,18 +395,38 @@ DEFAULT_TEMPLATES: Dict[str, Dict[str, str]] = {
"{{message}}\n" "{{message}}\n"
), ),
"body_html": ( "body_html": (
"<div style=\"margin:0 0 18px; color:#e9ecf5; font-size:15px; line-height:1.7;\">" "<div style=\"margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;\">"
"Your account access has changed. Review the details below." "Your account access has changed. Review the details below."
"</div>" "</div>"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"margin:0 0 18px; border-collapse:collapse;\">" + _build_email_stat_grid(
"<tr>" [
"<td style=\"padding:18px; background:#251418; border:1px solid rgba(239,68,68,0.32); border-radius:18px; color:#ffd0d0;\">" _build_email_stat_card("Account", "{{username}}"),
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#ff9b9b; margin-bottom:8px;\">Reason</div>" _build_email_stat_card("Status", "Restricted"),
"<div style=\"font-size:18px; font-weight:800; line-height:1.5; white-space:pre-line;\">{{reason}}</div>" _build_email_stat_card("Application", "{{app_name}}"),
"</td>" _build_email_stat_card("Guidance", "{{how_it_works_url}}"),
"</tr>" ]
"</table>" )
"<div style=\"margin:0; padding:18px; background:#101726; border:1px solid rgba(255,255,255,0.08); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.8; white-space:pre-line;\">{{message}}</div>" + _build_email_panel(
"Reason",
"<div style=\"font-size:18px; font-weight:800; line-height:1.6; white-space:pre-line;\">{{reason}}</div>",
variant="danger",
)
+ _build_email_panel(
"Administrator note",
"<div style=\"white-space:pre-line;\">{{message}}</div>",
variant="neutral",
)
+ _build_email_panel(
"What this means",
_build_email_list(
[
"Your access has been removed or restricted across the linked services.",
"If you believe this is incorrect, contact the site administrator directly.",
"Do not rely on old links or cached sessions after this change.",
]
),
variant="neutral",
)
), ),
}, },
} }
@@ -286,6 +451,10 @@ def _normalize_email(value: object) -> Optional[str]:
return str(value).strip() return str(value).strip()
def normalize_delivery_email(value: object) -> Optional[str]:
return _normalize_email(value)
def _normalize_display_text(value: object, fallback: str = "") -> str: def _normalize_display_text(value: object, fallback: str = "") -> str:
if value is None: if value is None:
return fallback return fallback
@@ -349,17 +518,141 @@ def _looks_like_full_html_document(value: str) -> bool:
def _build_email_action_button(label: str, url: str, *, primary: bool) -> str: def _build_email_action_button(label: str, url: str, *, primary: bool) -> str:
background = "linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%)" if primary else "#151c2d" background = "linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%)" if primary else "#ffffff"
border = "1px solid rgba(59, 130, 246, 0.32)" if primary else "1px solid rgba(255, 255, 255, 0.12)" fallback = "#1c6bff" if primary else "#ffffff"
color = "#ffffff" border = "1px solid rgba(28, 107, 255, 0.28)" if primary else "1px solid #d5deed"
color = "#ffffff" if primary else "#132033"
return ( return (
f"<a href=\"{html.escape(url)}\" " f"<a href=\"{html.escape(url)}\" "
f"style=\"display:inline-block; padding:12px 20px; margin:0 12px 12px 0; border-radius:999px; " f"style=\"display:inline-block; padding:12px 20px; margin:0 12px 12px 0; border-radius:999px; "
f"background:{background}; border:{border}; color:{color}; text-decoration:none; font-size:14px; " f"background-color:{fallback}; background:{background}; border:{border}; color:{color}; text-decoration:none; font-size:14px; "
f"font-weight:800; letter-spacing:0.01em;\">{html.escape(label)}</a>" f"font-weight:800; letter-spacing:0.01em;\">{html.escape(label)}</a>"
) )
@lru_cache(maxsize=1)
def _get_email_logo_bytes() -> bytes:
logo_path = Path(__file__).resolve().parents[1] / "assets" / "branding" / "logo.png"
try:
return logo_path.read_bytes()
except OSError:
return b""
def _build_email_logo_block(app_name: str) -> str:
if _get_email_logo_bytes():
return (
f"<img src=\"cid:{EMAIL_LOGO_CID}\" alt=\"{html.escape(app_name)}\" width=\"52\" height=\"52\" "
"style=\"display:block; width:52px; height:52px; border-radius:14px; border:1px solid rgba(255,255,255,0.08); "
"background:#0f1522; padding:6px;\" />"
)
return (
"<div style=\"width:52px; height:52px; border-radius:14px; border:1px solid rgba(255,255,255,0.08); "
"background:linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%); color:#ffffff; font-size:24px; "
"font-weight:900; text-align:center; line-height:52px;\">M</div>"
)
def _build_outlook_safe_test_email_html(
*,
app_name: str,
application_url: str,
build_number: str,
smtp_target: str,
security_mode: str,
auth_mode: str,
warning: str,
primary_url: str = "",
) -> str:
action_html = (
_build_email_action_button("Open Magent", primary_url, primary=True) if primary_url else ""
)
logo_block = _build_email_logo_block(app_name)
warning_block = (
"<tr>"
"<td style=\"padding:0 32px 24px 32px; font-family:Segoe UI, Arial, sans-serif;\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"border-collapse:separate; background:#fff5ea; border:1px solid #ffd5a8; border-radius:14px;\">"
"<tr><td style=\"padding:16px; color:#132033; font-size:14px; line-height:1.7;\">"
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#c46a10; margin-bottom:8px;\">"
"Delivery notes</div>"
f"{html.escape(warning)}"
"</td></tr></table>"
"</td>"
"</tr>"
) if warning else ""
return (
"<!doctype html>"
"<html>"
"<body style=\"margin:0; padding:0; background:#eef2f7;\" bgcolor=\"#eef2f7\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"border-collapse:collapse; background:#eef2f7;\" bgcolor=\"#eef2f7\">"
"<tr><td align=\"center\" style=\"padding:32px 16px;\">"
"<table role=\"presentation\" width=\"680\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"width:680px; max-width:680px; border-collapse:collapse; background:#ffffff; border:1px solid #d5deed;\">"
"<tr><td style=\"padding:24px 32px; background:#0f172a;\" bgcolor=\"#0f172a\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:collapse;\">"
"<tr>"
f"<td width=\"56\" valign=\"middle\">{logo_block}</td>"
"<td valign=\"middle\" style=\"padding-left:16px; font-family:Segoe UI, Arial, sans-serif; color:#ffffff;\">"
f"<div style=\"font-size:28px; line-height:1.1; font-weight:800; color:#ffffff;\">{html.escape(app_name)} email test</div>"
"<div style=\"margin-top:6px; font-size:15px; line-height:1.5; color:#d5deed;\">This confirms Magent can generate and hand off branded mail.</div>"
"</td>"
"</tr>"
"</table>"
"</td></tr>"
"<tr><td height=\"6\" style=\"background:#ff6b2b; font-size:0; line-height:0;\">&nbsp;</td></tr>"
"<tr><td style=\"padding:28px 32px 18px 32px; font-family:Segoe UI, Arial, sans-serif; color:#132033;\">"
"<div style=\"font-size:18px; line-height:1.6; color:#132033;\">This is a test email from <strong>Magent</strong>.</div>"
"</td></tr>"
"<tr>"
"<td style=\"padding:0 32px 18px 32px; font-family:Segoe UI, Arial, sans-serif;\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:collapse;\">"
"<tr>"
"<td width=\"50%\" valign=\"top\" style=\"padding:0 8px 12px 0;\">"
f"{_build_email_stat_card('Build', build_number)}"
"</td>"
"<td width=\"50%\" valign=\"top\" style=\"padding:0 0 12px 8px;\">"
f"{_build_email_stat_card('Application URL', application_url)}"
"</td>"
"</tr>"
"<tr>"
"<td width=\"50%\" valign=\"top\" style=\"padding:0 8px 12px 0;\">"
f"{_build_email_stat_card('SMTP target', smtp_target)}"
"</td>"
"<td width=\"50%\" valign=\"top\" style=\"padding:0 0 12px 8px;\">"
f"{_build_email_stat_card('Security', security_mode, auth_mode)}"
"</td>"
"</tr>"
"</table>"
"</td>"
"</tr>"
"<tr>"
"<td style=\"padding:0 32px 24px 32px; font-family:Segoe UI, Arial, sans-serif;\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"border-collapse:separate; background:#eef4ff; border:1px solid #bfd2ff; border-radius:14px;\">"
"<tr><td style=\"padding:16px; color:#132033; font-size:14px; line-height:1.8;\">"
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#2754b6; margin-bottom:8px;\">"
"What this verifies</div>"
"<div>Magent can build the HTML template shell correctly.</div>"
"<div>The configured SMTP route accepts and relays the message.</div>"
"<div>Branding, links, and build metadata are rendering consistently.</div>"
"</td></tr></table>"
"</td>"
"</tr>"
f"{warning_block}"
"<tr>"
"<td style=\"padding:0 32px 32px 32px; font-family:Segoe UI, Arial, sans-serif;\">"
f"{action_html}"
"</td>"
"</tr>"
"</table>"
"</td></tr></table>"
"</body>"
"</html>"
)
def _wrap_email_html( def _wrap_email_html(
*, *,
app_name: str, app_name: str,
@@ -376,10 +669,6 @@ def _wrap_email_html(
footer_note: str = "", footer_note: str = "",
) -> str: ) -> str:
styles = EMAIL_TONE_STYLES.get(tone, EMAIL_TONE_STYLES["brand"]) styles = EMAIL_TONE_STYLES.get(tone, EMAIL_TONE_STYLES["brand"])
logo_url = ""
if app_url.lower().startswith("http://") or app_url.lower().startswith("https://"):
logo_url = f"{app_url.rstrip('/')}/branding/logo.png"
actions = [] actions = []
if primary_label and primary_url: if primary_label and primary_url:
actions.append(_build_email_action_button(primary_label, primary_url, primary=True)) actions.append(_build_email_action_button(primary_label, primary_url, primary=True))
@@ -388,53 +677,43 @@ def _wrap_email_html(
actions_html = "".join(actions) actions_html = "".join(actions)
footer = footer_note or "This email was generated automatically by Magent." footer = footer_note or "This email was generated automatically by Magent."
logo_block = ( logo_block = _build_email_logo_block(app_name)
f"<img src=\"{html.escape(logo_url)}\" alt=\"{html.escape(app_name)}\" width=\"52\" height=\"52\" "
"style=\"display:block; width:52px; height:52px; border-radius:14px; border:1px solid rgba(255,255,255,0.08); "
"background:#0f1522; padding:6px;\" />"
if logo_url
else (
"<div style=\"width:52px; height:52px; border-radius:14px; border:1px solid rgba(255,255,255,0.08); "
"background:linear-gradient(135deg, #ff6b2b 0%, #1c6bff 100%); color:#ffffff; font-size:24px; "
"font-weight:900; text-align:center; line-height:52px;\">M</div>"
)
)
return ( return (
"<!doctype html>" "<!doctype html>"
"<html><body style=\"margin:0; padding:0; background:#05070d;\">" "<html><body style=\"margin:0; padding:0; background:#eef2f7;\" bgcolor=\"#eef2f7\">"
"<div style=\"display:none; max-height:0; overflow:hidden; opacity:0;\">" "<div style=\"display:none; max-height:0; overflow:hidden; opacity:0;\">"
f"{html.escape(title)} - {html.escape(subtitle)}" f"{html.escape(title)} - {html.escape(subtitle)}"
"</div>" "</div>"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" " "<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"width:100%; border-collapse:collapse; background:radial-gradient(circle at top, rgba(17,33,74,0.9) 0%, rgba(8,12,22,1) 55%, #05070d 100%);\">" "style=\"width:100%; border-collapse:collapse; background:#eef2f7;\" bgcolor=\"#eef2f7\">"
"<tr><td style=\"padding:32px 18px;\">" "<tr><td style=\"padding:32px 18px;\" bgcolor=\"#eef2f7\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" " "<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" "
"style=\"max-width:680px; margin:0 auto; border-collapse:collapse;\">" "style=\"max-width:680px; margin:0 auto; border-collapse:collapse;\">"
"<tr><td style=\"padding:0 0 18px;\">" "<tr><td style=\"padding:0 0 18px;\">"
f"<div style=\"padding:24px 28px; background:#0b0f18; border:1px solid rgba(255,255,255,0.08); border-radius:28px; box-shadow:0 24px 60px rgba(0,0,0,0.42);\">" f"<div style=\"padding:24px 28px; background:#ffffff; border:1px solid #d5deed; border-radius:28px; box-shadow:0 18px 48px rgba(15,23,42,0.08);\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:collapse;\">" "<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:collapse;\">"
"<tr>" "<tr>"
f"<td style=\"vertical-align:middle; width:64px; padding:0 18px 0 0;\">{logo_block}</td>" f"<td style=\"vertical-align:middle; width:64px; padding:0 18px 0 0;\">{logo_block}</td>"
"<td style=\"vertical-align:middle;\">" "<td style=\"vertical-align:middle;\">"
f"<div style=\"font-size:11px; letter-spacing:0.18em; text-transform:uppercase; color:#9aa3b8; margin-bottom:6px;\">{html.escape(app_name)}</div>" f"<div style=\"font-size:11px; letter-spacing:0.18em; text-transform:uppercase; color:#6b778c; margin-bottom:6px;\">{html.escape(app_name)}</div>"
f"<div style=\"font-size:30px; line-height:1.1; font-weight:900; color:#e9ecf5; margin:0 0 6px;\">{html.escape(title)}</div>" f"<div style=\"font-size:30px; line-height:1.1; font-weight:900; color:#132033; margin:0 0 6px;\">{html.escape(title)}</div>"
f"<div style=\"font-size:15px; line-height:1.6; color:#9aa3b8;\">{html.escape(subtitle or EMAIL_TAGLINE)}</div>" f"<div style=\"font-size:15px; line-height:1.6; color:#5c687d;\">{html.escape(subtitle or EMAIL_TAGLINE)}</div>"
"</td>" "</td>"
"</tr>" "</tr>"
"</table>" "</table>"
f"<div style=\"height:6px; margin:22px 0 22px; border-radius:999px; background:linear-gradient(90deg, {styles['accent_a']} 0%, {styles['accent_b']} 100%);\"></div>" f"<div style=\"height:6px; margin:22px 0 22px; border-radius:999px; background-color:{styles['accent_b']}; background:linear-gradient(90deg, {styles['accent_a']} 0%, {styles['accent_b']} 100%);\"></div>"
f"<div style=\"display:inline-block; padding:7px 12px; margin:0 0 16px; background:{styles['chip_bg']}; " f"<div style=\"display:inline-block; padding:7px 12px; margin:0 0 16px; background:{styles['chip_bg']}; "
f"border:1px solid {styles['chip_border']}; border-radius:999px; color:{styles['chip_text']}; " f"border:1px solid {styles['chip_border']}; border-radius:999px; color:{styles['chip_text']}; "
"font-size:11px; font-weight:800; letter-spacing:0.14em; text-transform:uppercase;\">" "font-size:11px; font-weight:800; letter-spacing:0.14em; text-transform:uppercase;\">"
f"{html.escape(EMAIL_TAGLINE)}</div>" f"{html.escape(EMAIL_TAGLINE)}</div>"
f"<div style=\"color:#e9ecf5;\">{body_html}</div>" f"<div style=\"color:#132033;\">{body_html}</div>"
f"<div style=\"margin:24px 0 0;\">{actions_html}</div>" f"<div style=\"margin:24px 0 0;\">{actions_html}</div>"
"<div style=\"margin:28px 0 0; padding:18px 0 0; border-top:1px solid rgba(255,255,255,0.08);\">" "<div style=\"margin:28px 0 0; padding:18px 0 0; border-top:1px solid #e2e8f0;\">"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:collapse;\">" "<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:collapse;\">"
"<tr>" "<tr>"
f"<td style=\"font-size:12px; line-height:1.7; color:#9aa3b8;\">{html.escape(footer)}</td>" f"<td style=\"font-size:12px; line-height:1.7; color:#6b778c;\">{html.escape(footer)}</td>"
f"<td style=\"font-size:12px; line-height:1.7; color:#9aa3b8; text-align:right;\">Build {html.escape(build_number)}</td>" f"<td style=\"font-size:12px; line-height:1.7; color:#6b778c; text-align:right;\">Build {html.escape(build_number)}</td>"
"</tr>" "</tr>"
"</table>" "</table>"
"</div>" "</div>"
@@ -481,7 +760,7 @@ def build_invite_email_context(
invite.get("created_by") if invite else (user.get("username") if user else None), invite.get("created_by") if invite else (user.get("username") if user else None),
"Admin", "Admin",
), ),
"message": _normalize_display_text(message, ""), "message": _normalize_display_text(message, "No additional note."),
"reason": _normalize_display_text(reason, "Not specified"), "reason": _normalize_display_text(reason, "Not specified"),
"recipient_email": _normalize_display_text(resolved_recipient, "No email supplied"), "recipient_email": _normalize_display_text(resolved_recipient, "No email supplied"),
"role": _normalize_display_text(user.get("role") if user else None, "user"), "role": _normalize_display_text(user.get("role") if user else None, "user"),
@@ -605,6 +884,9 @@ def render_invite_email_template(
def resolve_user_delivery_email(user: Optional[Dict[str, Any]], invite: Optional[Dict[str, Any]] = None) -> Optional[str]: def resolve_user_delivery_email(user: Optional[Dict[str, Any]], invite: Optional[Dict[str, Any]] = None) -> Optional[str]:
if not isinstance(user, dict): if not isinstance(user, dict):
return _normalize_email(invite.get("recipient_email") if isinstance(invite, dict) else None) return _normalize_email(invite.get("recipient_email") if isinstance(invite, dict) else None)
stored_email = _normalize_email(user.get("email"))
if stored_email:
return stored_email
username_email = _normalize_email(user.get("username")) username_email = _normalize_email(user.get("username"))
if username_email: if username_email:
return username_email return username_email
@@ -644,7 +926,7 @@ def smtp_email_delivery_warning() -> Optional[str]:
def _flatten_message(message: EmailMessage) -> bytes: def _flatten_message(message: EmailMessage) -> bytes:
buffer = BytesIO() buffer = BytesIO()
BytesGenerator(buffer).flatten(message) BytesGenerator(buffer, policy=SMTP_POLICY).flatten(message)
return buffer.getvalue() return buffer.getvalue()
@@ -722,9 +1004,27 @@ def _send_email_sync(*, recipient_email: str, subject: str, body_text: str, body
message["Subject"] = subject message["Subject"] = subject
message["From"] = formataddr((from_name, from_address)) message["From"] = formataddr((from_name, from_address))
message["To"] = recipient_email message["To"] = recipient_email
message["Date"] = formatdate(localtime=True)
if "@" in from_address:
message["Message-ID"] = make_msgid(domain=from_address.split("@", 1)[1])
else:
message["Message-ID"] = make_msgid()
message.set_content(body_text or _strip_html_for_text(body_html)) message.set_content(body_text or _strip_html_for_text(body_html))
if body_html.strip(): if body_html.strip():
message.add_alternative(body_html, subtype="html") message.add_alternative(body_html, subtype="html")
if f"cid:{EMAIL_LOGO_CID}" in body_html:
logo_bytes = _get_email_logo_bytes()
if logo_bytes:
html_part = message.get_body(preferencelist=("html",))
if html_part is not None:
html_part.add_related(
logo_bytes,
maintype="image",
subtype="png",
cid=f"<{EMAIL_LOGO_CID}>",
filename="logo.png",
disposition="inline",
)
if use_ssl: if use_ssl:
with smtplib.SMTP_SSL(host, port, timeout=20) as smtp: with smtplib.SMTP_SSL(host, port, timeout=20) as smtp:
@@ -835,11 +1135,24 @@ async def send_test_email(recipient_email: Optional[str] = None) -> Dict[str, st
application_url = _normalize_display_text(runtime.magent_application_url, "Not configured") application_url = _normalize_display_text(runtime.magent_application_url, "Not configured")
primary_url = application_url if application_url.lower().startswith(("http://", "https://")) else "" primary_url = application_url if application_url.lower().startswith(("http://", "https://")) else ""
smtp_target = f"{_normalize_display_text(runtime.magent_notify_email_smtp_host, 'Not configured')}:{int(runtime.magent_notify_email_smtp_port or 587)}"
security_mode = "SSL" if runtime.magent_notify_email_use_ssl else ("STARTTLS" if runtime.magent_notify_email_use_tls else "Plain SMTP")
auth_mode = "Authenticated" if (
_normalize_display_text(runtime.magent_notify_email_smtp_username)
and _normalize_display_text(runtime.magent_notify_email_smtp_password)
) else "No SMTP auth"
delivery_warning = smtp_email_delivery_warning()
subject = f"{env_settings.app_name} email test" subject = f"{env_settings.app_name} email test"
body_text = ( body_text = (
f"This is a test email from {env_settings.app_name}.\n\n" f"This is a test email from {env_settings.app_name}.\n\n"
f"Build: {BUILD_NUMBER}\n" f"Build: {BUILD_NUMBER}\n"
f"Application URL: {application_url}\n" f"Application URL: {application_url}\n"
f"SMTP target: {smtp_target}\n"
f"Security: {security_mode} ({auth_mode})\n\n"
"What this verifies:\n"
"- Magent can build the HTML template shell correctly.\n"
"- The configured SMTP route accepts and relays the message.\n"
"- Branding, links, and build metadata are rendering consistently.\n"
) )
body_html = _wrap_email_html( body_html = _wrap_email_html(
app_name=env_settings.app_name, app_name=env_settings.app_name,
@@ -849,24 +1162,39 @@ async def send_test_email(recipient_email: Optional[str] = None) -> Dict[str, st
subtitle="This confirms Magent can generate and hand off branded mail.", subtitle="This confirms Magent can generate and hand off branded mail.",
tone="brand", tone="brand",
body_html=( body_html=(
"<div style=\"margin:0 0 18px; color:#e9ecf5; font-size:15px; line-height:1.7;\">" "<div style=\"margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;\">"
"This is a live test email from Magent. If this renders correctly, the HTML template shell and SMTP handoff are both working." "This is a live test email from Magent. If this renders correctly, the HTML template shell and SMTP handoff are both working."
"</div>" "</div>"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:separate; border-spacing:10px 10px; margin:0 0 18px;\">" + _build_email_stat_grid(
"<tr>" [
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" _build_email_stat_card("Recipient", resolved_email),
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Build</div>" _build_email_stat_card("Build", BUILD_NUMBER),
f"<div style=\"font-size:22px; font-weight:800;\">{html.escape(BUILD_NUMBER)}</div>" _build_email_stat_card("SMTP target", smtp_target),
"</td>" _build_email_stat_card("Security", security_mode, auth_mode),
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" _build_email_stat_card("Application URL", application_url),
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Application URL</div>" _build_email_stat_card("Template shell", "Branded HTML", "Logo, gradient, action buttons"),
f"<div style=\"font-size:16px; font-weight:700; line-height:1.5;\">{html.escape(application_url)}</div>" ]
"</td>" )
"</tr>" + _build_email_panel(
"</table>" "What this verifies",
"<div style=\"margin:0; padding:18px; background:#101726; border:1px dashed rgba(59,130,246,0.32); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.7;\">" _build_email_list(
"Use this test when changing SMTP settings, relay targets, or branding." [
"</div>" "Magent can build the HTML template shell correctly.",
"The configured SMTP route accepts and relays the message.",
"Branding, links, and build metadata are rendering consistently.",
]
),
variant="brand",
)
+ _build_email_panel(
"Delivery notes",
(
f"<div style=\"white-space:pre-line;\">{html.escape(delivery_warning)}</div>"
if delivery_warning
else "Use this test when changing SMTP settings, relay targets, or branding."
),
variant="warning" if delivery_warning else "neutral",
)
), ),
primary_label="Open Magent" if primary_url else "", primary_label="Open Magent" if primary_url else "",
primary_url=primary_url, primary_url=primary_url,
@@ -930,27 +1258,39 @@ async def send_password_reset_email(
subtitle=f"This will update the credentials used for {provider_label}.", subtitle=f"This will update the credentials used for {provider_label}.",
tone="brand", tone="brand",
body_html=( body_html=(
f"<div style=\"margin:0 0 18px; color:#e9ecf5; font-size:15px; line-height:1.7;\">" f"<div style=\"margin:0 0 18px; color:#132033; font-size:15px; line-height:1.7;\">"
f"A password reset was requested for <strong>{html.escape(username)}</strong>." f"A password reset was requested for <strong>{html.escape(username)}</strong>."
"</div>" "</div>"
"<table role=\"presentation\" width=\"100%\" cellspacing=\"0\" cellpadding=\"0\" style=\"border-collapse:separate; border-spacing:10px 10px; margin:0 0 18px;\">" + _build_email_stat_grid(
"<tr>" [
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" _build_email_stat_card("Account", username),
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Account</div>" _build_email_stat_card("Expires", expires_at),
f"<div style=\"font-size:22px; font-weight:800;\">{html.escape(username)}</div>" _build_email_stat_card("Credentials updated", provider_label),
"</td>" _build_email_stat_card("Delivery target", resolved_email),
"<td width=\"50%\" style=\"padding:16px; background:#151c2d; border:1px solid rgba(255,255,255,0.08); border-radius:16px; color:#e9ecf5;\">" ]
"<div style=\"font-size:11px; letter-spacing:0.12em; text-transform:uppercase; color:#9aa3b8; margin-bottom:8px;\">Expires</div>" )
f"<div style=\"font-size:16px; font-weight:700; line-height:1.5;\">{html.escape(expires_at)}</div>" + _build_email_panel(
"</td>" "What will be updated",
"</tr>" f"This reset will update the password used for <strong>{html.escape(provider_label)}</strong>.",
"</table>" variant="brand",
f"<div style=\"margin:0 0 18px; padding:18px; background:#101726; border:1px solid rgba(59,130,246,0.22); border-radius:18px; color:#dbe5ff; font-size:14px; line-height:1.7;\">" )
f"This reset will update the password used for <strong>{html.escape(provider_label)}</strong>." + _build_email_panel(
"</div>" "What happens next",
"<div style=\"margin:0; padding:18px; background:#1a1220; border:1px dashed rgba(255,107,43,0.38); border-radius:18px; color:#ffd3bf; font-size:14px; line-height:1.7;\">" _build_email_list(
"If you did not request this reset, ignore this email. No changes will be applied until the reset link is opened and completed." [
"</div>" "Open the reset link and choose a new password.",
"Complete the form before the expiry time shown above.",
"Use the new password the next time you sign in.",
],
ordered=True,
),
variant="neutral",
)
+ _build_email_panel(
"Safety note",
"If you did not request this reset, ignore this email. No changes will be applied until the reset link is opened and completed.",
variant="warning",
)
), ),
primary_label="Reset password", primary_label="Reset password",
primary_url=reset_url, primary_url=reset_url,

View File

@@ -6,12 +6,15 @@ from ..clients.jellyfin import JellyfinClient
from ..db import ( from ..db import (
create_user_if_missing, create_user_if_missing,
get_user_by_username, get_user_by_username,
set_user_email,
set_user_auth_provider, set_user_auth_provider,
set_user_jellyseerr_id, set_user_jellyseerr_id,
) )
from ..runtime import get_runtime_settings from ..runtime import get_runtime_settings
from .user_cache import ( from .user_cache import (
build_jellyseerr_candidate_map, build_jellyseerr_candidate_map,
extract_jellyseerr_user_email,
find_matching_jellyseerr_user,
get_cached_jellyseerr_users, get_cached_jellyseerr_users,
match_jellyseerr_user_id, match_jellyseerr_user_id,
save_jellyfin_users_cache, save_jellyfin_users_cache,
@@ -41,10 +44,13 @@ async def sync_jellyfin_users() -> int:
if not name: if not name:
continue continue
matched_id = match_jellyseerr_user_id(name, candidate_map) if candidate_map else None matched_id = match_jellyseerr_user_id(name, candidate_map) if candidate_map else None
matched_seerr_user = find_matching_jellyseerr_user(name, jellyseerr_users or [])
matched_email = extract_jellyseerr_user_email(matched_seerr_user)
created = create_user_if_missing( created = create_user_if_missing(
name, name,
"jellyfin-user", "jellyfin-user",
role="user", role="user",
email=matched_email,
auth_provider="jellyfin", auth_provider="jellyfin",
jellyseerr_user_id=matched_id, jellyseerr_user_id=matched_id,
) )
@@ -60,6 +66,8 @@ async def sync_jellyfin_users() -> int:
set_user_auth_provider(name, "jellyfin") set_user_auth_provider(name, "jellyfin")
if matched_id is not None: if matched_id is not None:
set_user_jellyseerr_id(name, matched_id) set_user_jellyseerr_id(name, matched_id)
if matched_email:
set_user_email(name, matched_email)
return imported return imported

View File

@@ -112,6 +112,9 @@ async def _fetch_all_seerr_users() -> list[dict]:
def _resolve_seerr_user_email(seerr_user: Optional[dict], local_user: Optional[dict]) -> Optional[str]: def _resolve_seerr_user_email(seerr_user: Optional[dict], local_user: Optional[dict]) -> Optional[str]:
if isinstance(local_user, dict): if isinstance(local_user, dict):
stored_email = str(local_user.get("email") or "").strip()
if "@" in stored_email:
return stored_email
username = str(local_user.get("username") or "").strip() username = str(local_user.get("username") or "").strip()
if "@" in username: if "@" in username:
return username return username

View File

@@ -89,6 +89,33 @@ def build_jellyseerr_candidate_map(users: List[Dict[str, Any]]) -> Dict[str, int
return candidate_to_id return candidate_to_id
def find_matching_jellyseerr_user(
identifier: str, users: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
target_handles = set(_normalized_handles(identifier))
if not target_handles:
return None
for user in users:
if not isinstance(user, dict):
continue
for key in ("username", "email", "displayName", "name"):
if target_handles.intersection(_normalized_handles(user.get(key))):
return user
return None
def extract_jellyseerr_user_email(user: Optional[Dict[str, Any]]) -> Optional[str]:
if not isinstance(user, dict):
return None
value = user.get("email")
if not isinstance(value, str):
return None
candidate = value.strip()
if not candidate or "@" not in candidate:
return None
return candidate
def match_jellyseerr_user_id( def match_jellyseerr_user_id(
username: str, candidate_map: Dict[str, int] username: str, candidate_map: Dict[str, int]
) -> Optional[int]: ) -> Optional[int]:

View File

@@ -3,7 +3,7 @@ uvicorn==0.41.0
httpx==0.28.1 httpx==0.28.1
pydantic==2.12.5 pydantic==2.12.5
pydantic-settings==2.13.1 pydantic-settings==2.13.1
python-jose[cryptography]==3.5.0 PyJWT==2.11.0
passlib==1.7.4 passlib==1.7.4
python-multipart==0.0.22 python-multipart==0.0.22
Pillow==12.1.1 Pillow==12.1.1

View File

@@ -0,0 +1,145 @@
import os
import tempfile
import unittest
from unittest.mock import AsyncMock, patch
from fastapi import HTTPException
from starlette.requests import Request
from backend.app import db
from backend.app.config import settings
from backend.app.routers import auth as auth_router
from backend.app.security import PASSWORD_POLICY_MESSAGE, validate_password_policy
from backend.app.services import password_reset
def _build_request(ip: str = "127.0.0.1", user_agent: str = "backend-test") -> Request:
scope = {
"type": "http",
"http_version": "1.1",
"method": "POST",
"scheme": "http",
"path": "/auth/password/forgot",
"raw_path": b"/auth/password/forgot",
"query_string": b"",
"headers": [(b"user-agent", user_agent.encode("utf-8"))],
"client": (ip, 12345),
"server": ("testserver", 8000),
}
async def receive() -> dict:
return {"type": "http.request", "body": b"", "more_body": False}
return Request(scope, receive)
class TempDatabaseMixin:
def setUp(self) -> None:
super_method = getattr(super(), "setUp", None)
if callable(super_method):
super_method()
self._tempdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
self._original_sqlite_path = settings.sqlite_path
self._original_journal_mode = getattr(settings, "sqlite_journal_mode", "DELETE")
settings.sqlite_path = os.path.join(self._tempdir.name, "test.db")
settings.sqlite_journal_mode = "DELETE"
auth_router._LOGIN_ATTEMPTS_BY_IP.clear()
auth_router._LOGIN_ATTEMPTS_BY_USER.clear()
auth_router._RESET_ATTEMPTS_BY_IP.clear()
auth_router._RESET_ATTEMPTS_BY_IDENTIFIER.clear()
db.init_db()
def tearDown(self) -> None:
settings.sqlite_path = self._original_sqlite_path
settings.sqlite_journal_mode = self._original_journal_mode
auth_router._LOGIN_ATTEMPTS_BY_IP.clear()
auth_router._LOGIN_ATTEMPTS_BY_USER.clear()
auth_router._RESET_ATTEMPTS_BY_IP.clear()
auth_router._RESET_ATTEMPTS_BY_IDENTIFIER.clear()
self._tempdir.cleanup()
super_method = getattr(super(), "tearDown", None)
if callable(super_method):
super_method()
class PasswordPolicyTests(unittest.TestCase):
def test_validate_password_policy_rejects_short_passwords(self) -> None:
with self.assertRaisesRegex(ValueError, PASSWORD_POLICY_MESSAGE):
validate_password_policy("short")
def test_validate_password_policy_trims_whitespace(self) -> None:
self.assertEqual(validate_password_policy(" password123 "), "password123")
class DatabaseEmailTests(TempDatabaseMixin, unittest.TestCase):
def test_set_user_email_is_case_insensitive(self) -> None:
created = db.create_user_if_missing(
"MixedCaseUser",
"password123",
email=None,
auth_provider="local",
)
self.assertTrue(created)
updated = db.set_user_email("mixedcaseuser", "mixed@example.com")
self.assertTrue(updated)
stored = db.get_user_by_username("MIXEDCASEUSER")
self.assertIsNotNone(stored)
self.assertEqual(stored.get("email"), "mixed@example.com")
class AuthFlowTests(TempDatabaseMixin, unittest.IsolatedAsyncioTestCase):
async def test_forgot_password_is_rate_limited(self) -> None:
request = _build_request(ip="10.1.2.3")
payload = {"identifier": "resetuser@example.com"}
with patch.object(auth_router, "smtp_email_config_ready", return_value=(True, "")), patch.object(
auth_router,
"request_password_reset",
new=AsyncMock(return_value={"status": "ok", "issued": False}),
):
for _ in range(3):
result = await auth_router.forgot_password(payload, request)
self.assertEqual(result["status"], "ok")
with self.assertRaises(HTTPException) as context:
await auth_router.forgot_password(payload, request)
self.assertEqual(context.exception.status_code, 429)
self.assertEqual(
context.exception.detail,
"Too many password reset attempts. Try again shortly.",
)
async def test_request_password_reset_prefers_local_user_email(self) -> None:
db.create_user_if_missing(
"ResetUser",
"password123",
email="local@example.com",
auth_provider="local",
)
with patch.object(
password_reset,
"send_password_reset_email",
new=AsyncMock(return_value={"status": "ok"}),
) as send_email:
result = await password_reset.request_password_reset("ResetUser")
self.assertTrue(result["issued"])
self.assertEqual(result["recipient_email"], "local@example.com")
send_email.assert_awaited_once()
self.assertEqual(send_email.await_args.kwargs["recipient_email"], "local@example.com")
async def test_profile_invite_requires_recipient_email(self) -> None:
current_user = {
"username": "invite-owner",
"role": "user",
"invite_management_enabled": True,
"profile_id": None,
}
with self.assertRaises(HTTPException) as context:
await auth_router.create_profile_invite({"label": "Missing email"}, current_user)
self.assertEqual(context.exception.status_code, 400)
self.assertEqual(
context.exception.detail,
"recipient_email is required and must be a valid email address.",
)

View File

@@ -2296,14 +2296,6 @@ export default function SettingsPage({ section }: SettingsPageProps) {
/> />
</label> </label>
) : null} ) : null}
<button
type="button"
className="settings-action-button"
onClick={() => void saveSettingGroup(sectionGroup)}
disabled={sectionSaving[sectionGroup.key] || sectionTesting[sectionGroup.key]}
>
{sectionSaving[sectionGroup.key] ? 'Saving...' : 'Save section'}
</button>
{getSectionTestLabel(sectionGroup.key) ? ( {getSectionTestLabel(sectionGroup.key) ? (
<button <button
type="button" type="button"
@@ -2316,6 +2308,14 @@ export default function SettingsPage({ section }: SettingsPageProps) {
: getSectionTestLabel(sectionGroup.key)} : getSectionTestLabel(sectionGroup.key)}
</button> </button>
) : null} ) : null}
<button
type="button"
className="settings-action-button"
onClick={() => void saveSettingGroup(sectionGroup)}
disabled={sectionSaving[sectionGroup.key] || sectionTesting[sectionGroup.key]}
>
{sectionSaving[sectionGroup.key] ? 'Saving...' : 'Save section'}
</button>
</div> </div>
</section> </section>
))} ))}

View File

@@ -156,6 +156,8 @@ const formatDate = (value?: string | null) => {
return date.toLocaleString() return date.toLocaleString()
} }
const isValidEmail = (value: string) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value.trim())
const isInviteTraceRowInvited = (row: InviteTraceRow) => const isInviteTraceRowInvited = (row: InviteTraceRow) =>
Boolean(String(row.inviterUsername || '').trim() || String(row.inviteCode || '').trim()) Boolean(String(row.inviterUsername || '').trim() || String(row.inviteCode || '').trim())
@@ -349,6 +351,17 @@ export default function AdminInviteManagementPage() {
const saveInvite = async (event: React.FormEvent) => { const saveInvite = async (event: React.FormEvent) => {
event.preventDefault() event.preventDefault()
const recipientEmail = inviteForm.recipient_email.trim()
if (!recipientEmail) {
setError('Recipient email is required.')
setStatus(null)
return
}
if (!isValidEmail(recipientEmail)) {
setError('Recipient email must be valid.')
setStatus(null)
return
}
setInviteSaving(true) setInviteSaving(true)
setError(null) setError(null)
setStatus(null) setStatus(null)
@@ -363,7 +376,7 @@ export default function AdminInviteManagementPage() {
max_uses: inviteForm.max_uses || null, max_uses: inviteForm.max_uses || null,
enabled: inviteForm.enabled, enabled: inviteForm.enabled,
expires_at: inviteForm.expires_at || null, expires_at: inviteForm.expires_at || null,
recipient_email: inviteForm.recipient_email || null, recipient_email: recipientEmail,
send_email: inviteForm.send_email, send_email: inviteForm.send_email,
message: inviteForm.message || null, message: inviteForm.message || null,
} }
@@ -1607,18 +1620,19 @@ export default function AdminInviteManagementPage() {
<div className="invite-form-row"> <div className="invite-form-row">
<div className="invite-form-row-label"> <div className="invite-form-row-label">
<span>Delivery</span> <span>Delivery</span>
<small>Save a recipient email and optionally send the invite immediately.</small> <small>Recipient email is required. You can optionally send the invite immediately after saving.</small>
</div> </div>
<div className="invite-form-row-control invite-form-row-control--stacked"> <div className="invite-form-row-control invite-form-row-control--stacked">
<label> <label>
<span>Recipient email</span> <span>Recipient email (required)</span>
<input <input
type="email" type="email"
required
value={inviteForm.recipient_email} value={inviteForm.recipient_email}
onChange={(e) => onChange={(e) =>
setInviteForm((current) => ({ ...current, recipient_email: e.target.value })) setInviteForm((current) => ({ ...current, recipient_email: e.target.value }))
} }
placeholder="person@example.com" placeholder="Required recipient email"
/> />
</label> </label>
<label> <label>

View File

@@ -82,6 +82,8 @@ const formatDate = (value?: string | null) => {
return date.toLocaleString() return date.toLocaleString()
} }
const isValidEmail = (value: string) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value.trim())
export default function ProfileInvitesPage() { export default function ProfileInvitesPage() {
const router = useRouter() const router = useRouter()
const [profile, setProfile] = useState<ProfileInfo | null>(null) const [profile, setProfile] = useState<ProfileInfo | null>(null)
@@ -192,6 +194,17 @@ export default function ProfileInvitesPage() {
const saveInvite = async (event: React.FormEvent) => { const saveInvite = async (event: React.FormEvent) => {
event.preventDefault() event.preventDefault()
const recipientEmail = inviteForm.recipient_email.trim()
if (!recipientEmail) {
setInviteError('Recipient email is required.')
setInviteStatus(null)
return
}
if (!isValidEmail(recipientEmail)) {
setInviteError('Recipient email must be valid.')
setInviteStatus(null)
return
}
setInviteSaving(true) setInviteSaving(true)
setInviteError(null) setInviteError(null)
setInviteStatus(null) setInviteStatus(null)
@@ -208,7 +221,7 @@ export default function ProfileInvitesPage() {
code: inviteForm.code || null, code: inviteForm.code || null,
label: inviteForm.label || null, label: inviteForm.label || null,
description: inviteForm.description || null, description: inviteForm.description || null,
recipient_email: inviteForm.recipient_email || null, recipient_email: recipientEmail,
max_uses: inviteForm.max_uses || null, max_uses: inviteForm.max_uses || null,
expires_at: inviteForm.expires_at || null, expires_at: inviteForm.expires_at || null,
enabled: inviteForm.enabled, enabled: inviteForm.enabled,
@@ -438,13 +451,14 @@ export default function ProfileInvitesPage() {
<div className="invite-form-row"> <div className="invite-form-row">
<div className="invite-form-row-label"> <div className="invite-form-row-label">
<span>Delivery</span> <span>Delivery</span>
<small>Save a recipient email and optionally send the invite immediately.</small> <small>Recipient email is required. You can also send the invite immediately after saving.</small>
</div> </div>
<div className="invite-form-row-control invite-form-row-control--stacked"> <div className="invite-form-row-control invite-form-row-control--stacked">
<label> <label>
<span>Recipient email</span> <span>Recipient email (required)</span>
<input <input
type="email" type="email"
required
value={inviteForm.recipient_email} value={inviteForm.recipient_email}
onChange={(event) => onChange={(event) =>
setInviteForm((current) => ({ setInviteForm((current) => ({
@@ -452,7 +466,7 @@ export default function ProfileInvitesPage() {
recipient_email: event.target.value, recipient_email: event.target.value,
})) }))
} }
placeholder="friend@example.com" placeholder="Required recipient email"
/> />
</label> </label>
<label> <label>

View File

@@ -20,6 +20,7 @@ type UserStats = {
type AdminUser = { type AdminUser = {
id?: number id?: number
username: string username: string
email?: string | null
role: string role: string
auth_provider?: string | null auth_provider?: string | null
last_login_at?: string | null last_login_at?: string | null
@@ -459,6 +460,10 @@ export default function UserDetailPage() {
</p> </p>
</div> </div>
<div className="user-detail-meta-grid"> <div className="user-detail-meta-grid">
<div className="user-detail-meta-item">
<span className="label">Email</span>
<strong>{user.email || 'Not set'}</strong>
</div>
<div className="user-detail-meta-item"> <div className="user-detail-meta-item">
<span className="label">Seerr ID</span> <span className="label">Seerr ID</span>
<strong>{user.jellyseerr_user_id ?? user.id ?? 'Unknown'}</strong> <strong>{user.jellyseerr_user_id ?? user.id ?? 'Unknown'}</strong>

View File

@@ -9,6 +9,7 @@ import AdminShell from '../ui/AdminShell'
type AdminUser = { type AdminUser = {
id: number id: number
username: string username: string
email?: string | null
role: string role: string
authProvider?: string | null authProvider?: string | null
lastLoginAt?: string | null lastLoginAt?: string | null
@@ -109,6 +110,7 @@ export default function UsersPage() {
setUsers( setUsers(
data.users.map((user: any) => ({ data.users.map((user: any) => ({
username: user.username ?? 'Unknown', username: user.username ?? 'Unknown',
email: user.email ?? null,
role: user.role ?? 'user', role: user.role ?? 'user',
authProvider: user.auth_provider ?? 'local', authProvider: user.auth_provider ?? 'local',
lastLoginAt: user.last_login_at ?? null, lastLoginAt: user.last_login_at ?? null,
@@ -239,6 +241,7 @@ export default function UsersPage() {
? users.filter((user) => { ? users.filter((user) => {
const fields = [ const fields = [
user.username, user.username,
user.email || '',
user.role, user.role,
user.authProvider || '', user.authProvider || '',
user.profileId != null ? String(user.profileId) : '', user.profileId != null ? String(user.profileId) : '',
@@ -419,6 +422,9 @@ export default function UsersPage() {
<strong>{user.username}</strong> <strong>{user.username}</strong>
<span className="user-grid-meta">{user.role}</span> <span className="user-grid-meta">{user.role}</span>
</div> </div>
<div className="user-directory-subtext">
{user.email || 'No email on file'}
</div>
<div className="user-directory-subtext"> <div className="user-directory-subtext">
Login: {user.authProvider || 'local'} Profile: {user.profileId ?? 'None'} Login: {user.authProvider || 'local'} Profile: {user.profileId ?? 'None'}
</div> </div>

View File

@@ -1,12 +1,12 @@
{ {
"name": "magent-frontend", "name": "magent-frontend",
"version": "0303261629", "version": "0403261321",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "magent-frontend", "name": "magent-frontend",
"version": "0303261629", "version": "0403261321",
"dependencies": { "dependencies": {
"next": "16.1.6", "next": "16.1.6",
"react": "19.2.4", "react": "19.2.4",

View File

@@ -1,7 +1,7 @@
{ {
"name": "magent-frontend", "name": "magent-frontend",
"private": true, "private": true,
"version": "0303261629", "version": "0403261321",
"scripts": { "scripts": {
"dev": "next dev", "dev": "next dev",
"build": "next build", "build": "next build",

View File

@@ -3,6 +3,11 @@ $ErrorActionPreference = "Stop"
$repoRoot = Resolve-Path "$PSScriptRoot\\.." $repoRoot = Resolve-Path "$PSScriptRoot\\.."
Set-Location $repoRoot Set-Location $repoRoot
powershell -ExecutionPolicy Bypass -File (Join-Path $repoRoot "scripts\run_backend_quality_gate.ps1")
if ($LASTEXITCODE -ne 0) {
throw "scripts/run_backend_quality_gate.ps1 failed with exit code $LASTEXITCODE."
}
$now = Get-Date $now = Get-Date
$buildNumber = "{0}{1}{2}{3}{4}" -f $now.ToString("dd"), $now.ToString("MM"), $now.ToString("yy"), $now.ToString("HH"), $now.ToString("mm") $buildNumber = "{0}{1}{2}{3}{4}" -f $now.ToString("dd"), $now.ToString("MM"), $now.ToString("yy"), $now.ToString("HH"), $now.ToString("mm")

View File

@@ -0,0 +1,153 @@
from __future__ import annotations
import argparse
import csv
import json
import sqlite3
from collections import Counter
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CSV_PATH = ROOT / "data" / "jellyfin_users_normalized.csv"
DEFAULT_DB_PATH = ROOT / "data" / "magent.db"
def _normalize_email(value: object) -> str | None:
if not isinstance(value, str):
return None
candidate = value.strip()
if not candidate or "@" not in candidate:
return None
return candidate
def _load_rows(csv_path: Path) -> list[dict[str, str]]:
with csv_path.open("r", encoding="utf-8", newline="") as handle:
return [dict(row) for row in csv.DictReader(handle)]
def _ensure_email_column(conn: sqlite3.Connection) -> None:
try:
conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
except sqlite3.OperationalError:
pass
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_users_email_nocase
ON users (email COLLATE NOCASE)
"""
)
def _lookup_user(conn: sqlite3.Connection, username: str) -> list[sqlite3.Row]:
return conn.execute(
"""
SELECT id, username, email
FROM users
WHERE username = ? COLLATE NOCASE
ORDER BY
CASE WHEN username = ? THEN 0 ELSE 1 END,
id ASC
""",
(username, username),
).fetchall()
def import_user_emails(csv_path: Path, db_path: Path) -> dict[str, object]:
rows = _load_rows(csv_path)
username_counts = Counter(
str(row.get("Username") or "").strip().lower()
for row in rows
if str(row.get("Username") or "").strip()
)
duplicate_usernames = {
username for username, count in username_counts.items() if username and count > 1
}
summary: dict[str, object] = {
"csv_path": str(csv_path),
"db_path": str(db_path),
"source_rows": len(rows),
"updated": 0,
"unchanged": 0,
"missing_email": [],
"missing_user": [],
"duplicate_source_username": [],
}
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
_ensure_email_column(conn)
for row in rows:
username = str(row.get("Username") or "").strip()
if not username:
continue
username_key = username.lower()
if username_key in duplicate_usernames:
cast_list = summary["duplicate_source_username"]
assert isinstance(cast_list, list)
if username not in cast_list:
cast_list.append(username)
continue
email = _normalize_email(row.get("Email"))
if not email:
cast_list = summary["missing_email"]
assert isinstance(cast_list, list)
cast_list.append(username)
continue
matches = _lookup_user(conn, username)
if not matches:
cast_list = summary["missing_user"]
assert isinstance(cast_list, list)
cast_list.append(username)
continue
current_emails = {
normalized.lower()
for normalized in (_normalize_email(row["email"]) for row in matches)
if normalized
}
if current_emails == {email.lower()}:
summary["unchanged"] = int(summary["unchanged"]) + 1
continue
conn.execute(
"""
UPDATE users
SET email = ?
WHERE username = ? COLLATE NOCASE
""",
(email, username),
)
summary["updated"] = int(summary["updated"]) + 1
summary["missing_email_count"] = len(summary["missing_email"]) # type: ignore[arg-type]
summary["missing_user_count"] = len(summary["missing_user"]) # type: ignore[arg-type]
summary["duplicate_source_username_count"] = len(summary["duplicate_source_username"]) # type: ignore[arg-type]
return summary
def main() -> None:
parser = argparse.ArgumentParser(description="Import user email addresses into Magent users.")
parser.add_argument(
"csv_path",
nargs="?",
default=str(DEFAULT_CSV_PATH),
help="CSV file containing Username and Email columns",
)
parser.add_argument(
"--db-path",
default=str(DEFAULT_DB_PATH),
help="Path to the Magent SQLite database",
)
args = parser.parse_args()
summary = import_user_emails(Path(args.csv_path), Path(args.db_path))
print(json.dumps(summary, indent=2, sort_keys=True))
if __name__ == "__main__":
main()

View File

@@ -243,6 +243,10 @@ try {
$script:CurrentStep = "updating build metadata" $script:CurrentStep = "updating build metadata"
Update-BuildFiles -BuildNumber $buildNumber Update-BuildFiles -BuildNumber $buildNumber
$script:CurrentStep = "running backend quality gate"
powershell -ExecutionPolicy Bypass -File (Join-Path $repoRoot "scripts\run_backend_quality_gate.ps1")
Assert-LastExitCode -CommandName "scripts/run_backend_quality_gate.ps1"
$script:CurrentStep = "rebuilding local docker stack" $script:CurrentStep = "rebuilding local docker stack"
docker compose up -d --build docker compose up -d --build
Assert-LastExitCode -CommandName "docker compose up -d --build" Assert-LastExitCode -CommandName "docker compose up -d --build"

View File

@@ -0,0 +1,59 @@
$ErrorActionPreference = "Stop"
$repoRoot = Resolve-Path "$PSScriptRoot\.."
Set-Location $repoRoot
function Assert-LastExitCode {
param([Parameter(Mandatory = $true)][string]$CommandName)
if ($LASTEXITCODE -ne 0) {
throw "$CommandName failed with exit code $LASTEXITCODE."
}
}
function Get-PythonCommand {
$venvPython = Join-Path $repoRoot ".venv\Scripts\python.exe"
if (Test-Path $venvPython) {
return $venvPython
}
return "python"
}
function Ensure-PythonModule {
param(
[Parameter(Mandatory = $true)][string]$PythonExe,
[Parameter(Mandatory = $true)][string]$ModuleName,
[Parameter(Mandatory = $true)][string]$PackageName
)
& $PythonExe -c "import importlib.util, sys; sys.exit(0 if importlib.util.find_spec('$ModuleName') else 1)"
if ($LASTEXITCODE -eq 0) {
return
}
Write-Host "Installing missing Python package: $PackageName"
& $PythonExe -m pip install $PackageName
Assert-LastExitCode -CommandName "python -m pip install $PackageName"
}
$pythonExe = Get-PythonCommand
Write-Host "Installing backend Python requirements"
& $pythonExe -m pip install -r (Join-Path $repoRoot "backend\requirements.txt")
Assert-LastExitCode -CommandName "python -m pip install -r backend/requirements.txt"
Write-Host "Running Python dependency integrity check"
& $pythonExe -m pip check
Assert-LastExitCode -CommandName "python -m pip check"
Ensure-PythonModule -PythonExe $pythonExe -ModuleName "pip_audit" -PackageName "pip-audit"
Write-Host "Running Python vulnerability scan"
& $pythonExe -m pip_audit -r (Join-Path $repoRoot "backend\requirements.txt") --progress-spinner off --desc
Assert-LastExitCode -CommandName "python -m pip_audit"
Write-Host "Running backend unit tests"
& $pythonExe -m unittest discover -s backend/tests -p "test_*.py" -v
Assert-LastExitCode -CommandName "python -m unittest discover"
Write-Host "Backend quality gate passed"