import os import sys import json import uuid import signal from loguru import logger import paho.mqtt.client as mqtt from message_processor import process_message, prepare_context # MQTT configuration parameters BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set BOXES_CONFIG = os.getenv("BOXES") # JSON configuration of boxes from environment variable BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics OFF_TEMPERATURE = os.getenv("OFF_TEMPERATURE", "5.0") LOW_TEMPERATURE = os.getenv("LOW_TEMPERATURE", "15.0") MAINTENANCE_TEMPERATURE = os.getenv("MAINTENANCE_TEMPERATURE", "30.0") # Check if required environment variables are set missing_vars = [] if not BROKER: missing_vars.append('MQTT_BROKER') if not BOXES_CONFIG: missing_vars.append('MQTT_BOXES') if not BOX_TOPIC_PREFIXES_CONFIG: missing_vars.append('MQTT_BOX_TOPIC_PREFIXES') if not CENTRAL_TOPICS_CONFIG: missing_vars.append('MQTT_CENTRAL_TOPICS') if missing_vars: logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}") sys.exit(1) # context for box operations context = {} # configuration values for boxes context['off_temperature'] = OFF_TEMPERATURE context['low_temperature'] = LOW_TEMPERATURE context['maintenance_temperature'] = MAINTENANCE_TEMPERATURE # Load box configurations from JSON try: boxes = json.loads(BOXES_CONFIG) # boxes structure added to global context to give process_message access to it context['boxes'] = boxes # add context dict to each box in the list for box_name, config in boxes.items(): config['context'] = prepare_context(box_name, context) except json.JSONDecodeError as e: logger.error(f"Error parsing JSON configuration for boxes: {e}") sys.exit(1) # Load box-specific topic prefixes from JSON try: box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG) # Validation: Check if the required keys are present required_keys = {'high_temp', 'cmd'} missing_keys = required_keys - box_topic_prefixes.keys() if missing_keys: raise ValueError(f"Error: The following keys are missing in MQTT_BOX_TOPIC_PREFIXES: {', '.join(missing_keys)}") except (json.JSONDecodeError, ValueError) as e: logger.error(str(e)) sys.exit(1) # Load central topics from JSON try: central_topics = json.loads(CENTRAL_TOPICS_CONFIG) except json.JSONDecodeError as e: logger.error(f"Error parsing JSON configuration for central topics: {e}") sys.exit(1) # Generate CLIENT_ID from UUID and optional prefix CLIENT_PREFIX = os.getenv("MQTT_CLIENT_PREFIX", "MQTTClient") CLIENT_ID = f"{CLIENT_PREFIX}_{uuid.uuid4()}" # Mapping of topics to boxes and topic keys for efficient lookup topic_mapping = {} # Callback function for successful connection to the broker def on_connect(client, userdata, flags, rc): if rc == 0: logger.info("Connected to the broker") # Subscribe to dynamically generated topics for each box and create mappings for box_name, config in boxes.items(): label = config.get("label") if not label: logger.error(f"[{box_name}] No 'label' defined.") continue # Generate topics based on configured box-specific prefixes and box label for topic_key, prefix in box_topic_prefixes.items(): topic = f"{prefix}{label}" client.subscribe(topic) topic_mapping[topic] = (box_name, topic_key) logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')") # Subscribe window topics from box for window_topic in config.get("windows"): topic = window_topic.get("topic") label = window_topic.get("label") topic_key = f"window/{label}" client.subscribe(topic) topic_mapping[topic] = (box_name, topic_key) logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')") # Subscribe to central topics and create mappings for central_key, central_topic in central_topics.items(): client.subscribe(central_topic) # Mark central topics with a special key to identify them topic_mapping[central_topic] = ("__central__", central_key) logger.info(f"Subscribed to central topic '{central_topic}' (Key: '{central_key}')") else: logger.error(f"Connection error with code {rc}") # Callback function for received messages def on_message(client, userdata, msg): try: topic = msg.topic payload = msg.payload.decode() if topic in topic_mapping: box_name, topic_key = topic_mapping[topic] if box_name == "__central__": # Central message, process for all boxes logger.info(f"[Central] Processing central message for '{topic_key}': {payload}") for current_box_name in boxes.keys(): process_message(current_box_name, topic_key, payload, context) else: # Box-specific message process_message(box_name, topic_key, payload, context) else: logger.warning(f"Received unknown topic: '{topic}'") except Exception as e: logger.error(f"Error processing message from '{msg.topic}': {e}") # Callback function for disconnection def on_disconnect(client, userdata, rc): if rc != 0: logger.warning("Unexpected disconnection, attempting to reconnect...") else: logger.info("Disconnected from the broker.") # Signal handler for graceful shutdown def handle_exit_signal(signum, frame): logger.info("Shutting down the program due to external signal.") client.disconnect() # Disconnects from the broker and stops loop_forever() # Initialize the MQTT client and configure callbacks client = mqtt.Client(client_id=CLIENT_ID) context['client'] = client client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect # Optional: Set auto-reconnect parameters client.reconnect_delay_set(min_delay=1, max_delay=120) # Register signal handlers for graceful shutdown signal.signal(signal.SIGTERM, handle_exit_signal) signal.signal(signal.SIGINT, handle_exit_signal) # Connect to the broker try: client.connect(BROKER, PORT, keepalive=60) except Exception as e: logger.error(f"Failed to connect to the broker: {e}") sys.exit(1) # Start the MQTT client loop in a blocking manner try: client.loop_forever() except KeyboardInterrupt: logger.info("Shutting down the program due to KeyboardInterrupt.") finally: client.disconnect() # Ensure the connection is closed logger.info("Program terminated.")