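# MQTT client service: reads its configuration from environment variables,
# subscribes to box-specific, window and central topics, and forwards every
# received message to message_processor.process_message.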
import os
|
|
import sys
|
|
import json
|
|
import uuid
|
|
import signal
|
|
from loguru import logger
|
|
import paho.mqtt.client as mqtt
|
|
from message_processor import process_message, prepare_context
|
|
|
|
|
|
# --- MQTT connection settings (from the environment) ---
BROKER = os.environ.get("MQTT_BROKER")
PORT = int(os.environ.get("MQTT_PORT", "1883"))  # 1883 is used when MQTT_PORT is unset

# --- Raw JSON strings; they are parsed further down in this file ---
BOXES_CONFIG = os.environ.get("BOXES")  # box definitions
BOX_TOPIC_PREFIXES_CONFIG = os.environ.get("MQTT_BOX_TOPIC_PREFIXES")  # box-specific topic prefixes
CENTRAL_TOPICS_CONFIG = os.environ.get("MQTT_CENTRAL_TOPICS")  # central topics

# --- Temperature settings, kept as strings exactly as read ---
OFF_TEMPERATURE = os.environ.get("OFF_TEMPERATURE", "5.0")
LOW_TEMPERATURE = os.environ.get("LOW_TEMPERATURE", "15.0")
DEFAULT_HIGH_TEMPERATURE = os.environ.get("DEFAULT_HIGH_TEMPERATURE", "21.0")
MAINTENANCE_TEMPERATURE = os.environ.get("MAINTENANCE_TEMPERATURE", "30.0")

# --- Status topics ---
STATUS_TOPIC = os.environ.get("MQTT_STATUS_TOPIC")
STATUSTEXT_TOPIC_PREFIX = os.environ.get("MQTT_STATUSTEXT_TOPIC_PREFIX")
# Check if required environment variables are set.
# Keys are the environment variable names exactly as they are read by this
# script, so the error message names the variable the operator has to set
# (the boxes configuration is read from BOXES, not MQTT_BOXES).
_required_settings = {
    'MQTT_BROKER': BROKER,
    'BOXES': BOXES_CONFIG,
    'MQTT_BOX_TOPIC_PREFIXES': BOX_TOPIC_PREFIXES_CONFIG,
    'MQTT_CENTRAL_TOPICS': CENTRAL_TOPICS_CONFIG,
    'MQTT_STATUS_TOPIC': STATUS_TOPIC,
    'MQTT_STATUSTEXT_TOPIC_PREFIX': STATUSTEXT_TOPIC_PREFIX,
}

# Unset and empty values are both treated as missing.
missing_vars = [name for name, value in _required_settings.items() if not value]

if missing_vars:
    logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}")
    sys.exit(1)
# Shared context for box operations, pre-filled with the configuration
# values read from the environment.
context = {
    'off_temperature': OFF_TEMPERATURE,
    'low_temperature': LOW_TEMPERATURE,
    'default_high_temperature': DEFAULT_HIGH_TEMPERATURE,
    'maintenance_temperature': MAINTENANCE_TEMPERATURE,
    'status_topic': STATUS_TOPIC,
    'statustext_topic_prefix': STATUSTEXT_TOPIC_PREFIX,
}
# Load box configurations from JSON.
# Only the parse itself is guarded, so an error raised while preparing the
# per-box context is not misreported as a JSON problem.
try:
    boxes = json.loads(BOXES_CONFIG)
except json.JSONDecodeError as e:
    logger.error(f"Error parsing JSON configuration for boxes: {e}")
    sys.exit(1)

# Valid JSON that is not an object (e.g. a list) would otherwise crash with
# an AttributeError on .items() below.
if not isinstance(boxes, dict):
    logger.error("Error: BOXES must be a JSON object mapping box names to their configuration")
    sys.exit(1)

# boxes structure added to global context to give process_message access to it
context['boxes'] = boxes

# add context dict to each box in the list
for box_name, config in boxes.items():
    config['context'] = prepare_context(box_name, context)
# Load box-specific topic prefixes from JSON
try:
    box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG)
except json.JSONDecodeError as e:
    # Same message style as the other JSON loaders, so the log says which
    # variable is malformed.
    logger.error(f"Error parsing JSON configuration for box topic prefixes: {e}")
    sys.exit(1)

# Valid JSON that is not an object would otherwise crash on .keys() below.
if not isinstance(box_topic_prefixes, dict):
    logger.error("Error: MQTT_BOX_TOPIC_PREFIXES must be a JSON object mapping topic keys to topic prefixes")
    sys.exit(1)

# Validation: Check if the required keys are present
required_keys = {'high_temp', 'cmd', 'overwrite_window'}
missing_keys = required_keys - box_topic_prefixes.keys()

if missing_keys:
    # sorted() keeps the message stable; set iteration order is arbitrary.
    logger.error(f"Error: The following keys are missing in MQTT_BOX_TOPIC_PREFIXES: {', '.join(sorted(missing_keys))}")
    sys.exit(1)
# Load central topics from JSON
try:
    central_topics = json.loads(CENTRAL_TOPICS_CONFIG)
except json.JSONDecodeError as e:
    logger.error(f"Error parsing JSON configuration for central topics: {e}")
    sys.exit(1)

# The connect callback iterates central_topics.items(); reject anything that
# is not a JSON object here instead of failing later inside that callback.
if not isinstance(central_topics, dict):
    logger.error("Error: MQTT_CENTRAL_TOPICS must be a JSON object mapping topic keys to topics")
    sys.exit(1)
# Build CLIENT_ID from an optional prefix (MQTT_CLIENT_PREFIX) and a random UUID
CLIENT_PREFIX = os.environ.get("MQTT_CLIENT_PREFIX", "MQTTClient")
CLIENT_ID = "_".join((CLIENT_PREFIX, str(uuid.uuid4())))

# Lookup table: topic -> (box name, topic key); filled by the connect callback
topic_mapping = dict()
def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to all configured topics after a successful connection.

    On success, subscribes to one topic per box and configured prefix, to
    every window topic of each box, and to all central topics. Every
    subscription is recorded in ``topic_mapping`` as
    ``topic -> (box_name, topic_key)``; central topics use the marker
    ``"__central__"`` in place of a box name.

    Args:
        client: MQTT client on which the subscriptions are issued.
        userdata: Unused.
        flags: Unused.
        reason_code: Connection result; 0 means the connection succeeded.
        properties: Unused.
    """
    if reason_code != 0:
        logger.error(f"Connection error with code {reason_code}")
        return

    logger.info("Connected to the broker")

    # Subscribe to dynamically generated topics for each box and create mappings
    for box_name, config in boxes.items():
        label = config.get("label")
        if not label:
            logger.error(f"[{box_name}] No 'label' defined.")
            continue

        # Generate topics based on configured box-specific prefixes and box label
        for topic_key, prefix in box_topic_prefixes.items():
            topic = f"{prefix}{label}"
            client.subscribe(topic)
            topic_mapping[topic] = (box_name, topic_key)
            logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")

        # Subscribe window topics from box. A box without a "windows" entry
        # (or with a null one) simply has no window topics; iterating None
        # would raise and abort all remaining subscriptions.
        for window in config.get("windows") or []:
            window_topic = window.get("topic")
            # Separate name so the box label above is not overwritten.
            window_label = window.get("label")
            if not window_topic:
                logger.error(f"[{box_name}] Window '{window_label}' has no 'topic' defined.")
                continue

            topic_key = f"window/{window_label}"
            client.subscribe(window_topic)
            topic_mapping[window_topic] = (box_name, topic_key)
            logger.info(f"[{box_name}] Subscribed to '{window_topic}' (Key: '{topic_key}')")

    # Subscribe to central topics and create mappings
    for central_key, central_topic in central_topics.items():
        client.subscribe(central_topic)
        # Mark central topics with a special key to identify them
        topic_mapping[central_topic] = ("__central__", central_key)
        logger.info(f"Subscribed to central topic '{central_topic}' (Key: '{central_key}')")
def on_message(client, userdata, msg):
    """Dispatch a received MQTT message to ``process_message``.

    The topic is looked up in ``topic_mapping``. A box-specific topic is
    processed for its box only; a topic registered under ``"__central__"``
    is processed once for every configured box. Topics without a mapping
    are logged as unknown. Any exception raised along the way is logged
    and not propagated.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; its ``topic`` and decoded ``payload`` are used.
    """
    try:
        topic = msg.topic
        payload = msg.payload.decode()

        route = topic_mapping.get(topic)
        if route is None:
            logger.warning(f"Received unknown topic: '{topic}'")
            return

        box_name, topic_key = route
        if box_name != "__central__":
            # Box-specific message
            process_message(box_name, topic_key, payload, context)
            return

        # Central message, process for all boxes
        logger.info(f"[Central] Processing central message for '{topic_key}': {payload}")
        for current_box_name in boxes:
            process_message(current_box_name, topic_key, payload, context)
    except Exception as err:
        logger.error(f"Error processing message from '{msg.topic}': {err}")
def on_disconnect(client, userdata, flags, reason_code, properties):
    """Log the end of the broker connection.

    A reason code of 0 is logged as a regular disconnect at info level;
    any other code is logged as an unexpected disconnection at warning level.

    Args:
        client: Unused.
        userdata: Unused.
        flags: Unused.
        reason_code: Disconnect result; 0 means a regular disconnect.
        properties: Unused.
    """
    if reason_code == 0:
        logger.info("Disconnected from the broker.")
        return

    logger.warning("Unexpected disconnection, attempting to reconnect...")
def handle_exit_signal(signum, frame):
    """Signal handler for graceful shutdown.

    Logs the shutdown and disconnects the module-level ``client``.

    Args:
        signum: Number of the received signal (unused).
        frame: Current stack frame (unused).
    """
    logger.info("Shutting down the program due to external signal.")
    # Disconnects from the broker and stops loop_forever()
    client.disconnect()
# Create the MQTT client (callback API version 2) and make it available
# through the shared context
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID)
context['client'] = client

# Attach the callbacks defined above
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# Optional: Set auto-reconnect parameters
client.reconnect_delay_set(min_delay=1, max_delay=120)

# Register the same handler for SIGTERM and SIGINT for graceful shutdown
for _exit_signal in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_exit_signal, handle_exit_signal)

# Connect to the broker; any failure here ends the program
try:
    client.connect(BROKER, PORT, keepalive=60)
except Exception as err:
    logger.error(f"Failed to connect to the broker: {err}")
    sys.exit(1)

# Run the MQTT client loop; this call blocks
try:
    client.loop_forever()
except KeyboardInterrupt:
    logger.info("Shutting down the program due to KeyboardInterrupt.")
finally:
    # Ensure the connection is closed
    client.disconnect()
    logger.info("Program terminated.")