"""Import user email addresses from a CSV export into the Magent ``users`` table.

The CSV is expected to provide ``Username`` and ``Email`` columns. Usernames
are matched against ``users.username`` case-insensitively (SQLite ``NOCASE``),
and a JSON summary of what happened is printed when run as a script.
"""

from __future__ import annotations

import argparse
import csv
import json
import sqlite3
from collections import Counter
from contextlib import closing
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CSV_PATH = ROOT / "data" / "jellyfin_users_normalized.csv"
DEFAULT_DB_PATH = ROOT / "data" / "magent.db"


def _normalize_email(value: object) -> str | None:
    """Return *value* stripped of surrounding whitespace, or ``None``.

    ``None`` is returned when *value* is not a string, is blank, or does not
    contain an ``@`` character. The case of the address is preserved.
    """
    if not isinstance(value, str):
        return None
    candidate = value.strip()
    if not candidate or "@" not in candidate:
        return None
    return candidate


def _row_username(row: dict[str, str]) -> str:
    """Return the stripped ``Username`` cell of a CSV row (``""`` if absent)."""
    return str(row.get("Username") or "").strip()


def _load_rows(csv_path: Path) -> list[dict[str, str]]:
    """Read every data row of the CSV at *csv_path* as a plain dict."""
    # "utf-8-sig" transparently drops a leading byte-order mark (common in
    # spreadsheet exports). With plain "utf-8" the first header would become
    # "\ufeffUsername" and every row would be skipped as having no username.
    with csv_path.open("r", encoding="utf-8-sig", newline="") as handle:
        return [dict(row) for row in csv.DictReader(handle)]


def _ensure_email_column(conn: sqlite3.Connection) -> None:
    """Make sure ``users.email`` and its case-insensitive index exist.

    Raises:
        sqlite3.OperationalError: if the ``users`` table does not exist or the
            schema cannot be modified.
    """
    # Inspect the schema instead of swallowing every OperationalError from
    # ALTER TABLE: only "column already there" is an expected condition.
    # Index 1 of each PRAGMA table_info row is the column name; SQLite column
    # names are case-insensitive, hence the lower().
    existing_columns = {
        str(info[1]).lower() for info in conn.execute("PRAGMA table_info(users)")
    }
    if "email" not in existing_columns:
        conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
    conn.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_users_email_nocase
        ON users (email COLLATE NOCASE)
        """
    )


def _lookup_user(conn: sqlite3.Connection, username: str) -> list[sqlite3.Row]:
    """Return all users whose name equals *username* ignoring case.

    An exact-case match sorts first; remaining rows are ordered by ``id``.
    """
    return conn.execute(
        """
        SELECT id, username, email
        FROM users
        WHERE username = ? COLLATE NOCASE
        ORDER BY CASE WHEN username = ? THEN 0 ELSE 1 END, id ASC
        """,
        (username, username),
    ).fetchall()


def import_user_emails(csv_path: Path, db_path: Path) -> dict[str, object]:
    """Copy email addresses from the CSV at *csv_path* into the users table.

    Rows without a username are ignored. A username that occurs more than
    once in the CSV (ignoring case) is ambiguous and is never imported.

    Args:
        csv_path: CSV file with ``Username`` and ``Email`` columns.
        db_path: SQLite database containing a ``users`` table.

    Returns:
        A summary dict with the input paths, ``source_rows``, the ``updated``
        and ``unchanged`` counters, the lists ``missing_email``,
        ``missing_user`` and ``duplicate_source_username``, and a
        ``*_count`` entry for each of those lists.

    Raises:
        OSError: if the CSV file cannot be read.
        sqlite3.Error: if the database cannot be opened or modified.
    """
    rows = _load_rows(csv_path)
    usernames = [_row_username(row) for row in rows]
    username_counts = Counter(name.lower() for name in usernames if name)
    duplicate_usernames = {
        name for name, count in username_counts.items() if count > 1
    }

    updated = 0
    unchanged = 0
    missing_email: list[str] = []
    missing_user: list[str] = []
    duplicate_source_username: list[str] = []

    # sqlite3's own context manager only commits or rolls back; it does not
    # close the connection, so closing() is needed to release the file handle.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        with conn:  # commit on success, roll back if anything raises
            _ensure_email_column(conn)
            for row, username in zip(rows, usernames):
                if not username:
                    continue
                if username.lower() in duplicate_usernames:
                    if username not in duplicate_source_username:
                        duplicate_source_username.append(username)
                    continue

                email = _normalize_email(row.get("Email"))
                if email is None:
                    missing_email.append(username)
                    continue

                matches = _lookup_user(conn, username)
                if not matches:
                    missing_user.append(username)
                    continue

                # Unchanged only when EVERY matched row already stores this
                # address. Ignoring rows with an empty email here would leave
                # them unfilled whenever a sibling row (same name, different
                # case) already had the address.
                wanted = email.lower()
                if all(
                    (_normalize_email(match["email"]) or "").lower() == wanted
                    for match in matches
                ):
                    unchanged += 1
                    continue

                conn.execute(
                    """
                    UPDATE users
                    SET email = ?
                    WHERE username = ? COLLATE NOCASE
                    """,
                    (email, username),
                )
                updated += 1

    return {
        "csv_path": str(csv_path),
        "db_path": str(db_path),
        "source_rows": len(rows),
        "updated": updated,
        "unchanged": unchanged,
        "missing_email": missing_email,
        "missing_user": missing_user,
        "duplicate_source_username": duplicate_source_username,
        "missing_email_count": len(missing_email),
        "missing_user_count": len(missing_user),
        "duplicate_source_username_count": len(duplicate_source_username),
    }


def main() -> None:
    """Parse command-line arguments, run the import, and print the summary as JSON."""
    parser = argparse.ArgumentParser(
        description="Import user email addresses into Magent users."
    )
    parser.add_argument(
        "csv_path",
        nargs="?",
        default=str(DEFAULT_CSV_PATH),
        help="CSV file containing Username and Email columns",
    )
    parser.add_argument(
        "--db-path",
        default=str(DEFAULT_DB_PATH),
        help="Path to the Magent SQLite database",
    )
    args = parser.parse_args()
    summary = import_user_emails(Path(args.csv_path), Path(args.db_path))
    print(json.dumps(summary, indent=2, sort_keys=True))


if __name__ == "__main__":
    main()