diff --git a/apps/api/main.py b/apps/api/main.py index e9e238d..eabb41d 100644 --- a/apps/api/main.py +++ b/apps/api/main.py @@ -559,25 +559,31 @@ async def event_generator(request: Request) -> AsyncGenerator[str, None]: redis_client = None pubsub = None - # Heartbeat tracking - last_heartbeat = asyncio.get_event_loop().time() heartbeat_interval = 15 # Safari-friendly: shorter interval + # Use listen() iterator for blocking reads with heartbeat timeout + if pubsub: + listener = pubsub.listen() + else: + listener = None + while True: # Check if client disconnected if await request.is_disconnected(): logger.info("SSE client disconnected") break - # Try to get message from Redis (if available) - if pubsub: + # Try to get message from Redis with timeout for heartbeat + if listener: try: + # Wait for message with heartbeat timeout + # If no message arrives within timeout, send heartbeat message = await asyncio.wait_for( - pubsub.get_message(ignore_subscribe_messages=True), - timeout=0.1 + anext(listener), + timeout=heartbeat_interval ) - if message and message["type"] == "message": + if message["type"] == "message": data = message["data"] logger.debug(f"Sending SSE message: {data[:100]}...") @@ -590,24 +596,21 @@ async def event_generator(request: Request) -> AsyncGenerator[str, None]: logger.warning(f"Failed to parse state data for cache: {e}") yield f"event: message\ndata: {data}\n\n" - last_heartbeat = asyncio.get_event_loop().time() - continue # Skip sleep, check for more messages immediately except asyncio.TimeoutError: - pass # No message, continue to heartbeat check + # No message within heartbeat interval - send heartbeat + yield ": ping\n\n" + except StopAsyncIteration: + logger.warning("Redis listener stopped") + break except Exception as e: logger.error(f"Redis error: {e}") - # Continue with heartbeats even if Redis fails - - # Sleep briefly to avoid busy loop - await asyncio.sleep(0.1) - - # Send heartbeat if interval elapsed - current_time = asyncio.get_event_loop().time() - if current_time - last_heartbeat >= heartbeat_interval: - # Comment-style ping (Safari-compatible, no event type) + # Continue with heartbeat-only mode + listener = None + else: + # Heartbeat-only mode (no Redis) + await asyncio.sleep(heartbeat_interval) yield ": ping\n\n" - last_heartbeat = current_time except asyncio.CancelledError: logger.info("SSE connection cancelled by client")