better stopping
All checks were successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful
All checks were successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful
This commit is contained in:
@@ -176,14 +176,19 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
|
||||
# Publish startup message
|
||||
await publish_example(client)
|
||||
|
||||
# Message loop
|
||||
# Message loop with timeout to allow shutdown check
|
||||
async for message in client.messages:
|
||||
if shutdown_event.is_set():
|
||||
logger.info("Shutdown event detected, breaking message loop")
|
||||
break
|
||||
try:
|
||||
await handle_message(message, client)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in message handler: {e}", exc_info=True)
|
||||
|
||||
# If we exit the loop due to shutdown, break the reconnect loop too
|
||||
if shutdown_event.is_set():
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("MQTT worker cancelled")
|
||||
@@ -217,8 +222,17 @@ async def main() -> None:
|
||||
# Wait for shutdown signal
|
||||
await shutdown_event.wait()
|
||||
|
||||
# Wait for worker to finish
|
||||
await worker_task
|
||||
# Give worker a moment to finish gracefully
|
||||
logger.info("Waiting for MQTT worker to finish...")
|
||||
try:
|
||||
await asyncio.wait_for(worker_task, timeout=5.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("MQTT worker did not finish in time, cancelling...")
|
||||
worker_task.cancel()
|
||||
try:
|
||||
await worker_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
logger.info("Pulsegen application stopped")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user