131 lines
4.7 KiB
Python
131 lines
4.7 KiB
Python
import sys
|
||
import json
|
||
import uuid
|
||
import signal
|
||
from loguru import logger
|
||
import paho.mqtt.client as mqtt
|
||
|
||
from box import Box
|
||
from config import Config
|
||
|
||
|
||
config = Config()
|
||
|
||
boxes = []
|
||
for k, v in config.BOXES.items():
|
||
boxes.append(Box(k, v, config))
|
||
|
||
|
||
# Generate CLIENT_ID from UUID and optional prefix
|
||
CLIENT_ID = f"{config.MQTT_CLIENT_PREFIX}_{uuid.uuid4()}"
|
||
|
||
# Mapping of topics to boxes and topic keys for efficient lookup
|
||
topic_mapping = {}
|
||
|
||
# Callback function for successful connection to the broker
|
||
def on_connect(client, userdata, flags, reason_code, properties):
|
||
if reason_code == 0:
|
||
logger.info("Connected to the broker")
|
||
|
||
# Subscribe to dynamically generated topics for each box and create mappings
|
||
for box in boxes:
|
||
label = box.id
|
||
# Generate topics based on configured box-specific prefixes and box label
|
||
for topic_key, prefix in config.BOX_TOPIC_PREFIXES.items():
|
||
topic = f"{prefix}{label}"
|
||
client.subscribe(topic)
|
||
topic_mapping[topic] = (box, topic_key)
|
||
logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')")
|
||
|
||
# Subscribe window topics from box
|
||
for label, window in box.windows.items():
|
||
topic = window['topic']
|
||
topic_key = f"window/{label}"
|
||
client.subscribe(topic)
|
||
topic_mapping[topic] = (box, topic_key)
|
||
logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')")
|
||
|
||
# Subscribe feedback topic if one is available
|
||
if box.feedback_topic:
|
||
topic = box.feedback_topic
|
||
topic_key = "feedback"
|
||
client.subscribe(topic)
|
||
topic_mapping[topic] = (box, topic_key)
|
||
logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')")
|
||
|
||
# Subscribe to central topics and create mappings
|
||
for central_key, central_topic in config.CENTRAL_TOPICS.items():
|
||
client.subscribe(central_topic)
|
||
# Mark central topics with a special key to identify them
|
||
topic_mapping[central_topic] = ("__central__", central_key)
|
||
logger.info(f"Subscribed to central topic '{central_topic}' (Key: '{central_key}')")
|
||
else:
|
||
logger.error(f"Connection error with code {reason_code}")
|
||
|
||
# Callback function for received messages
|
||
def on_message(client, userdata, msg):
|
||
try:
|
||
topic = msg.topic
|
||
payload = msg.payload.decode()
|
||
|
||
if topic in topic_mapping:
|
||
box, topic_key = topic_mapping[topic]
|
||
if box == "__central__":
|
||
# Central message, process for all boxes
|
||
logger.info(f"[Central] Processing central message for '{topic_key}': {payload}")
|
||
for b in boxes:
|
||
b.handle_message(topic_key, payload)
|
||
else:
|
||
# Box-specific message
|
||
logger.info(f"[{box.id}] Processing box-specific message for ‘{topic_key}': {payload}")
|
||
box.handle_message(topic_key, payload)
|
||
else:
|
||
logger.warning(f"Received unknown topic: '{topic}'")
|
||
except Exception as e:
|
||
logger.error(f"Error processing message from '{msg.topic}': {e}")
|
||
|
||
# Callback function for disconnection
|
||
def on_disconnect(client, userdata, flags, reason_code, properties):
|
||
if reason_code != 0:
|
||
logger.warning("Unexpected disconnection, attempting to reconnect...")
|
||
else:
|
||
logger.info("Disconnected from the broker.")
|
||
|
||
# Signal handler for graceful shutdown
|
||
def handle_exit_signal(signum, frame):
|
||
logger.info("Shutting down the program due to external signal.")
|
||
client.disconnect() # Disconnects from the broker and stops loop_forever()
|
||
|
||
# Initialize the MQTT client and configure callbacks
|
||
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID, )
|
||
for box in boxes:
|
||
box.mqtt_client = client
|
||
|
||
|
||
client.on_connect = on_connect
|
||
client.on_message = on_message
|
||
client.on_disconnect = on_disconnect
|
||
|
||
# Optional: Set auto-reconnect parameters
|
||
client.reconnect_delay_set(min_delay=1, max_delay=120)
|
||
|
||
# Register signal handlers for graceful shutdown
|
||
signal.signal(signal.SIGTERM, handle_exit_signal)
|
||
signal.signal(signal.SIGINT, handle_exit_signal)
|
||
|
||
# Connect to the broker
|
||
try:
|
||
client.connect(config.MQTT_BROKER, config.MQTT_PORT, keepalive=60)
|
||
except Exception as e:
|
||
logger.error(f"Failed to connect to the broker: {e}")
|
||
sys.exit(1)
|
||
|
||
# Start the MQTT client loop in a blocking manner
|
||
try:
|
||
client.loop_forever()
|
||
except KeyboardInterrupt:
|
||
logger.info("Shutting down the program due to KeyboardInterrupt.")
|
||
finally:
|
||
client.disconnect() # Ensure the connection is closed
|
||
logger.info("Program terminated.")
|