"""Server-Sent Events (SSE) endpoints for the home dashboard and for single requests."""

from __future__ import annotations

import asyncio
import json
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse

from ..auth import get_current_user_event_stream
from . import requests as requests_router
from .status import services_status

router = APIRouter(prefix="/events", tags=["events"])

# First frame of every stream: the SSE "retry" field (reconnection delay in ms).
_SSE_RETRY_FRAME = "retry: 2000\n\n"
# SSE comment line (leading ':'); carries no event data and only keeps the
# connection active while nothing has changed.
_SSE_PING_FRAME = ": ping\n\n"

# The polling loops wake up once per tick; a ping is sent after this many
# consecutive ticks in which no data event was emitted.
_TICK_SECONDS = 1.0
_HEARTBEAT_IDLE_TICKS = 15

# Minimum time between refreshes of each data source.
_RECENT_REFRESH_SECONDS = 15.0
_SERVICES_REFRESH_SECONDS = 30.0
_REQUEST_REFRESH_SECONDS = 2.0

# Headers shared by both streaming responses. "X-Accel-Buffering: no" is the
# nginx convention for disabling response buffering on a proxied stream.
_SSE_HEADERS: Dict[str, str] = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}


def _compact_json(payload: Dict[str, Any]) -> str:
    """Serialize *payload* as compact ASCII JSON, stringifying unknown types."""
    return json.dumps(payload, ensure_ascii=True, separators=(",", ":"), default=str)


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _sse_json(payload: Dict[str, Any]) -> str:
    """Format *payload* as a single SSE ``data:`` frame."""
    return f"data: {_compact_json(payload)}\n\n"


def _payload_signature(payload: Dict[str, Any]) -> str:
    """Return a change-detection signature for *payload*.

    The ``"ts"`` key is excluded on purpose: it is regenerated on every
    refresh, so including it would make every payload look new and the
    "only send when something changed" check would never suppress an event.
    """
    return _compact_json({key: value for key, value in payload.items() if key != "ts"})


def _jsonable(value: Any) -> Any:
    """Convert model-like objects to plain data; return anything else as-is.

    Objects exposing ``model_dump`` are dumped in JSON mode, falling back to a
    plain ``model_dump()`` when the ``mode`` argument is rejected. Objects
    exposing ``dict`` are converted with it; if that call raises ``TypeError``
    the original value is returned unchanged.
    """
    if hasattr(value, "model_dump"):
        try:
            return value.model_dump(mode="json")
        except TypeError:
            return value.model_dump()
    if hasattr(value, "dict"):
        try:
            return value.dict()
        except TypeError:
            return value
    return value


def _project_entries(entries: Any, fields: tuple[str, ...]) -> list[dict[str, Any]]:
    """Keep only *fields* from each dict in *entries*.

    Non-list input yields an empty list and non-dict items are skipped.
    Missing keys are reported as ``None``.
    """
    if not isinstance(entries, list):
        return []
    return [
        {field: entry.get(field) for field in fields}
        for entry in entries
        if isinstance(entry, dict)
    ]


def _request_history_brief(entries: Any) -> list[dict[str, Any]]:
    """Reduce history entries to the fields sent over the live stream."""
    return _project_entries(
        entries,
        ("request_id", "state", "state_reason", "created_at"),
    )


def _request_actions_brief(entries: Any) -> list[dict[str, Any]]:
    """Reduce action entries to the fields sent over the live stream."""
    return _project_entries(
        entries,
        ("request_id", "action_id", "label", "status", "message", "created_at"),
    )


async def _home_recent_payload(user: Dict[str, Any], take: int, days: int) -> Dict[str, Any]:
    """Build the ``home_recent`` event; failures become an ``error`` field."""
    try:
        recent_payload = await requests_router.recent_requests(
            take=take,
            skip=0,
            days=days,
            user=user,
        )
        results = recent_payload.get("results") if isinstance(recent_payload, dict) else []
        return {
            "type": "home_recent",
            "ts": _utc_now_iso(),
            "days": days,
            "results": results if isinstance(results, list) else [],
        }
    except Exception as exc:
        # Best effort: report the failure in-band instead of ending the stream.
        return {
            "type": "home_recent",
            "ts": _utc_now_iso(),
            "days": days,
            "error": str(exc),
        }


async def _home_services_payload() -> Dict[str, Any]:
    """Build the ``home_services`` event; failures become an ``error`` field."""
    try:
        status_payload = await services_status()
        return {
            "type": "home_services",
            "ts": _utc_now_iso(),
            "status": status_payload,
        }
    except Exception as exc:
        # Best effort: report the failure in-band instead of ending the stream.
        return {
            "type": "home_services",
            "ts": _utc_now_iso(),
            "error": str(exc),
        }


