from __future__ import annotations import asyncio import json import time from datetime import datetime, timezone from typing import Any, Dict, Optional from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse from ..auth import get_current_user_event_stream from . import requests as requests_router from .status import services_status router = APIRouter(prefix="/events", tags=["events"]) def _sse_json(payload: Dict[str, Any]) -> str: return f"data: {json.dumps(payload, ensure_ascii=True, separators=(',', ':'), default=str)}\n\n" @router.get("/stream") async def events_stream( request: Request, recent_days: int = 90, user: Dict[str, Any] = Depends(get_current_user_event_stream), ) -> StreamingResponse: recent_days = max(0, min(int(recent_days or 90), 3650)) recent_take = 50 if user.get("role") == "admin" else 6 async def event_generator(): yield "retry: 2000\n\n" last_recent_signature: Optional[str] = None last_services_signature: Optional[str] = None next_recent_at = 0.0 next_services_at = 0.0 heartbeat_counter = 0 while True: if await request.is_disconnected(): break now = time.monotonic() sent_any = False if now >= next_recent_at: next_recent_at = now + 15.0 try: recent_payload = await requests_router.recent_requests( take=recent_take, skip=0, days=recent_days, user=user, ) results = recent_payload.get("results") if isinstance(recent_payload, dict) else [] payload = { "type": "home_recent", "ts": datetime.now(timezone.utc).isoformat(), "days": recent_days, "results": results if isinstance(results, list) else [], } except Exception as exc: payload = { "type": "home_recent", "ts": datetime.now(timezone.utc).isoformat(), "days": recent_days, "error": str(exc), } signature = json.dumps(payload, ensure_ascii=True, separators=(",", ":"), default=str) if signature != last_recent_signature: last_recent_signature = signature yield _sse_json(payload) sent_any = True if now >= next_services_at: next_services_at = now + 30.0 try: status_payload = await services_status() payload = { "type": "home_services", "ts": datetime.now(timezone.utc).isoformat(), "status": status_payload, } except Exception as exc: payload = { "type": "home_services", "ts": datetime.now(timezone.utc).isoformat(), "error": str(exc), } signature = json.dumps(payload, ensure_ascii=True, separators=(",", ":"), default=str) if signature != last_services_signature: last_services_signature = signature yield _sse_json(payload) sent_any = True if sent_any: heartbeat_counter = 0 else: heartbeat_counter += 1 if heartbeat_counter >= 15: yield ": ping\n\n" heartbeat_counter = 0 await asyncio.sleep(1.0) headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)