fixes
This commit is contained in:
@@ -301,41 +301,42 @@ async def event_generator(request: Request) -> AsyncGenerator[str, None]:
|
||||
|
||||
try:
|
||||
await pubsub.subscribe(redis_channel)
|
||||
logger.info(f"SSE client connected, subscribed to {redis_channel}")
|
||||
|
||||
# Create heartbeat task
|
||||
# Create heartbeat tracking
|
||||
last_heartbeat = asyncio.get_event_loop().time()
|
||||
heartbeat_interval = 25
|
||||
|
||||
while True:
|
||||
# Check if client disconnected
|
||||
if await request.is_disconnected():
|
||||
logger.info("SSE client disconnected")
|
||||
break
|
||||
|
||||
# Get message with timeout for heartbeat
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
pubsub.get_message(ignore_subscribe_messages=True),
|
||||
timeout=1.0
|
||||
)
|
||||
|
||||
if message and message["type"] == "message":
|
||||
# Send data event
|
||||
data = message["data"]
|
||||
yield f"event: message\ndata: {data}\n\n"
|
||||
last_heartbeat = asyncio.get_event_loop().time()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
# Try to get message (non-blocking)
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
|
||||
|
||||
# Handle actual data messages
|
||||
if message and message["type"] == "message":
|
||||
data = message["data"]
|
||||
logger.debug(f"Sending SSE message: {data[:100]}...")
|
||||
yield f"event: message\ndata: {data}\n\n"
|
||||
last_heartbeat = asyncio.get_event_loop().time()
|
||||
else:
|
||||
# No message, sleep a bit to avoid busy loop
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Send heartbeat every 25 seconds
|
||||
current_time = asyncio.get_event_loop().time()
|
||||
if current_time - last_heartbeat >= 25:
|
||||
if current_time - last_heartbeat >= heartbeat_interval:
|
||||
yield "event: ping\ndata: heartbeat\n\n"
|
||||
last_heartbeat = current_time
|
||||
|
||||
finally:
|
||||
await pubsub.unsubscribe(redis_channel)
|
||||
await pubsub.close()
|
||||
await redis_client.close()
|
||||
await pubsub.aclose()
|
||||
await redis_client.aclose()
|
||||
logger.info("SSE connection closed")
|
||||
|
||||
|
||||
@app.get("/realtime")
|
||||
|
||||
Reference in New Issue
Block a user