async def _request_live_payload(request_id: str, user: Dict[str, Any]) -> Dict[str, Any]:
    """Build the ``request_live`` event for one request.

    An ``HTTPException`` raised by the underlying handlers is reported with
    its ``detail`` and ``status_code``; any other exception is reported with
    its string form only.
    """
    try:
        snapshot = await requests_router.get_snapshot(request_id=request_id, user=user)
        history_payload = await requests_router.request_history(
            request_id=request_id, limit=5, user=user
        )
        actions_payload = await requests_router.request_actions(
            request_id=request_id, limit=5, user=user
        )
        return {
            "type": "request_live",
            "request_id": request_id,
            "ts": _utc_now_iso(),
            "snapshot": _jsonable(snapshot),
            "history": _request_history_brief(
                history_payload.get("snapshots", []) if isinstance(history_payload, dict) else []
            ),
            "actions": _request_actions_brief(
                actions_payload.get("actions", []) if isinstance(actions_payload, dict) else []
            ),
        }
    except HTTPException as exc:
        return {
            "type": "request_live",
            "request_id": request_id,
            "ts": _utc_now_iso(),
            "error": str(exc.detail),
            "status_code": int(exc.status_code),
        }
    except Exception as exc:
        return {
            "type": "request_live",
            "request_id": request_id,
            "ts": _utc_now_iso(),
            "error": str(exc),
        }


@router.get("/stream")
async def events_stream(
    request: Request,
    recent_days: int = 90,
    user: Dict[str, Any] = Depends(get_current_user_event_stream),
) -> StreamingResponse:
    """Stream ``home_recent`` and ``home_services`` events over SSE.

    Args:
        request: Incoming request, polled to detect client disconnects.
        recent_days: Look-back window for recent requests. A falsy value
            (including ``0``) falls back to 90; the result is clamped to
            the range 0..3650.
        user: Authenticated user; users whose ``role`` is ``"admin"`` get up
            to 50 recent requests, everyone else up to 6.

    Returns:
        A ``text/event-stream`` response. Each source is refreshed on its own
        interval and an event is emitted only when its content (ignoring the
        ``ts`` timestamp) differs from the last one sent. A ping comment is
        sent after 15 idle ticks.
    """
    recent_days = max(0, min(int(recent_days or 90), 3650))
    recent_take = 50 if user.get("role") == "admin" else 6

    async def event_generator():
        yield _SSE_RETRY_FRAME
        last_recent_signature: Optional[str] = None
        last_services_signature: Optional[str] = None
        # Deadlines start at 0 so both sources are fetched on the first tick.
        next_recent_at = 0.0
        next_services_at = 0.0
        heartbeat_counter = 0
        while True:
            if await request.is_disconnected():
                break
            now = time.monotonic()
            sent_any = False

            if now >= next_recent_at:
                next_recent_at = now + _RECENT_REFRESH_SECONDS
                payload = await _home_recent_payload(user, recent_take, recent_days)
                signature = _payload_signature(payload)
                if signature != last_recent_signature:
                    last_recent_signature = signature
                    yield _sse_json(payload)
                    sent_any = True

            if now >= next_services_at:
                next_services_at = now + _SERVICES_REFRESH_SECONDS
                payload = await _home_services_payload()
                signature = _payload_signature(payload)
                if signature != last_services_signature:
                    last_services_signature = signature
                    yield _sse_json(payload)
                    sent_any = True

            if sent_any:
                heartbeat_counter = 0
            else:
                heartbeat_counter += 1
                if heartbeat_counter >= _HEARTBEAT_IDLE_TICKS:
                    yield _SSE_PING_FRAME
                    heartbeat_counter = 0
            await asyncio.sleep(_TICK_SECONDS)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS),
    )


@router.get("/requests/{request_id}/stream")
async def request_events_stream(
    request_id: str,
    request: Request,
    user: Dict[str, Any] = Depends(get_current_user_event_stream),
) -> StreamingResponse:
    """Stream ``request_live`` events for a single request over SSE.

    Args:
        request_id: Identifier of the request to follow; surrounding
            whitespace is stripped.
        request: Incoming request, polled to detect client disconnects.
        user: Authenticated user, forwarded to the request handlers.

    Returns:
        A ``text/event-stream`` response. The request is re-read at most
        every 2 seconds and an event is emitted only when its content
        (ignoring the ``ts`` timestamp) differs from the last one sent.
        A ping comment is sent after 15 idle ticks.

    Raises:
        HTTPException: 400 when ``request_id`` is empty after stripping.
    """
    request_id = str(request_id).strip()
    if not request_id:
        raise HTTPException(status_code=400, detail="Missing request id")

    async def event_generator():
        yield _SSE_RETRY_FRAME
        last_signature: Optional[str] = None
        # Deadline starts at 0 so the first snapshot is sent on the first tick.
        next_refresh_at = 0.0
        heartbeat_counter = 0
        while True:
            if await request.is_disconnected():
                break
            now = time.monotonic()
            sent_any = False

            if now >= next_refresh_at:
                next_refresh_at = now + _REQUEST_REFRESH_SECONDS
                payload = await _request_live_payload(request_id, user)
                signature = _payload_signature(payload)
                if signature != last_signature:
                    last_signature = signature
                    yield _sse_json(payload)
                    sent_any = True

            if sent_any:
                heartbeat_counter = 0
            else:
                heartbeat_counter += 1
                if heartbeat_counter >= _HEARTBEAT_IDLE_TICKS:
                    yield _SSE_PING_FRAME
                    heartbeat_counter = 0
            await asyncio.sleep(_TICK_SECONDS)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=dict(_SSE_HEADERS),
    )