Build 2602261636: self-service invites and count fixes

This commit is contained in:
2026-02-26 16:37:58 +13:00
parent 1b1a3e233b
commit 23c57da3cc
6 changed files with 736 additions and 9 deletions

View File

@@ -1,4 +1,6 @@
from datetime import datetime, timedelta, timezone
import secrets
import string
import httpx
from fastapi import APIRouter, HTTPException, status, Depends
@@ -14,6 +16,11 @@ from ..db import (
set_jellyfin_auth_cache,
set_user_jellyseerr_id,
get_signup_invite_by_code,
get_signup_invite_by_id,
list_signup_invites,
create_signup_invite,
update_signup_invite,
delete_signup_invite,
increment_signup_invite_use,
get_user_profile,
get_user_activity,
@@ -156,6 +163,118 @@ def _public_invite_payload(invite: dict, profile: dict | None = None) -> dict:
}
def _parse_optional_positive_int(value: object, field_name: str) -> int | None:
if value is None or value == "":
return None
try:
parsed = int(value)
except (TypeError, ValueError) as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"{field_name} must be a number") from exc
if parsed <= 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"{field_name} must be greater than 0",
)
return parsed
def _parse_optional_expires_at(value: object) -> str | None:
if value is None or value == "":
return None
if not isinstance(value, str):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="expires_at must be an ISO datetime string",
)
candidate = value.strip()
if not candidate:
return None
try:
parsed = datetime.fromisoformat(candidate.replace("Z", "+00:00"))
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="expires_at must be a valid ISO datetime",
) from exc
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.isoformat()
def _normalize_invite_code(value: str | None) -> str:
raw = (value or "").strip().upper()
filtered = "".join(ch for ch in raw if ch.isalnum())
if len(filtered) < 6:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invite code must be at least 6 letters/numbers.",
)
return filtered
def _generate_invite_code(length: int = 12) -> str:
alphabet = string.ascii_uppercase + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
def _same_username(a: object, b: object) -> bool:
if not isinstance(a, str) or not isinstance(b, str):
return False
return a.strip().lower() == b.strip().lower()
def _serialize_self_invite(invite: dict) -> dict:
profile = None
profile_id = invite.get("profile_id")
if profile_id is not None:
try:
profile = get_user_profile(int(profile_id))
except Exception:
profile = None
return {
"id": invite.get("id"),
"code": invite.get("code"),
"label": invite.get("label"),
"description": invite.get("description"),
"profile_id": invite.get("profile_id"),
"profile": (
{"id": profile.get("id"), "name": profile.get("name")}
if isinstance(profile, dict)
else None
),
"role": invite.get("role"),
"max_uses": invite.get("max_uses"),
"use_count": invite.get("use_count", 0),
"remaining_uses": invite.get("remaining_uses"),
"enabled": bool(invite.get("enabled")),
"expires_at": invite.get("expires_at"),
"is_expired": bool(invite.get("is_expired")),
"is_usable": bool(invite.get("is_usable")),
"created_at": invite.get("created_at"),
"updated_at": invite.get("updated_at"),
"created_by": invite.get("created_by"),
}
def _current_user_invites(username: str) -> list[dict]:
owned = [
invite
for invite in list_signup_invites()
if _same_username(invite.get("created_by"), username)
]
owned.sort(key=lambda item: (str(item.get("created_at") or ""), int(item.get("id") or 0)), reverse=True)
return owned
def _get_owned_invite(invite_id: int, current_user: dict) -> dict:
invite = get_signup_invite_by_id(invite_id)
if not invite:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invite not found")
if not _same_username(invite.get("created_by"), current_user.get("username")):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You can only manage your own invites")
return invite
@router.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict:
user = verify_user_password(form_data.username, form_data.password)
@@ -444,6 +563,124 @@ async def profile(current_user: dict = Depends(get_current_user)) -> dict:
}
@router.get("/profile/invites")
async def profile_invites(current_user: dict = Depends(get_current_user)) -> dict:
username = str(current_user.get("username") or "").strip()
if not username:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid user")
invites = [_serialize_self_invite(invite) for invite in _current_user_invites(username)]
return {"invites": invites, "count": len(invites)}
@router.post("/profile/invites")
async def create_profile_invite(payload: dict, current_user: dict = Depends(get_current_user)) -> dict:
if not isinstance(payload, dict):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid payload")
username = str(current_user.get("username") or "").strip()
if not username:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid user")
requested_code = payload.get("code")
if isinstance(requested_code, str) and requested_code.strip():
code = _normalize_invite_code(requested_code)
existing = get_signup_invite_by_code(code)
if existing:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Invite code already exists")
else:
code = ""
for _ in range(20):
candidate = _generate_invite_code()
if not get_signup_invite_by_code(candidate):
code = candidate
break
if not code:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not generate invite code")
label = payload.get("label")
description = payload.get("description")
if label is not None:
label = str(label).strip() or None
if description is not None:
description = str(description).strip() or None
max_uses = _parse_optional_positive_int(payload.get("max_uses"), "max_uses")
expires_at = _parse_optional_expires_at(payload.get("expires_at"))
enabled = bool(payload.get("enabled", True))
profile_id = current_user.get("profile_id")
if not isinstance(profile_id, int) or profile_id <= 0:
profile_id = None
invite = create_signup_invite(
code=code,
label=label,
description=description,
profile_id=profile_id,
role="user",
max_uses=max_uses,
enabled=enabled,
expires_at=expires_at,
created_by=username,
)
return {"status": "ok", "invite": _serialize_self_invite(invite)}
@router.put("/profile/invites/{invite_id}")
async def update_profile_invite(
invite_id: int, payload: dict, current_user: dict = Depends(get_current_user)
) -> dict:
if not isinstance(payload, dict):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid payload")
existing = _get_owned_invite(invite_id, current_user)
requested_code = payload.get("code", existing.get("code"))
if isinstance(requested_code, str) and requested_code.strip():
code = _normalize_invite_code(requested_code)
else:
code = str(existing.get("code") or "").strip()
if not code:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invite code is required")
duplicate = get_signup_invite_by_code(code)
if duplicate and int(duplicate.get("id") or 0) != int(existing.get("id") or 0):
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Invite code already exists")
label = payload.get("label", existing.get("label"))
description = payload.get("description", existing.get("description"))
if label is not None:
label = str(label).strip() or None
if description is not None:
description = str(description).strip() or None
max_uses = _parse_optional_positive_int(payload.get("max_uses", existing.get("max_uses")), "max_uses")
expires_at = _parse_optional_expires_at(payload.get("expires_at", existing.get("expires_at")))
enabled_raw = payload.get("enabled", existing.get("enabled"))
enabled = bool(enabled_raw)
invite = update_signup_invite(
invite_id,
code=code,
label=label,
description=description,
profile_id=existing.get("profile_id"),
role=existing.get("role"),
max_uses=max_uses,
enabled=enabled,
expires_at=expires_at,
)
if not invite:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invite not found")
return {"status": "ok", "invite": _serialize_self_invite(invite)}
@router.delete("/profile/invites/{invite_id}")
async def delete_profile_invite(invite_id: int, current_user: dict = Depends(get_current_user)) -> dict:
_get_owned_invite(invite_id, current_user)
deleted = delete_signup_invite(invite_id)
if not deleted:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invite not found")
return {"status": "ok"}
@router.post("/password")
async def change_password(payload: dict, current_user: dict = Depends(get_current_user)) -> dict:
if current_user.get("auth_provider") != "local":