From db8954135275753eb9678e3fd20c6249dfde2048 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Tue, 5 Nov 2024 13:36:07 +0100 Subject: [PATCH] some code --- env | 21 +++++ requirements.txt | 1 + src/main.py | 167 +++++++++++++++++++++++++++++++++++++++ src/message_processor.py | 15 ++++ 4 files changed, 204 insertions(+) create mode 100644 env create mode 100644 src/main.py create mode 100644 src/message_processor.py diff --git a/env b/env new file mode 100644 index 0000000..5e2174f --- /dev/null +++ b/env @@ -0,0 +1,21 @@ +export MQTT_BROKER="172.23.1.102" +export MQTT_PORT=1883 +export MQTT_BOXES='{ + "box1": { + "label": "living_room" + }, + "box2": { + "label": "kitchen" + } +}' +export MQTT_BOX_TOPIC_PREFIXES='{ + "window": "heating/sensor/window/", + "high_temp": "heating/config/high_temp/", + "cmd": "heating/command/" +}' +export MQTT_CENTRAL_TOPICS='{ + "general_off": "heating/system/general_off", + "maintenance_mode": "heating/system/maintenance_mode" +}' +export MQTT_CLIENT_PREFIX="MyMQTTClient" +export MQTT_PUBLISH_PREFIX="output/" diff --git a/requirements.txt b/requirements.txt index 96a2716..6d577a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ +loguru==0.7.2 paho-mqtt==2.1.0 diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..b9b2353 --- /dev/null +++ b/src/main.py @@ -0,0 +1,167 @@ +import os +import sys +import json +import uuid +import signal +from loguru import logger +import paho.mqtt.client as mqtt +from message_processor import process_message # Import the moved function + + +# MQTT configuration parameters +BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable +PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set +BOXES_CONFIG = os.getenv("MQTT_BOXES") # JSON configuration of boxes from environment variable +BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes +CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics +PUBLISH_PREFIX = os.getenv("MQTT_PUBLISH_PREFIX", "output/") # Globales Publish-Präfix + +# Check if required environment variables are set +missing_vars = [] +if not BROKER: + missing_vars.append('MQTT_BROKER') +if not BOXES_CONFIG: + missing_vars.append('MQTT_BOXES') +if not BOX_TOPIC_PREFIXES_CONFIG: + missing_vars.append('MQTT_BOX_TOPIC_PREFIXES') +if not CENTRAL_TOPICS_CONFIG: + missing_vars.append('MQTT_CENTRAL_TOPICS') +if not PUBLISH_PREFIX: + missing_vars.append('MQTT_PUBLISH_PREFIX') + +if missing_vars: + logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}") + sys.exit(1) + +# context for box operations +context = {} + +# publish topic prefix for boxes +context['publish'] = PUBLISH_PREFIX + +# Load box configurations from JSON +try: + boxes = json.loads(BOXES_CONFIG) + context['boxes'] = boxes +except json.JSONDecodeError as e: + logger.error(f"Error parsing JSON configuration for boxes: {e}") + sys.exit(1) + +# Load box-specific topic prefixes from JSON +try: + box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG) +except json.JSONDecodeError as e: + logger.error(f"Error parsing JSON configuration for box-specific topic prefixes: {e}") + sys.exit(1) + +# Load central topics from JSON +try: + central_topics = json.loads(CENTRAL_TOPICS_CONFIG) +except json.JSONDecodeError as e: + logger.error(f"Error parsing JSON configuration for central topics: {e}") + sys.exit(1) + +# Generate CLIENT_ID from UUID and optional prefix +CLIENT_PREFIX = os.getenv("MQTT_CLIENT_PREFIX", "MQTTClient") +CLIENT_ID = f"{CLIENT_PREFIX}_{uuid.uuid4()}" + +# Mapping of topics to boxes and topic keys for efficient lookup +topic_mapping = {} + +# Callback function for successful connection to the broker +def on_connect(client, userdata, flags, rc): + if rc == 0: + logger.info("Connected to the broker") + + # Subscribe to dynamically generated topics for each box and create mappings + for box_name, config in boxes.items(): + label = config.get("label") + if not label: + logger.error(f"[{box_name}] No 'label' defined.") + continue + + # Generate topics based on configured box-specific prefixes and box label + topics = {} + for topic_key, prefix in box_topic_prefixes.items(): + topics[topic_key] = f"{prefix}{label}" + config["subscribe"] = topics # Store the generated topics + + # Subscribe to the topics for the box and create mapping + for topic_key, topic in topics.items(): + client.subscribe(topic) + topic_mapping[topic] = (box_name, topic_key) + logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')") + + # Subscribe to central topics and create mappings + for central_key, central_topic in central_topics.items(): + client.subscribe(central_topic) + # Mark central topics with a special key to identify them + topic_mapping[central_topic] = ("__central__", central_key) + logger.info(f"Subscribed to central topic '{central_topic}' (Key: '{central_key}')") + else: + logger.error(f"Connection error with code {rc}") + +# Callback function for received messages +def on_message(client, userdata, msg): + try: + topic = msg.topic + payload = msg.payload.decode() + + if topic in topic_mapping: + box_name, topic_key = topic_mapping[topic] + if box_name == "__central__": + # Central message, process for all boxes + logger.info(f"[Central] Processing central message for '{topic_key}': {payload}") + for current_box_name in boxes.keys(): + process_message(current_box_name, topic_key, payload, context) + else: + # Box-specific message + process_message(box_name, topic_key, payload, context) + else: + logger.warning(f"Received unknown topic: '{topic}'") + except Exception as e: + logger.error(f"Error processing message from '{msg.topic}': {e}") + +# Callback function for disconnection +def on_disconnect(client, userdata, rc): + if rc != 0: + logger.warning("Unexpected disconnection, attempting to reconnect...") + else: + logger.info("Disconnected from the broker.") + +# Signal handler for graceful shutdown +def handle_exit_signal(signum, frame): + logger.info("Shutting down the program due to external signal.") + client.disconnect() # Disconnects from the broker and stops loop_forever() + +# Initialize the MQTT client and configure callbacks +client = mqtt.Client(client_id=CLIENT_ID) + +context['client'] = client + +client.on_connect = on_connect +client.on_message = on_message +client.on_disconnect = on_disconnect + +# Optional: Set auto-reconnect parameters +client.reconnect_delay_set(min_delay=1, max_delay=120) + +# Register signal handlers for graceful shutdown +signal.signal(signal.SIGTERM, handle_exit_signal) +signal.signal(signal.SIGINT, handle_exit_signal) + +# Connect to the broker +try: + client.connect(BROKER, PORT, keepalive=60) +except Exception as e: + logger.error(f"Failed to connect to the broker: {e}") + sys.exit(1) + +# Start the MQTT client loop in a blocking manner +try: + client.loop_forever() +except KeyboardInterrupt: + logger.info("Shutting down the program due to KeyboardInterrupt.") +finally: + client.disconnect() # Ensure the connection is closed + logger.info("Program terminated.") diff --git a/src/message_processor.py b/src/message_processor.py new file mode 100644 index 0000000..716dd64 --- /dev/null +++ b/src/message_processor.py @@ -0,0 +1,15 @@ +from loguru import logger + +def process_message(box_name, topic_key, payload, context): + try: + logger.info(f"[{box_name}] Processing message for '{topic_key}': {payload}") + + # Example processing: Implement your specific logic here + # For example, perform calculations or store data. + result = f"Processed by {box_name} for '{topic_key}': {payload}" + + publish_topic = f"{context['publish']}/{box_name}" + context['client'].publish(publish_topic, result) + logger.info(f"[{box_name}] Result published on '{publish_topic}': {result}") + except Exception as e: + logger.error(f"[{box_name}] Error processing '{topic_key}': {e}")