Harden auth flows and add backend quality gate
This commit is contained in:
153
scripts/import_user_emails.py
Normal file
153
scripts/import_user_emails.py
Normal file
@@ -0,0 +1,153 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import sqlite3
|
||||
from collections import Counter
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
DEFAULT_CSV_PATH = ROOT / "data" / "jellyfin_users_normalized.csv"
|
||||
DEFAULT_DB_PATH = ROOT / "data" / "magent.db"
|
||||
|
||||
|
||||
def _normalize_email(value: object) -> str | None:
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
candidate = value.strip()
|
||||
if not candidate or "@" not in candidate:
|
||||
return None
|
||||
return candidate
|
||||
|
||||
|
||||
def _load_rows(csv_path: Path) -> list[dict[str, str]]:
|
||||
with csv_path.open("r", encoding="utf-8", newline="") as handle:
|
||||
return [dict(row) for row in csv.DictReader(handle)]
|
||||
|
||||
|
||||
def _ensure_email_column(conn: sqlite3.Connection) -> None:
|
||||
try:
|
||||
conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS idx_users_email_nocase
|
||||
ON users (email COLLATE NOCASE)
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def _lookup_user(conn: sqlite3.Connection, username: str) -> list[sqlite3.Row]:
|
||||
return conn.execute(
|
||||
"""
|
||||
SELECT id, username, email
|
||||
FROM users
|
||||
WHERE username = ? COLLATE NOCASE
|
||||
ORDER BY
|
||||
CASE WHEN username = ? THEN 0 ELSE 1 END,
|
||||
id ASC
|
||||
""",
|
||||
(username, username),
|
||||
).fetchall()
|
||||
|
||||
|
||||
def import_user_emails(csv_path: Path, db_path: Path) -> dict[str, object]:
|
||||
rows = _load_rows(csv_path)
|
||||
username_counts = Counter(
|
||||
str(row.get("Username") or "").strip().lower()
|
||||
for row in rows
|
||||
if str(row.get("Username") or "").strip()
|
||||
)
|
||||
duplicate_usernames = {
|
||||
username for username, count in username_counts.items() if username and count > 1
|
||||
}
|
||||
|
||||
summary: dict[str, object] = {
|
||||
"csv_path": str(csv_path),
|
||||
"db_path": str(db_path),
|
||||
"source_rows": len(rows),
|
||||
"updated": 0,
|
||||
"unchanged": 0,
|
||||
"missing_email": [],
|
||||
"missing_user": [],
|
||||
"duplicate_source_username": [],
|
||||
}
|
||||
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
_ensure_email_column(conn)
|
||||
|
||||
for row in rows:
|
||||
username = str(row.get("Username") or "").strip()
|
||||
if not username:
|
||||
continue
|
||||
username_key = username.lower()
|
||||
if username_key in duplicate_usernames:
|
||||
cast_list = summary["duplicate_source_username"]
|
||||
assert isinstance(cast_list, list)
|
||||
if username not in cast_list:
|
||||
cast_list.append(username)
|
||||
continue
|
||||
|
||||
email = _normalize_email(row.get("Email"))
|
||||
if not email:
|
||||
cast_list = summary["missing_email"]
|
||||
assert isinstance(cast_list, list)
|
||||
cast_list.append(username)
|
||||
continue
|
||||
|
||||
matches = _lookup_user(conn, username)
|
||||
if not matches:
|
||||
cast_list = summary["missing_user"]
|
||||
assert isinstance(cast_list, list)
|
||||
cast_list.append(username)
|
||||
continue
|
||||
|
||||
current_emails = {
|
||||
normalized.lower()
|
||||
for normalized in (_normalize_email(row["email"]) for row in matches)
|
||||
if normalized
|
||||
}
|
||||
if current_emails == {email.lower()}:
|
||||
summary["unchanged"] = int(summary["unchanged"]) + 1
|
||||
continue
|
||||
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE users
|
||||
SET email = ?
|
||||
WHERE username = ? COLLATE NOCASE
|
||||
""",
|
||||
(email, username),
|
||||
)
|
||||
summary["updated"] = int(summary["updated"]) + 1
|
||||
|
||||
summary["missing_email_count"] = len(summary["missing_email"]) # type: ignore[arg-type]
|
||||
summary["missing_user_count"] = len(summary["missing_user"]) # type: ignore[arg-type]
|
||||
summary["duplicate_source_username_count"] = len(summary["duplicate_source_username"]) # type: ignore[arg-type]
|
||||
return summary
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Import user email addresses into Magent users.")
|
||||
parser.add_argument(
|
||||
"csv_path",
|
||||
nargs="?",
|
||||
default=str(DEFAULT_CSV_PATH),
|
||||
help="CSV file containing Username and Email columns",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--db-path",
|
||||
default=str(DEFAULT_DB_PATH),
|
||||
help="Path to the Magent SQLite database",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
summary = import_user_emails(Path(args.csv_path), Path(args.db_path))
|
||||
print(json.dumps(summary, indent=2, sort_keys=True))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user