refactored
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

This commit is contained in:
2024-11-18 14:17:31 +01:00
parent 67ca83983b
commit f0b4017166
6 changed files with 211 additions and 281 deletions

View File

@ -1,95 +1,23 @@
import os
import sys
import json
import uuid
import signal
from loguru import logger
import paho.mqtt.client as mqtt
from message_processor import process_message, prepare_context
from box import Box
from config import Config
# MQTT configuration parameters
BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable
PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set
BOXES_CONFIG = os.getenv("BOXES") # JSON configuration of boxes from environment variable
BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes
CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics
OFF_TEMPERATURE = os.getenv("OFF_TEMPERATURE", "5.0")
LOW_TEMPERATURE = os.getenv("LOW_TEMPERATURE", "15.0")
DEFAULT_HIGH_TEMPERATURE = os.getenv("DEFAULT_HIGH_TEMPERATURE", "21.0")
MAINTENANCE_TEMPERATURE = os.getenv("MAINTENANCE_TEMPERATURE", "30.0")
STATUS_TOPIC = os.getenv("MQTT_STATUS_TOPIC")
CONTEXT_TOPIC_PREFIX = os.getenv("MQTT_CONTEXT_TOPIC_PREFIX")
config = Config()
# Check if required environment variables are set
missing_vars = []
if not BROKER:
missing_vars.append('MQTT_BROKER')
if not BOXES_CONFIG:
missing_vars.append('MQTT_BOXES')
if not BOX_TOPIC_PREFIXES_CONFIG:
missing_vars.append('MQTT_BOX_TOPIC_PREFIXES')
if not CENTRAL_TOPICS_CONFIG:
missing_vars.append('MQTT_CENTRAL_TOPICS')
if not STATUS_TOPIC:
missing_vars.append('MQTT_STATUS_TOPIC')
if not CONTEXT_TOPIC_PREFIX:
missing_vars.append('MQTT_CONTEXT_TOPIC_PREFIX')
if missing_vars:
logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}")
sys.exit(1)
# context for box operations
context = {}
# configuration values for boxes
context['off_temperature'] = OFF_TEMPERATURE
context['low_temperature'] = LOW_TEMPERATURE
context['default_high_temperature'] = DEFAULT_HIGH_TEMPERATURE
context['maintenance_temperature'] = MAINTENANCE_TEMPERATURE
context['status_topic'] = STATUS_TOPIC
context['context_topic_prefix'] = CONTEXT_TOPIC_PREFIX
# Load box configurations from JSON
try:
boxes = json.loads(BOXES_CONFIG)
# boxes structure added to global context to give process_message access to it
context['boxes'] = boxes
# add context dict to each box in the list
for box_name, config in boxes.items():
config['context'] = prepare_context(box_name, context)
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON configuration for boxes: {e}")
sys.exit(1)
# Load box-specific topic prefixes from JSON
try:
box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG)
# Validation: Check if the required keys are present
required_keys = {'high_temp', 'cmd', 'overwrite_window'}
missing_keys = required_keys - box_topic_prefixes.keys()
if missing_keys:
raise ValueError(f"Error: The following keys are missing in MQTT_BOX_TOPIC_PREFIXES: {', '.join(missing_keys)}")
except (json.JSONDecodeError, ValueError) as e:
logger.error(str(e))
sys.exit(1)
# Load central topics from JSON
try:
central_topics = json.loads(CENTRAL_TOPICS_CONFIG)
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON configuration for central topics: {e}")
sys.exit(1)
boxes = []
for k, v in config.BOXES.items():
boxes.append(Box(k, v, config))
# Generate CLIENT_ID from UUID and optional prefix
CLIENT_PREFIX = os.getenv("MQTT_CLIENT_PREFIX", "MQTTClient")
CLIENT_ID = f"{CLIENT_PREFIX}_{uuid.uuid4()}"
CLIENT_ID = f"{config.MQTT_CLIENT_PREFIX}_{uuid.uuid4()}"
# Mapping of topics to boxes and topic keys for efficient lookup
topic_mapping = {}
@ -100,30 +28,25 @@ def on_connect(client, userdata, flags, reason_code, properties):
logger.info("Connected to the broker")
# Subscribe to dynamically generated topics for each box and create mappings
for box_name, config in boxes.items():
label = config.get("label")
if not label:
logger.error(f"[{box_name}] No 'label' defined.")
continue
for box in boxes:
label = box.id
# Generate topics based on configured box-specific prefixes and box label
for topic_key, prefix in box_topic_prefixes.items():
for topic_key, prefix in config.BOX_TOPIC_PREFIXES.items():
topic = f"{prefix}{label}"
client.subscribe(topic)
topic_mapping[topic] = (box_name, topic_key)
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")
topic_mapping[topic] = (box, topic_key)
logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')")
# Subscribe window topics from box
for window_topic in config.get("windows"):
topic = window_topic.get("topic")
label = window_topic.get("label")
for label, window in box.windows.items():
topic = window['topic']
topic_key = f"window/{label}"
client.subscribe(topic)
topic_mapping[topic] = (box_name, topic_key)
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")
topic_mapping[topic] = (box, topic_key)
logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')")
# Subscribe to central topics and create mappings
for central_key, central_topic in central_topics.items():
for central_key, central_topic in config.CENTRAL_TOPICS.items():
client.subscribe(central_topic)
# Mark central topics with a special key to identify them
topic_mapping[central_topic] = ("__central__", central_key)
@ -138,15 +61,16 @@ def on_message(client, userdata, msg):
payload = msg.payload.decode()
if topic in topic_mapping:
box_name, topic_key = topic_mapping[topic]
if box_name == "__central__":
box, topic_key = topic_mapping[topic]
if box == "__central__":
# Central message, process for all boxes
logger.info(f"[Central] Processing central message for '{topic_key}': {payload}")
for current_box_name in boxes.keys():
process_message(current_box_name, topic_key, payload, context)
for b in boxes:
b.handle_message(topic_key, payload)
else:
# Box-specific message
process_message(box_name, topic_key, payload, context)
logger.info(f"[{box.id}] Processing box-specific message for {topic_key}': {payload}")
box.handle_message(topic_key, payload)
else:
logger.warning(f"Received unknown topic: '{topic}'")
except Exception as e:
@ -166,8 +90,9 @@ def handle_exit_signal(signum, frame):
# Initialize the MQTT client and configure callbacks
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID, )
for box in boxes:
box.mqtt_client = client
context['client'] = client
client.on_connect = on_connect
client.on_message = on_message
@ -182,7 +107,7 @@ signal.signal(signal.SIGINT, handle_exit_signal)
# Connect to the broker
try:
client.connect(BROKER, PORT, keepalive=60)
client.connect(config.MQTT_BROKER, config.MQTT_PORT, keepalive=60)
except Exception as e:
logger.error(f"Failed to connect to the broker: {e}")
sys.exit(1)