some code

This commit is contained in:
2024-11-05 13:36:07 +01:00
parent df1176cca4
commit db89541352
4 changed files with 204 additions and 0 deletions

21
env Normal file
View File

@ -0,0 +1,21 @@
export MQTT_BROKER="172.23.1.102"
export MQTT_PORT=1883
export MQTT_BOXES='{
"box1": {
"label": "living_room"
},
"box2": {
"label": "kitchen"
}
}'
export MQTT_BOX_TOPIC_PREFIXES='{
"window": "heating/sensor/window/",
"high_temp": "heating/config/high_temp/",
"cmd": "heating/command/"
}'
export MQTT_CENTRAL_TOPICS='{
"general_off": "heating/system/general_off",
"maintenance_mode": "heating/system/maintenance_mode"
}'
export MQTT_CLIENT_PREFIX="MyMQTTClient"
export MQTT_PUBLISH_PREFIX="output/"

View File

@ -1 +1,2 @@
loguru==0.7.2
paho-mqtt==2.1.0

167
src/main.py Normal file
View File

@ -0,0 +1,167 @@
import os
import sys
import json
import uuid
import signal
from loguru import logger
import paho.mqtt.client as mqtt
from message_processor import process_message # Import the moved function
# MQTT configuration parameters
BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable
PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set
BOXES_CONFIG = os.getenv("MQTT_BOXES") # JSON configuration of boxes from environment variable
BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes
CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics
PUBLISH_PREFIX = os.getenv("MQTT_PUBLISH_PREFIX", "output/") # Globales Publish-Präfix
# Check if required environment variables are set
missing_vars = []
if not BROKER:
missing_vars.append('MQTT_BROKER')
if not BOXES_CONFIG:
missing_vars.append('MQTT_BOXES')
if not BOX_TOPIC_PREFIXES_CONFIG:
missing_vars.append('MQTT_BOX_TOPIC_PREFIXES')
if not CENTRAL_TOPICS_CONFIG:
missing_vars.append('MQTT_CENTRAL_TOPICS')
if not PUBLISH_PREFIX:
missing_vars.append('MQTT_PUBLISH_PREFIX')
if missing_vars:
logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}")
sys.exit(1)
# context for box operations
context = {}
# publish topic prefix for boxes
context['publish'] = PUBLISH_PREFIX
# Load box configurations from JSON
try:
boxes = json.loads(BOXES_CONFIG)
context['boxes'] = boxes
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON configuration for boxes: {e}")
sys.exit(1)
# Load box-specific topic prefixes from JSON
try:
box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG)
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON configuration for box-specific topic prefixes: {e}")
sys.exit(1)
# Load central topics from JSON
try:
central_topics = json.loads(CENTRAL_TOPICS_CONFIG)
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON configuration for central topics: {e}")
sys.exit(1)
# Generate CLIENT_ID from UUID and optional prefix
CLIENT_PREFIX = os.getenv("MQTT_CLIENT_PREFIX", "MQTTClient")
CLIENT_ID = f"{CLIENT_PREFIX}_{uuid.uuid4()}"
# Mapping of topics to boxes and topic keys for efficient lookup
topic_mapping = {}
# Callback function for successful connection to the broker
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info("Connected to the broker")
# Subscribe to dynamically generated topics for each box and create mappings
for box_name, config in boxes.items():
label = config.get("label")
if not label:
logger.error(f"[{box_name}] No 'label' defined.")
continue
# Generate topics based on configured box-specific prefixes and box label
topics = {}
for topic_key, prefix in box_topic_prefixes.items():
topics[topic_key] = f"{prefix}{label}"
config["subscribe"] = topics # Store the generated topics
# Subscribe to the topics for the box and create mapping
for topic_key, topic in topics.items():
client.subscribe(topic)
topic_mapping[topic] = (box_name, topic_key)
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")
# Subscribe to central topics and create mappings
for central_key, central_topic in central_topics.items():
client.subscribe(central_topic)
# Mark central topics with a special key to identify them
topic_mapping[central_topic] = ("__central__", central_key)
logger.info(f"Subscribed to central topic '{central_topic}' (Key: '{central_key}')")
else:
logger.error(f"Connection error with code {rc}")
# Callback function for received messages
def on_message(client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode()
if topic in topic_mapping:
box_name, topic_key = topic_mapping[topic]
if box_name == "__central__":
# Central message, process for all boxes
logger.info(f"[Central] Processing central message for '{topic_key}': {payload}")
for current_box_name in boxes.keys():
process_message(current_box_name, topic_key, payload, context)
else:
# Box-specific message
process_message(box_name, topic_key, payload, context)
else:
logger.warning(f"Received unknown topic: '{topic}'")
except Exception as e:
logger.error(f"Error processing message from '{msg.topic}': {e}")
# Callback function for disconnection
def on_disconnect(client, userdata, rc):
if rc != 0:
logger.warning("Unexpected disconnection, attempting to reconnect...")
else:
logger.info("Disconnected from the broker.")
# Signal handler for graceful shutdown
def handle_exit_signal(signum, frame):
logger.info("Shutting down the program due to external signal.")
client.disconnect() # Disconnects from the broker and stops loop_forever()
# Initialize the MQTT client and configure callbacks
client = mqtt.Client(client_id=CLIENT_ID)
context['client'] = client
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# Optional: Set auto-reconnect parameters
client.reconnect_delay_set(min_delay=1, max_delay=120)
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, handle_exit_signal)
signal.signal(signal.SIGINT, handle_exit_signal)
# Connect to the broker
try:
client.connect(BROKER, PORT, keepalive=60)
except Exception as e:
logger.error(f"Failed to connect to the broker: {e}")
sys.exit(1)
# Start the MQTT client loop in a blocking manner
try:
client.loop_forever()
except KeyboardInterrupt:
logger.info("Shutting down the program due to KeyboardInterrupt.")
finally:
client.disconnect() # Ensure the connection is closed
logger.info("Program terminated.")

15
src/message_processor.py Normal file
View File

@ -0,0 +1,15 @@
from loguru import logger
def process_message(box_name, topic_key, payload, context):
try:
logger.info(f"[{box_name}] Processing message for '{topic_key}': {payload}")
# Example processing: Implement your specific logic here
# For example, perform calculations or store data.
result = f"Processed by {box_name} for '{topic_key}': {payload}"
publish_topic = f"{context['publish']}/{box_name}"
context['client'].publish(publish_topic, result)
logger.info(f"[{box_name}] Result published on '{publish_topic}': {result}")
except Exception as e:
logger.error(f"[{box_name}] Error processing '{topic_key}': {e}")