diff --git a/backend/app/db.py b/backend/app/db.py index ede28db..98838fd 100644 --- a/backend/app/db.py +++ b/backend/app/db.py @@ -581,12 +581,80 @@ def set_auto_search_enabled_for_non_admin_users(enabled: bool) -> int: def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]: - user = get_user_by_username(username) - if not user: + # Resolve case-insensitive duplicates safely by only considering local-provider rows. + with _connect() as conn: + rows = conn.execute( + """ + SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, + created_at, last_login_at, is_blocked, auto_search_enabled, + jellyfin_password_hash, last_jellyfin_auth_at + FROM users + WHERE username = ? COLLATE NOCASE + ORDER BY + CASE WHEN username = ? THEN 0 ELSE 1 END, + id ASC + """, + (username, username), + ).fetchall() + if not rows: return None - if not verify_password(password, user["password_hash"]): - return None - return user + for row in rows: + provider = str(row[4] or "local").lower() + if provider != "local": + continue + if not verify_password(password, row[2]): + continue + return { + "id": row[0], + "username": row[1], + "password_hash": row[2], + "role": row[3], + "auth_provider": row[4], + "jellyseerr_user_id": row[5], + "created_at": row[6], + "last_login_at": row[7], + "is_blocked": bool(row[8]), + "auto_search_enabled": bool(row[9]), + "jellyfin_password_hash": row[10], + "last_jellyfin_auth_at": row[11], + } + return None + + +def get_users_by_username_ci(username: str) -> list[Dict[str, Any]]: + with _connect() as conn: + rows = conn.execute( + """ + SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id, + created_at, last_login_at, is_blocked, auto_search_enabled, + jellyfin_password_hash, last_jellyfin_auth_at + FROM users + WHERE username = ? COLLATE NOCASE + ORDER BY + CASE WHEN username = ? THEN 0 ELSE 1 END, + id ASC + """, + (username, username), + ).fetchall() + results: list[Dict[str, Any]] = [] + for row in rows: + results.append( + { + "id": row[0], + "username": row[1], + "password_hash": row[2], + "role": row[3], + "auth_provider": row[4], + "jellyseerr_user_id": row[5], + "created_at": row[6], + "last_login_at": row[7], + "is_blocked": bool(row[8]), + "auto_search_enabled": bool(row[9]), + "jellyfin_password_hash": row[10], + "last_jellyfin_auth_at": row[11], + } + ) + return results def set_user_password(username: str, password: str) -> None: diff --git a/backend/app/routers/auth.py b/backend/app/routers/auth.py index f6fe65e..0a546ad 100644 --- a/backend/app/routers/auth.py +++ b/backend/app/routers/auth.py @@ -8,6 +8,7 @@ from ..db import ( create_user_if_missing, set_last_login, get_user_by_username, + get_users_by_username_ci, set_user_password, set_jellyfin_auth_cache, set_user_jellyseerr_id, @@ -82,9 +83,26 @@ def _extract_jellyseerr_user_id(response: dict) -> int | None: @router.post("/login") async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict: + # Provider placeholder passwords must never be accepted by the local-login endpoint. + if form_data.password in {"jellyfin-user", "jellyseerr-user"}: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") + matching_users = get_users_by_username_ci(form_data.username) + has_external_match = any( + str(user.get("auth_provider") or "local").lower() != "local" for user in matching_users + ) + if has_external_match: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="This account uses external sign-in. Use the external sign-in option.", + ) user = verify_user_password(form_data.username, form_data.password) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") + if str(user.get("auth_provider") or "local").lower() != "local": + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="This account uses external sign-in. Use the external sign-in option.", + ) if user.get("is_blocked"): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is blocked") token = create_access_token(user["username"], user["role"])