Build 2602262030: add magent settings and hardening

This commit is contained in:
2026-02-26 20:31:26 +13:00
parent b215e8030c
commit 0b73d9f4ee
16 changed files with 897 additions and 140 deletions

View File

@@ -38,11 +38,18 @@ def _extract_client_ip(request: Request) -> str:
return "unknown"
def _load_current_user_from_token(token: str, request: Optional[Request] = None) -> Dict[str, Any]:
def _load_current_user_from_token(
token: str,
request: Optional[Request] = None,
allowed_token_types: Optional[set[str]] = None,
) -> Dict[str, Any]:
try:
payload = safe_decode_token(token)
except TokenError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
token_type = str(payload.get("typ") or "access").strip().lower()
if allowed_token_types and token_type not in allowed_token_types:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
username = payload.get("sub")
if not username:
@@ -79,16 +86,24 @@ def get_current_user(token: str = Depends(oauth2_scheme), request: Request = Non
def get_current_user_event_stream(request: Request) -> Dict[str, Any]:
"""EventSource cannot send Authorization headers, so allow a query token here only."""
"""EventSource cannot send Authorization headers, so allow a short-lived stream token via query."""
token = None
stream_query_token = None
auth_header = request.headers.get("authorization", "")
if auth_header.lower().startswith("bearer "):
token = auth_header.split(" ", 1)[1].strip()
if not token:
token = request.query_params.get("access_token")
if not token:
stream_query_token = request.query_params.get("stream_token")
if not token and not stream_query_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing token")
return _load_current_user_from_token(token, None)
if token:
# Allow standard bearer tokens in Authorization for non-browser EventSource clients.
return _load_current_user_from_token(token, None)
return _load_current_user_from_token(
str(stream_query_token),
None,
allowed_token_types={"sse"},
)
def require_admin(user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]: