Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
24b2f70caf
|
|||
|
d3c1ec404a
|
@@ -5,6 +5,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
|
import uuid
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from aiomqtt import Client, Message
|
from aiomqtt import Client, Message
|
||||||
@@ -163,7 +164,7 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
|
|||||||
async with Client(
|
async with Client(
|
||||||
hostname=broker,
|
hostname=broker,
|
||||||
port=port,
|
port=port,
|
||||||
identifier="pulsegen"
|
identifier=f"pulsegen-{uuid.uuid4()}",
|
||||||
) as client:
|
) as client:
|
||||||
logger.info("Connected to MQTT broker")
|
logger.info("Connected to MQTT broker")
|
||||||
|
|
||||||
@@ -175,15 +176,20 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
|
|||||||
# Publish startup message
|
# Publish startup message
|
||||||
await publish_example(client)
|
await publish_example(client)
|
||||||
|
|
||||||
# Message loop
|
# Message loop with timeout to allow shutdown check
|
||||||
async for message in client.messages:
|
async for message in client.messages:
|
||||||
if shutdown_event.is_set():
|
if shutdown_event.is_set():
|
||||||
|
logger.info("Shutdown event detected, breaking message loop")
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
await handle_message(message, client)
|
await handle_message(message, client)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in message handler: {e}", exc_info=True)
|
logger.error(f"Error in message handler: {e}", exc_info=True)
|
||||||
|
|
||||||
|
# If we exit the loop due to shutdown, break the reconnect loop too
|
||||||
|
if shutdown_event.is_set():
|
||||||
|
break
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info("MQTT worker cancelled")
|
logger.info("MQTT worker cancelled")
|
||||||
break
|
break
|
||||||
@@ -216,8 +222,17 @@ async def main() -> None:
|
|||||||
# Wait for shutdown signal
|
# Wait for shutdown signal
|
||||||
await shutdown_event.wait()
|
await shutdown_event.wait()
|
||||||
|
|
||||||
# Wait for worker to finish
|
# Give worker a moment to finish gracefully
|
||||||
await worker_task
|
logger.info("Waiting for MQTT worker to finish...")
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(worker_task, timeout=5.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.warning("MQTT worker did not finish in time, cancelling...")
|
||||||
|
worker_task.cancel()
|
||||||
|
try:
|
||||||
|
await worker_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info("Pulsegen application stopped")
|
logger.info("Pulsegen application stopped")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user