Build 2602260214: invites profiles and expiry admin controls

This commit is contained in:
2026-02-26 02:15:21 +13:00
parent 9be0ec75ec
commit f78382c019
14 changed files with 2795 additions and 31 deletions

View File

@@ -24,6 +24,28 @@ def _connect() -> sqlite3.Connection:
return sqlite3.connect(_db_path())
def _parse_datetime_value(value: Optional[str]) -> Optional[datetime]:
if not isinstance(value, str) or not value.strip():
return None
candidate = value.strip()
if candidate.endswith("Z"):
candidate = candidate[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(candidate)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
def _is_datetime_in_past(value: Optional[str]) -> bool:
parsed = _parse_datetime_value(value)
if parsed is None:
return False
return parsed <= datetime.now(timezone.utc)
def _normalize_title_value(title: Optional[str]) -> Optional[str]:
if not isinstance(title, str):
return None
@@ -150,11 +172,61 @@ def init_db() -> None:
last_login_at TEXT,
is_blocked INTEGER NOT NULL DEFAULT 0,
auto_search_enabled INTEGER NOT NULL DEFAULT 1,
profile_id INTEGER,
expires_at TEXT,
invited_by_code TEXT,
invited_at TEXT,
jellyfin_password_hash TEXT,
last_jellyfin_auth_at TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS user_profiles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
description TEXT,
role TEXT NOT NULL DEFAULT 'user',
auto_search_enabled INTEGER NOT NULL DEFAULT 1,
account_expires_days INTEGER,
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS signup_invites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL UNIQUE,
label TEXT,
description TEXT,
profile_id INTEGER,
role TEXT,
max_uses INTEGER,
use_count INTEGER NOT NULL DEFAULT 0,
enabled INTEGER NOT NULL DEFAULT 1,
expires_at TEXT,
created_by TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_signup_invites_enabled
ON signup_invites (enabled)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_signup_invites_expires_at
ON signup_invites (expires_at)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS settings (
@@ -269,6 +341,40 @@ def init_db() -> None:
conn.execute("ALTER TABLE users ADD COLUMN auto_search_enabled INTEGER NOT NULL DEFAULT 1")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE users ADD COLUMN profile_id INTEGER")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE users ADD COLUMN expires_at TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE users ADD COLUMN invited_by_code TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE users ADD COLUMN invited_at TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_users_profile_id
ON users (profile_id)
"""
)
except sqlite3.OperationalError:
pass
try:
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_users_expires_at
ON users (expires_at)
"""
)
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE requests_cache ADD COLUMN requested_by_id INTEGER")
except sqlite3.OperationalError:
@@ -391,16 +497,44 @@ def create_user(
role: str = "user",
auth_provider: str = "local",
jellyseerr_user_id: Optional[int] = None,
auto_search_enabled: bool = True,
profile_id: Optional[int] = None,
expires_at: Optional[str] = None,
invited_by_code: Optional[str] = None,
) -> None:
created_at = datetime.now(timezone.utc).isoformat()
password_hash = hash_password(password)
with _connect() as conn:
conn.execute(
"""
INSERT INTO users (username, password_hash, role, auth_provider, jellyseerr_user_id, created_at)
VALUES (?, ?, ?, ?, ?, ?)
INSERT INTO users (
username,
password_hash,
role,
auth_provider,
jellyseerr_user_id,
created_at,
auto_search_enabled,
profile_id,
expires_at,
invited_by_code,
invited_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(username, password_hash, role, auth_provider, jellyseerr_user_id, created_at),
(
username,
password_hash,
role,
auth_provider,
jellyseerr_user_id,
created_at,
1 if auto_search_enabled else 0,
profile_id,
expires_at,
invited_by_code,
created_at if invited_by_code else None,
),
)
@@ -410,16 +544,44 @@ def create_user_if_missing(
role: str = "user",
auth_provider: str = "local",
jellyseerr_user_id: Optional[int] = None,
auto_search_enabled: bool = True,
profile_id: Optional[int] = None,
expires_at: Optional[str] = None,
invited_by_code: Optional[str] = None,
) -> bool:
created_at = datetime.now(timezone.utc).isoformat()
password_hash = hash_password(password)
with _connect() as conn:
cursor = conn.execute(
"""
INSERT OR IGNORE INTO users (username, password_hash, role, auth_provider, jellyseerr_user_id, created_at)
VALUES (?, ?, ?, ?, ?, ?)
INSERT OR IGNORE INTO users (
username,
password_hash,
role,
auth_provider,
jellyseerr_user_id,
created_at,
auto_search_enabled,
profile_id,
expires_at,
invited_by_code,
invited_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(username, password_hash, role, auth_provider, jellyseerr_user_id, created_at),
(
username,
password_hash,
role,
auth_provider,
jellyseerr_user_id,
created_at,
1 if auto_search_enabled else 0,
profile_id,
expires_at,
invited_by_code,
created_at if invited_by_code else None,
),
)
return cursor.rowcount > 0
@@ -429,7 +591,9 @@ def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
row = conn.execute(
"""
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled, jellyfin_password_hash, last_jellyfin_auth_at
created_at, last_login_at, is_blocked, auto_search_enabled,
profile_id, expires_at, invited_by_code, invited_at,
jellyfin_password_hash, last_jellyfin_auth_at
FROM users
WHERE username = ? COLLATE NOCASE
""",
@@ -448,8 +612,13 @@ def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
"last_login_at": row[7],
"is_blocked": bool(row[8]),
"auto_search_enabled": bool(row[9]),
"jellyfin_password_hash": row[10],
"last_jellyfin_auth_at": row[11],
"profile_id": row[10],
"expires_at": row[11],
"invited_by_code": row[12],
"invited_at": row[13],
"is_expired": _is_datetime_in_past(row[11]),
"jellyfin_password_hash": row[14],
"last_jellyfin_auth_at": row[15],
}
@@ -458,7 +627,9 @@ def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
row = conn.execute(
"""
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled, jellyfin_password_hash, last_jellyfin_auth_at
created_at, last_login_at, is_blocked, auto_search_enabled,
profile_id, expires_at, invited_by_code, invited_at,
jellyfin_password_hash, last_jellyfin_auth_at
FROM users
WHERE id = ?
""",
@@ -477,15 +648,22 @@ def get_user_by_id(user_id: int) -> Optional[Dict[str, Any]]:
"last_login_at": row[7],
"is_blocked": bool(row[8]),
"auto_search_enabled": bool(row[9]),
"jellyfin_password_hash": row[10],
"last_jellyfin_auth_at": row[11],
"profile_id": row[10],
"expires_at": row[11],
"invited_by_code": row[12],
"invited_at": row[13],
"is_expired": _is_datetime_in_past(row[11]),
"jellyfin_password_hash": row[14],
"last_jellyfin_auth_at": row[15],
}
def get_all_users() -> list[Dict[str, Any]]:
with _connect() as conn:
rows = conn.execute(
"""
SELECT id, username, role, auth_provider, jellyseerr_user_id, created_at, last_login_at, is_blocked, auto_search_enabled
SELECT id, username, role, auth_provider, jellyseerr_user_id, created_at,
last_login_at, is_blocked, auto_search_enabled, profile_id, expires_at,
invited_by_code, invited_at
FROM users
ORDER BY username COLLATE NOCASE
"""
@@ -503,6 +681,11 @@ def get_all_users() -> list[Dict[str, Any]]:
"last_login_at": row[6],
"is_blocked": bool(row[7]),
"auto_search_enabled": bool(row[8]),
"profile_id": row[9],
"expires_at": row[10],
"invited_by_code": row[11],
"invited_at": row[12],
"is_expired": _is_datetime_in_past(row[10]),
}
)
return results
@@ -580,6 +763,333 @@ def set_auto_search_enabled_for_non_admin_users(enabled: bool) -> int:
return cursor.rowcount
def set_user_profile_id(username: str, profile_id: Optional[int]) -> None:
with _connect() as conn:
conn.execute(
"""
UPDATE users SET profile_id = ? WHERE username = ? COLLATE NOCASE
""",
(profile_id, username),
)
def set_user_expires_at(username: str, expires_at: Optional[str]) -> None:
with _connect() as conn:
conn.execute(
"""
UPDATE users SET expires_at = ? WHERE username = ? COLLATE NOCASE
""",
(expires_at, username),
)
def _row_to_user_profile(row: Any) -> Dict[str, Any]:
return {
"id": row[0],
"name": row[1],
"description": row[2],
"role": row[3],
"auto_search_enabled": bool(row[4]),
"account_expires_days": row[5],
"is_active": bool(row[6]),
"created_at": row[7],
"updated_at": row[8],
}
def list_user_profiles() -> list[Dict[str, Any]]:
with _connect() as conn:
rows = conn.execute(
"""
SELECT id, name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at
FROM user_profiles
ORDER BY name COLLATE NOCASE
"""
).fetchall()
return [_row_to_user_profile(row) for row in rows]
def get_user_profile(profile_id: int) -> Optional[Dict[str, Any]]:
with _connect() as conn:
row = conn.execute(
"""
SELECT id, name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at
FROM user_profiles
WHERE id = ?
""",
(profile_id,),
).fetchone()
if not row:
return None
return _row_to_user_profile(row)
def create_user_profile(
name: str,
description: Optional[str] = None,
role: str = "user",
auto_search_enabled: bool = True,
account_expires_days: Optional[int] = None,
is_active: bool = True,
) -> Dict[str, Any]:
timestamp = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
cursor = conn.execute(
"""
INSERT INTO user_profiles (
name, description, role, auto_search_enabled, account_expires_days, is_active, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
name,
description,
role,
1 if auto_search_enabled else 0,
account_expires_days,
1 if is_active else 0,
timestamp,
timestamp,
),
)
profile_id = int(cursor.lastrowid)
profile = get_user_profile(profile_id)
if not profile:
raise RuntimeError("Profile creation failed")
return profile
def update_user_profile(
profile_id: int,
*,
name: str,
description: Optional[str],
role: str,
auto_search_enabled: bool,
account_expires_days: Optional[int],
is_active: bool,
) -> Optional[Dict[str, Any]]:
timestamp = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
cursor = conn.execute(
"""
UPDATE user_profiles
SET name = ?, description = ?, role = ?, auto_search_enabled = ?,
account_expires_days = ?, is_active = ?, updated_at = ?
WHERE id = ?
""",
(
name,
description,
role,
1 if auto_search_enabled else 0,
account_expires_days,
1 if is_active else 0,
timestamp,
profile_id,
),
)
if cursor.rowcount <= 0:
return None
return get_user_profile(profile_id)
def delete_user_profile(profile_id: int) -> bool:
with _connect() as conn:
users_count = conn.execute(
"SELECT COUNT(*) FROM users WHERE profile_id = ?",
(profile_id,),
).fetchone()
invites_count = conn.execute(
"SELECT COUNT(*) FROM signup_invites WHERE profile_id = ?",
(profile_id,),
).fetchone()
if int((users_count or [0])[0] or 0) > 0:
raise ValueError("Profile is assigned to existing users.")
if int((invites_count or [0])[0] or 0) > 0:
raise ValueError("Profile is assigned to existing invites.")
cursor = conn.execute(
"DELETE FROM user_profiles WHERE id = ?",
(profile_id,),
)
return cursor.rowcount > 0
def _row_to_signup_invite(row: Any) -> Dict[str, Any]:
max_uses = row[6]
use_count = int(row[7] or 0)
expires_at = row[9]
is_expired = _is_datetime_in_past(expires_at)
remaining_uses = None if max_uses is None else max(int(max_uses) - use_count, 0)
return {
"id": row[0],
"code": row[1],
"label": row[2],
"description": row[3],
"profile_id": row[4],
"role": row[5],
"max_uses": max_uses,
"use_count": use_count,
"enabled": bool(row[8]),
"expires_at": expires_at,
"created_by": row[10],
"created_at": row[11],
"updated_at": row[12],
"is_expired": is_expired,
"remaining_uses": remaining_uses,
"is_usable": bool(row[8]) and not is_expired and (remaining_uses is None or remaining_uses > 0),
}
def list_signup_invites() -> list[Dict[str, Any]]:
with _connect() as conn:
rows = conn.execute(
"""
SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled,
expires_at, created_by, created_at, updated_at
FROM signup_invites
ORDER BY created_at DESC, id DESC
"""
).fetchall()
return [_row_to_signup_invite(row) for row in rows]
def get_signup_invite_by_id(invite_id: int) -> Optional[Dict[str, Any]]:
with _connect() as conn:
row = conn.execute(
"""
SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled,
expires_at, created_by, created_at, updated_at
FROM signup_invites
WHERE id = ?
""",
(invite_id,),
).fetchone()
if not row:
return None
return _row_to_signup_invite(row)
def get_signup_invite_by_code(code: str) -> Optional[Dict[str, Any]]:
with _connect() as conn:
row = conn.execute(
"""
SELECT id, code, label, description, profile_id, role, max_uses, use_count, enabled,
expires_at, created_by, created_at, updated_at
FROM signup_invites
WHERE code = ? COLLATE NOCASE
""",
(code,),
).fetchone()
if not row:
return None
return _row_to_signup_invite(row)
def create_signup_invite(
*,
code: str,
label: Optional[str] = None,
description: Optional[str] = None,
profile_id: Optional[int] = None,
role: Optional[str] = None,
max_uses: Optional[int] = None,
enabled: bool = True,
expires_at: Optional[str] = None,
created_by: Optional[str] = None,
) -> Dict[str, Any]:
timestamp = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
cursor = conn.execute(
"""
INSERT INTO signup_invites (
code, label, description, profile_id, role, max_uses, use_count, enabled,
expires_at, created_by, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?, ?)
""",
(
code,
label,
description,
profile_id,
role,
max_uses,
1 if enabled else 0,
expires_at,
created_by,
timestamp,
timestamp,
),
)
invite_id = int(cursor.lastrowid)
invite = get_signup_invite_by_id(invite_id)
if not invite:
raise RuntimeError("Invite creation failed")
return invite
def update_signup_invite(
invite_id: int,
*,
code: str,
label: Optional[str],
description: Optional[str],
profile_id: Optional[int],
role: Optional[str],
max_uses: Optional[int],
enabled: bool,
expires_at: Optional[str],
) -> Optional[Dict[str, Any]]:
timestamp = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
cursor = conn.execute(
"""
UPDATE signup_invites
SET code = ?, label = ?, description = ?, profile_id = ?, role = ?, max_uses = ?,
enabled = ?, expires_at = ?, updated_at = ?
WHERE id = ?
""",
(
code,
label,
description,
profile_id,
role,
max_uses,
1 if enabled else 0,
expires_at,
timestamp,
invite_id,
),
)
if cursor.rowcount <= 0:
return None
return get_signup_invite_by_id(invite_id)
def delete_signup_invite(invite_id: int) -> bool:
with _connect() as conn:
cursor = conn.execute(
"DELETE FROM signup_invites WHERE id = ?",
(invite_id,),
)
return cursor.rowcount > 0
def increment_signup_invite_use(invite_id: int) -> None:
timestamp = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
conn.execute(
"""
UPDATE signup_invites
SET use_count = use_count + 1, updated_at = ?
WHERE id = ?
""",
(timestamp, invite_id),
)
def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]:
user = get_user_by_username(username)
if not user: