"""Pulsegen - MQTT pulse generator application.""" import asyncio import json import logging import os import signal import uuid from typing import Any from aiomqtt import Client, Message logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) COIL_STATUS_PREFIX = "dt1/di" COIL_STATUS_TOPIC = f"{COIL_STATUS_PREFIX}/+" PULSEGEN_COMMAND_PREFIX = "pulsegen/command" PULSEGEN_COMMAND_TOPIC = f"{PULSEGEN_COMMAND_PREFIX}/+/+" COIL_COMMAND_PREFIX = "dt1/coil" PULSEGEN_STATUS_PREFIX = "pulsegen/status" COIL_STATUS_CACHE: dict[int, bool] = {} def get_mqtt_settings() -> tuple[str, int]: """Get MQTT broker settings from environment variables. Returns: tuple: (broker_host, broker_port) """ broker = os.getenv("MQTT_BROKER", "localhost") port = int(os.getenv("MQTT_PORT", "1883")) logger.info(f"MQTT settings: broker={broker}, port={port}") return broker, port async def handle_message(message: Message, client: Client) -> None: """Handle incoming MQTT message. Args: message: MQTT message object client: MQTT client instance """ try: payload = message.payload.decode() logger.info(f"Received message on {message.topic}: {payload}") try: topic = str(message.topic) match topic.split("/"): case [prefix, di, coil_id] if f"{prefix}/{di}" == COIL_STATUS_PREFIX: try: coil_num = int(coil_id) except ValueError: logger.debug(f"Invalid coil id in topic: {topic}") return state = payload.lower() in ("1", "true", "on") COIL_STATUS_CACHE[coil_num] = state logger.info(f"Updated coil {coil_num} status to {state}") logger.info(f"Publishing pulsegen status for coil {coil_num}: {state}") await client.publish( topic=f"{PULSEGEN_STATUS_PREFIX}/{coil_num}", payload="on" if state else "off", qos=1, retain=True, ) case [prefix, command, coil_in_id, coil_out_id] if f"{prefix}/{command}" == PULSEGEN_COMMAND_PREFIX: try: coil_in_id = int(coil_in_id) coil_out_id = int(coil_out_id) except ValueError: logger.debug(f"Invalid coil id in topic: {topic}") return try: coil_state = COIL_STATUS_CACHE[coil_in_id] except KeyError: logger.debug(f"Coil {coil_in_id} status unknown, cannot process command") return cmd = payload.lower() in ("1", "true", "on") if cmd == coil_state: logger.info(f"Coil {coil_in_id} already in desired state {cmd}, ignoring command") return logger.info(f"Received pulsegen command on {topic}: {coil_in_id=}, {coil_out_id=}, {cmd=}") coil_cmd_topic = f"{COIL_COMMAND_PREFIX}/{coil_out_id}" logger.info(f"Sending raising edge command: topic={coil_cmd_topic}") await client.publish( topic=coil_cmd_topic, payload="1", qos=1, retain=False, ) await asyncio.sleep(0.2) logger.info(f"Sending falling edge command: topic={coil_cmd_topic}") await client.publish( topic=coil_cmd_topic, payload="0", qos=1, retain=False, ) case _: logger.debug(f"Ignoring message on unrelated topic: {topic}") except Exception as e: logger.debug(f"Exception when handling payload: {e}") except Exception as e: logger.error(f"Error handling message: {e}", exc_info=True) async def publish_example(client: Client) -> None: """Example function to publish MQTT messages. Args: client: MQTT client instance """ topic = "pulsegen/status" payload = { "status": "running", "timestamp": asyncio.get_event_loop().time() } await client.publish( topic=topic, payload=json.dumps(payload), qos=1 ) logger.info(f"Published to {topic}: {payload}") async def mqtt_worker(shutdown_event: asyncio.Event) -> None: """Main MQTT worker loop. Connects to MQTT broker, subscribes to topics, and processes messages. Args: shutdown_event: Event to signal shutdown """ broker, port = get_mqtt_settings() reconnect_interval = 5 # seconds while not shutdown_event.is_set(): try: logger.info(f"Connecting to MQTT broker {broker}:{port}...") async with Client( hostname=broker, port=port, identifier=f"pulsegen-{uuid.uuid4()}", ) as client: logger.info("Connected to MQTT broker") # Subscribe to topics for topic in [PULSEGEN_COMMAND_TOPIC, COIL_STATUS_TOPIC]: await client.subscribe(topic) logger.info(f"Subscribed to {topic}") # Publish startup message await publish_example(client) # Message loop with timeout to allow shutdown check async for message in client.messages: if shutdown_event.is_set(): logger.info("Shutdown event detected, breaking message loop") break try: await handle_message(message, client) except Exception as e: logger.error(f"Error in message handler: {e}", exc_info=True) # If we exit the loop due to shutdown, break the reconnect loop too if shutdown_event.is_set(): break except asyncio.CancelledError: logger.info("MQTT worker cancelled") break except Exception as e: logger.error(f"MQTT error: {e}", exc_info=True) if not shutdown_event.is_set(): logger.info(f"Reconnecting in {reconnect_interval} seconds...") await asyncio.sleep(reconnect_interval) async def main() -> None: """Main application entry point.""" logger.info("Starting pulsegen application...") # Shutdown event for graceful shutdown shutdown_event = asyncio.Event() # Setup signal handlers def signal_handler(sig: int) -> None: logger.info(f"Received signal {sig}, initiating shutdown...") shutdown_event.set() loop = asyncio.get_event_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, lambda s=sig: signal_handler(s)) # Start MQTT worker worker_task = asyncio.create_task(mqtt_worker(shutdown_event)) # Wait for shutdown signal await shutdown_event.wait() # Give worker a moment to finish gracefully logger.info("Waiting for MQTT worker to finish...") try: await asyncio.wait_for(worker_task, timeout=5.0) except asyncio.TimeoutError: logger.warning("MQTT worker did not finish in time, cancelling...") worker_task.cancel() try: await worker_task except asyncio.CancelledError: pass logger.info("Pulsegen application stopped") if __name__ == "__main__": asyncio.run(main())