Build 2602262241: live request page updates
This commit is contained in:
@@ -6,7 +6,7 @@ import time
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from ..auth import get_current_user_event_stream
|
||||
@@ -20,6 +20,58 @@ def _sse_json(payload: Dict[str, Any]) -> str:
|
||||
return f"data: {json.dumps(payload, ensure_ascii=True, separators=(',', ':'), default=str)}\n\n"
|
||||
|
||||
|
||||
def _jsonable(value: Any) -> Any:
|
||||
if hasattr(value, "model_dump"):
|
||||
try:
|
||||
return value.model_dump(mode="json")
|
||||
except TypeError:
|
||||
return value.model_dump()
|
||||
if hasattr(value, "dict"):
|
||||
try:
|
||||
return value.dict()
|
||||
except TypeError:
|
||||
return value
|
||||
return value
|
||||
|
||||
|
||||
def _request_history_brief(entries: Any) -> list[dict[str, Any]]:
|
||||
if not isinstance(entries, list):
|
||||
return []
|
||||
items: list[dict[str, Any]] = []
|
||||
for entry in entries:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
items.append(
|
||||
{
|
||||
"request_id": entry.get("request_id"),
|
||||
"state": entry.get("state"),
|
||||
"state_reason": entry.get("state_reason"),
|
||||
"created_at": entry.get("created_at"),
|
||||
}
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def _request_actions_brief(entries: Any) -> list[dict[str, Any]]:
|
||||
if not isinstance(entries, list):
|
||||
return []
|
||||
items: list[dict[str, Any]] = []
|
||||
for entry in entries:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
items.append(
|
||||
{
|
||||
"request_id": entry.get("request_id"),
|
||||
"action_id": entry.get("action_id"),
|
||||
"label": entry.get("label"),
|
||||
"status": entry.get("status"),
|
||||
"message": entry.get("message"),
|
||||
"created_at": entry.get("created_at"),
|
||||
}
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
@router.get("/stream")
|
||||
async def events_stream(
|
||||
request: Request,
|
||||
@@ -110,3 +162,88 @@ async def events_stream(
|
||||
"X-Accel-Buffering": "no",
|
||||
}
|
||||
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
|
||||
|
||||
|
||||
@router.get("/requests/{request_id}/stream")
|
||||
async def request_events_stream(
|
||||
request_id: str,
|
||||
request: Request,
|
||||
user: Dict[str, Any] = Depends(get_current_user_event_stream),
|
||||
) -> StreamingResponse:
|
||||
request_id = str(request_id).strip()
|
||||
if not request_id:
|
||||
raise HTTPException(status_code=400, detail="Missing request id")
|
||||
|
||||
async def event_generator():
|
||||
yield "retry: 2000\n\n"
|
||||
last_signature: Optional[str] = None
|
||||
next_refresh_at = 0.0
|
||||
heartbeat_counter = 0
|
||||
|
||||
while True:
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
|
||||
now = time.monotonic()
|
||||
sent_any = False
|
||||
|
||||
if now >= next_refresh_at:
|
||||
next_refresh_at = now + 2.0
|
||||
try:
|
||||
snapshot = await requests_router.get_snapshot(request_id=request_id, user=user)
|
||||
history_payload = await requests_router.request_history(
|
||||
request_id=request_id, limit=5, user=user
|
||||
)
|
||||
actions_payload = await requests_router.request_actions(
|
||||
request_id=request_id, limit=5, user=user
|
||||
)
|
||||
payload = {
|
||||
"type": "request_live",
|
||||
"request_id": request_id,
|
||||
"ts": datetime.now(timezone.utc).isoformat(),
|
||||
"snapshot": _jsonable(snapshot),
|
||||
"history": _request_history_brief(
|
||||
history_payload.get("snapshots", []) if isinstance(history_payload, dict) else []
|
||||
),
|
||||
"actions": _request_actions_brief(
|
||||
actions_payload.get("actions", []) if isinstance(actions_payload, dict) else []
|
||||
),
|
||||
}
|
||||
except HTTPException as exc:
|
||||
payload = {
|
||||
"type": "request_live",
|
||||
"request_id": request_id,
|
||||
"ts": datetime.now(timezone.utc).isoformat(),
|
||||
"error": str(exc.detail),
|
||||
"status_code": int(exc.status_code),
|
||||
}
|
||||
except Exception as exc:
|
||||
payload = {
|
||||
"type": "request_live",
|
||||
"request_id": request_id,
|
||||
"ts": datetime.now(timezone.utc).isoformat(),
|
||||
"error": str(exc),
|
||||
}
|
||||
|
||||
signature = json.dumps(payload, ensure_ascii=True, separators=(",", ":"), default=str)
|
||||
if signature != last_signature:
|
||||
last_signature = signature
|
||||
yield _sse_json(payload)
|
||||
sent_any = True
|
||||
|
||||
if sent_any:
|
||||
heartbeat_counter = 0
|
||||
else:
|
||||
heartbeat_counter += 1
|
||||
if heartbeat_counter >= 15:
|
||||
yield ": ping\n\n"
|
||||
heartbeat_counter = 0
|
||||
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
headers = {
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
}
|
||||
return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
|
||||
|
||||
Reference in New Issue
Block a user