fix(auth): block local login bypass for external accounts

This commit is contained in:
2026-02-26 20:54:54 +13:00
parent 9be0ec75ec
commit 4685f813c9
2 changed files with 91 additions and 5 deletions

View File

@@ -581,12 +581,80 @@ def set_auto_search_enabled_for_non_admin_users(enabled: bool) -> int:
def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]: def verify_user_password(username: str, password: str) -> Optional[Dict[str, Any]]:
user = get_user_by_username(username) # Resolve case-insensitive duplicates safely by only considering local-provider rows.
if not user: with _connect() as conn:
rows = conn.execute(
"""
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled,
jellyfin_password_hash, last_jellyfin_auth_at
FROM users
WHERE username = ? COLLATE NOCASE
ORDER BY
CASE WHEN username = ? THEN 0 ELSE 1 END,
id ASC
""",
(username, username),
).fetchall()
if not rows:
return None return None
if not verify_password(password, user["password_hash"]): for row in rows:
return None provider = str(row[4] or "local").lower()
return user if provider != "local":
continue
if not verify_password(password, row[2]):
continue
return {
"id": row[0],
"username": row[1],
"password_hash": row[2],
"role": row[3],
"auth_provider": row[4],
"jellyseerr_user_id": row[5],
"created_at": row[6],
"last_login_at": row[7],
"is_blocked": bool(row[8]),
"auto_search_enabled": bool(row[9]),
"jellyfin_password_hash": row[10],
"last_jellyfin_auth_at": row[11],
}
return None
def get_users_by_username_ci(username: str) -> list[Dict[str, Any]]:
with _connect() as conn:
rows = conn.execute(
"""
SELECT id, username, password_hash, role, auth_provider, jellyseerr_user_id,
created_at, last_login_at, is_blocked, auto_search_enabled,
jellyfin_password_hash, last_jellyfin_auth_at
FROM users
WHERE username = ? COLLATE NOCASE
ORDER BY
CASE WHEN username = ? THEN 0 ELSE 1 END,
id ASC
""",
(username, username),
).fetchall()
results: list[Dict[str, Any]] = []
for row in rows:
results.append(
{
"id": row[0],
"username": row[1],
"password_hash": row[2],
"role": row[3],
"auth_provider": row[4],
"jellyseerr_user_id": row[5],
"created_at": row[6],
"last_login_at": row[7],
"is_blocked": bool(row[8]),
"auto_search_enabled": bool(row[9]),
"jellyfin_password_hash": row[10],
"last_jellyfin_auth_at": row[11],
}
)
return results
def set_user_password(username: str, password: str) -> None: def set_user_password(username: str, password: str) -> None:

View File

@@ -8,6 +8,7 @@ from ..db import (
create_user_if_missing, create_user_if_missing,
set_last_login, set_last_login,
get_user_by_username, get_user_by_username,
get_users_by_username_ci,
set_user_password, set_user_password,
set_jellyfin_auth_cache, set_jellyfin_auth_cache,
set_user_jellyseerr_id, set_user_jellyseerr_id,
@@ -82,9 +83,26 @@ def _extract_jellyseerr_user_id(response: dict) -> int | None:
@router.post("/login") @router.post("/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict: async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> dict:
# Provider placeholder passwords must never be accepted by the local-login endpoint.
if form_data.password in {"jellyfin-user", "jellyseerr-user"}:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
matching_users = get_users_by_username_ci(form_data.username)
has_external_match = any(
str(user.get("auth_provider") or "local").lower() != "local" for user in matching_users
)
if has_external_match:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This account uses external sign-in. Use the external sign-in option.",
)
user = verify_user_password(form_data.username, form_data.password) user = verify_user_password(form_data.username, form_data.password)
if not user: if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
if str(user.get("auth_provider") or "local").lower() != "local":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This account uses external sign-in. Use the external sign-in option.",
)
if user.get("is_blocked"): if user.get("is_blocked"):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is blocked") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is blocked")
token = create_access_token(user["username"], user["role"]) token = create_access_token(user["username"], user["role"])