Files
Magent/scripts/import_user_emails.py

154 lines
4.7 KiB
Python

from __future__ import annotations
import argparse
import csv
import json
import sqlite3
from collections import Counter
from pathlib import Path
# Project root: parent of the scripts/ directory this file lives in.
ROOT = Path(__file__).resolve().parents[1]
# Default input CSV (normalized Jellyfin user export) and target SQLite DB,
# both expected under <root>/data/.
DEFAULT_CSV_PATH = ROOT / "data" / "jellyfin_users_normalized.csv"
DEFAULT_DB_PATH = ROOT / "data" / "magent.db"
def _normalize_email(value: object) -> str | None:
if not isinstance(value, str):
return None
candidate = value.strip()
if not candidate or "@" not in candidate:
return None
return candidate
def _load_rows(csv_path: Path) -> list[dict[str, str]]:
with csv_path.open("r", encoding="utf-8", newline="") as handle:
return [dict(row) for row in csv.DictReader(handle)]
def _ensure_email_column(conn: sqlite3.Connection) -> None:
try:
conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
except sqlite3.OperationalError:
pass
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_users_email_nocase
ON users (email COLLATE NOCASE)
"""
)
def _lookup_user(conn: sqlite3.Connection, username: str) -> list[sqlite3.Row]:
return conn.execute(
"""
SELECT id, username, email
FROM users
WHERE username = ? COLLATE NOCASE
ORDER BY
CASE WHEN username = ? THEN 0 ELSE 1 END,
id ASC
""",
(username, username),
).fetchall()
def import_user_emails(csv_path: Path, db_path: Path) -> dict[str, object]:
    """Copy Email values from *csv_path* into the ``users`` table at *db_path*.

    Usernames that appear more than once in the CSV (case-insensitively) are
    skipped as ambiguous. Rows without a usable email, or whose username has
    no match in the database, are recorded but not written.

    Returns a summary dict with update/unchanged counts and the usernames
    that were skipped, grouped by reason.
    """
    rows = _load_rows(csv_path)
    # Count usernames case-insensitively so "Bob" and "bob" collide.
    username_counts = Counter(
        str(row.get("Username") or "").strip().lower()
        for row in rows
        if str(row.get("Username") or "").strip()
    )
    duplicate_usernames = {
        name for name, count in username_counts.items() if name and count > 1
    }
    # Typed local accumulators replace the original's assert-based narrowing
    # of summary values (asserts are stripped under -O) and its
    # `# type: ignore` workarounds; the summary is assembled once at the end.
    updated = 0
    unchanged = 0
    missing_email: list[str] = []
    missing_user: list[str] = []
    duplicate_source_username: list[str] = []
    with sqlite3.connect(db_path) as conn:
        conn.row_factory = sqlite3.Row
        _ensure_email_column(conn)
        for row in rows:
            username = str(row.get("Username") or "").strip()
            if not username:
                continue
            if username.lower() in duplicate_usernames:
                # Ambiguous source row: record each exact spelling once.
                if username not in duplicate_source_username:
                    duplicate_source_username.append(username)
                continue
            email = _normalize_email(row.get("Email"))
            if not email:
                missing_email.append(username)
                continue
            matches = _lookup_user(conn, username)
            if not matches:
                missing_user.append(username)
                continue
            # `match` (not `row`) avoids shadowing the CSV loop variable.
            current_emails = {
                normalized.lower()
                for normalized in (
                    _normalize_email(match["email"]) for match in matches
                )
                if normalized
            }
            if current_emails == {email.lower()}:
                unchanged += 1
                continue
            conn.execute(
                """
                UPDATE users
                SET email = ?
                WHERE username = ? COLLATE NOCASE
                """,
                (email, username),
            )
            updated += 1
    return {
        "csv_path": str(csv_path),
        "db_path": str(db_path),
        "source_rows": len(rows),
        "updated": updated,
        "unchanged": unchanged,
        "missing_email": missing_email,
        "missing_user": missing_user,
        "duplicate_source_username": duplicate_source_username,
        "missing_email_count": len(missing_email),
        "missing_user_count": len(missing_user),
        "duplicate_source_username_count": len(duplicate_source_username),
    }
def main() -> None:
    """CLI entry point: run the import and print the summary as JSON."""
    parser = argparse.ArgumentParser(
        description="Import user email addresses into Magent users."
    )
    parser.add_argument(
        "csv_path",
        nargs="?",
        default=str(DEFAULT_CSV_PATH),
        help="CSV file containing Username and Email columns",
    )
    parser.add_argument(
        "--db-path",
        default=str(DEFAULT_DB_PATH),
        help="Path to the Magent SQLite database",
    )
    args = parser.parse_args()
    result = import_user_emails(Path(args.csv_path), Path(args.db_path))
    print(json.dumps(result, indent=2, sort_keys=True))


if __name__ == "__main__":
    main()