From f0b4017166229fa660f65100d2d2228fc51f020d Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 18 Nov 2024 14:17:31 +0100 Subject: [PATCH] refactored --- env | 47 ++++++------ src/box.py | 104 ++++++++++++++++++++++++++ src/config.py | 46 ++++++++++++ src/converters.py | 10 +++ src/main.py | 129 +++++++------------------------- src/message_processor.py | 156 --------------------------------------- 6 files changed, 211 insertions(+), 281 deletions(-) create mode 100644 src/box.py create mode 100644 src/config.py create mode 100644 src/converters.py delete mode 100644 src/message_processor.py diff --git a/env b/env index c61b5fd..c02945e 100644 --- a/env +++ b/env @@ -1,36 +1,37 @@ export MQTT_BROKER="172.23.1.102" export MQTT_PORT=1883 export MQTT_CLIENT_PREFIX="MyMQTTClient" -export MQTT_BOX_TOPIC_PREFIXES='{ - "high_temp": "heating/config/high_temp/", - "cmd": "heating/command/" +export BOX_TOPIC_PREFIXES='{ + "high_temp": "xheating/config/high_temp/", + "cmd": "xheating/command/" }' -export MQTT_CENTRAL_TOPICS='{ - "general_off": "heating/system/general_off", - "maintenance_mode": "heating/system/maintenance_mode", - "status": "heating/system/status" +export CENTRAL_TOPICS='{ + "general_off": "xheating/system/general_off", + "maintenance_mode": "xheating/system/maintenance_mode", + "status": "xheating/system/status" }' -export MQTT_STATUS_TOPIC="heating/status" +export CONTEXT_TOPIC_PREFIX='xheating/context/' +export STATUS_TOPIC="heating/status" export OFF_TEMPERATURE="5.0" export LOW_TEMPERATURE="15.0" export DEFAULT_HIGH_TEMPERATURE="21.0" export MAINTENANCE_TEMPERATURE="30.0" export BOXES='{ - "box1": { - "label": "living_room", - "windows": [ - { "topic": "window/living_room/street_side", "label": "street_side" }, - { "topic": "window/living_room/garden_side", "label": "garden_side" } - ], - "output_topic": "output/living_room" + "living_room": { + "windows": { + "street_side": { "topic": "window/living_room/street_side", "converter": "max" }, + "garden_side": { "topic": "window/living_room/garden_side", "converter": "max" } + }, + "output_converter": "max", + "output_topic": "xoutput/living_room" }, - "box2": { - "label": "kitchen", - "windows": [ - { "topic": "window/kitchen/street_side", "label": "street_side" }, - { "topic": "window/kitchen/garden_side", "label": "garden_side" }, - { "topic": "window/kitchen/garden_door", "label": "garden_door" } - ], - "output_topic": "output/kitchen" + "kitchen": { + "windows": { + "street_side": { "topic": "window/kitchen/street_side", "converter": "max" }, + "garden_side": { "topic": "window/kitchen/garden_side", "converter": "max" }, + "garden_door": { "topic": "window/kitchen/garden_door", "converter": "max" } + }, + "output_converter": "max", + "output_topic": "xoutput/kitchen" } }' diff --git a/src/box.py b/src/box.py new file mode 100644 index 0000000..7d1bb98 --- /dev/null +++ b/src/box.py @@ -0,0 +1,104 @@ +from loguru import logger +import json +from converters import CONVERTERS +from dataclasses import dataclass, field + +@dataclass(init=True, kw_only=True) +class Context: + high_temperature: str + general_off: bool = field(default=False) + maintenance_mode: bool = field(default=False) + overwrite_window: bool = field(default=False) + window_state: dict = field(default_factory=dict) + mode: str = field(default='off') + output_temperature: str = field(default='0') + +class Box: + def __init__(self, box_id, box_config, config): + logger.info(f"[Box {box_id}] Instantiating") + + self.id = box_id + self.windows = box_config['windows'] + self.output_converter = box_config['output_converter'] + self.output_topic = box_config['output_topic'] + self.config = config + + self.context = Context(high_temperature=config.DEFAULT_HIGH_TEMPERATURE, + output_temperature=config.DEFAULT_HIGH_TEMPERATURE, + mode='high', + window_state={ k: 'closed' for k in self.windows.keys() }) + + self.mqtt_client = None + + logger.info(f"[Box {box_id}] Instantiated, context is {self.context}") + + def _calculate_output_temperature(self): + # maintenance_mode has the highest priority, even higher than general_off + if self.context.maintenance_mode: + self.context.output_temperature = self.config.MAINTENANCE_TEMPERATURE + return + # general_off has the next highest priority + if self.context.general_off: + self.context.output_temperature = self.config.OFF_TEMPERATURE + return + # an open window shuts off the heating + if not self.context.overwrite_window: + for v in self.context.window_state.values(): + if v == 'open': + self.context.output_temperature = self.config.OFF_TEMPERATURE + return + # finally evaluate the mode + if self.context.mode == 'off': + self.context.output_temperature = self.config.OFF_TEMPERATURE + return + if self.context.mode == 'low': + self.context.output_temperature = self.config.LOW_TEMPERATURE + return + if self.context.mode == 'high': + self.context.output_temperature = self.context.high_temperature + return + # if we come here, something serious happened + logger.error(f"Error in calculation of output_temperature: {self.context=}") + return + + + def handle_message(self, topic_key, payload): + logger.info(f"[Box {self.id}] Handle message for '{topic_key}': {payload}") + + try: + # match topic to find operation to be executed + match topic_key.split('/'): + case [ primary_key, sub_key ] if primary_key == 'window': + self.context.window_state[sub_key] = CONVERTERS["window_contact_input"][self.windows[sub_key]["converter"]](payload) + case [ primary_key ] if primary_key == 'high_temp': + self.context.high_temperature = payload + case [ primary_key ] if primary_key == 'cmd': + if payload in ('high', 'low', 'off'): + self.context.mode = payload + else: + raise Exception(f"Invalid cmd '{payload}'") + case [ primary_key ] if primary_key == 'overwrite_window': + self.context.overwrite_window = payload.lower() == 'true' + case [ primary_key ] if primary_key == 'general_off': + self.context.general_off = payload.lower() == 'true' + case [ primary_key ] if primary_key == 'maintenance_mode': + self.context.maintenance_mode = payload.lower() == 'true' + case [ primary_key ] if primary_key == 'status': + pass + case _: + raise Error(f"Unexcepted topic_key: {topic_key}, {payload}") + + # calculate output temperature from context + self._calculate_output_temperature() + + + # publish output temperature + result_message = CONVERTERS["target_temperature_output"][self.output_converter](self.context.output_temperature) + publish_topic = self.output_topic + self.mqtt_client.publish(publish_topic, result_message) + logger.info(f"[Box {self.id}] Result published on '{publish_topic}': {result_message}") + + context_topic = f"{self.config.CONTEXT_TOPIC_PREFIX}{self.id}" + self.mqtt_client.publish(context_topic, repr(self.context)) + except Exception as e: + logger.error(f"[Box {self.id}] Error processing '{topic_key}': {e}") diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..af7eeee --- /dev/null +++ b/src/config.py @@ -0,0 +1,46 @@ +import os +import json +from dataclasses import dataclass, fields +from typing import Any + +class ConfigException (Exception): pass + +def default_env_loader(var_name, default_value=None): + v = os.getenv(var_name, default_value) + if not v: + raise ConfigException(var_name) + return v + +def json_env_loader(var_name): + v = default_env_loader(var_name) + vv = json.loads(v) + return vv + +@dataclass(init=False, frozen=False) +class Config: + MQTT_BROKER : str + MQTT_PORT : int + MQTT_CLIENT_PREFIX : str + BOXES : dict + BOX_TOPIC_PREFIXES : dict + CENTRAL_TOPICS : dict + OFF_TEMPERATURE : str + LOW_TEMPERATURE : str + MAINTENANCE_TEMPERATURE : str + DEFAULT_HIGH_TEMPERATURE : str + STATUS_TOPIC : str + CONTEXT_TOPIC_PREFIX : str + + def __init__(self): + for f in fields(self): + v = os.getenv(f.name) + if not v: + raise ConfigException(f.name) + if f.type == int: + v = int(v) + elif f.type == dict: + v = json.loads(v) + setattr(self, f.name, v) + + + diff --git a/src/converters.py b/src/converters.py new file mode 100644 index 0000000..377bc96 --- /dev/null +++ b/src/converters.py @@ -0,0 +1,10 @@ +CONVERTERS = { + "target_temperature_output": { + "max": lambda x: x, + "brennenstuhl": lambda x: json.dumps({"current_heating_setpoint":x}), + }, + "window_contact_input": { + "max": lambda x: 'closed' if (x.lower() in ('false', 'close', 'closed')) else 'open', + "aqara": lambda x: 'closed' if json.loads(x)["contact"] else 'open', + } +} diff --git a/src/main.py b/src/main.py index 08463b0..c62f2e9 100644 --- a/src/main.py +++ b/src/main.py @@ -1,95 +1,23 @@ -import os import sys import json import uuid import signal from loguru import logger import paho.mqtt.client as mqtt -from message_processor import process_message, prepare_context + +from box import Box +from config import Config -# MQTT configuration parameters -BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable -PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set -BOXES_CONFIG = os.getenv("BOXES") # JSON configuration of boxes from environment variable -BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes -CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics -OFF_TEMPERATURE = os.getenv("OFF_TEMPERATURE", "5.0") -LOW_TEMPERATURE = os.getenv("LOW_TEMPERATURE", "15.0") -DEFAULT_HIGH_TEMPERATURE = os.getenv("DEFAULT_HIGH_TEMPERATURE", "21.0") -MAINTENANCE_TEMPERATURE = os.getenv("MAINTENANCE_TEMPERATURE", "30.0") -STATUS_TOPIC = os.getenv("MQTT_STATUS_TOPIC") -CONTEXT_TOPIC_PREFIX = os.getenv("MQTT_CONTEXT_TOPIC_PREFIX") +config = Config() -# Check if required environment variables are set -missing_vars = [] -if not BROKER: - missing_vars.append('MQTT_BROKER') -if not BOXES_CONFIG: - missing_vars.append('MQTT_BOXES') -if not BOX_TOPIC_PREFIXES_CONFIG: - missing_vars.append('MQTT_BOX_TOPIC_PREFIXES') -if not CENTRAL_TOPICS_CONFIG: - missing_vars.append('MQTT_CENTRAL_TOPICS') -if not STATUS_TOPIC: - missing_vars.append('MQTT_STATUS_TOPIC') -if not CONTEXT_TOPIC_PREFIX: - missing_vars.append('MQTT_CONTEXT_TOPIC_PREFIX') - -if missing_vars: - logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}") - sys.exit(1) - -# context for box operations -context = {} - -# configuration values for boxes -context['off_temperature'] = OFF_TEMPERATURE -context['low_temperature'] = LOW_TEMPERATURE -context['default_high_temperature'] = DEFAULT_HIGH_TEMPERATURE -context['maintenance_temperature'] = MAINTENANCE_TEMPERATURE -context['status_topic'] = STATUS_TOPIC -context['context_topic_prefix'] = CONTEXT_TOPIC_PREFIX - -# Load box configurations from JSON -try: - boxes = json.loads(BOXES_CONFIG) - - # boxes structure added to global context to give process_message access to it - context['boxes'] = boxes - - # add context dict to each box in the list - for box_name, config in boxes.items(): - config['context'] = prepare_context(box_name, context) -except json.JSONDecodeError as e: - logger.error(f"Error parsing JSON configuration for boxes: {e}") - sys.exit(1) - -# Load box-specific topic prefixes from JSON -try: - box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG) - - # Validation: Check if the required keys are present - required_keys = {'high_temp', 'cmd', 'overwrite_window'} - missing_keys = required_keys - box_topic_prefixes.keys() - - if missing_keys: - raise ValueError(f"Error: The following keys are missing in MQTT_BOX_TOPIC_PREFIXES: {', '.join(missing_keys)}") -except (json.JSONDecodeError, ValueError) as e: - logger.error(str(e)) - sys.exit(1) - -# Load central topics from JSON -try: - central_topics = json.loads(CENTRAL_TOPICS_CONFIG) -except json.JSONDecodeError as e: - logger.error(f"Error parsing JSON configuration for central topics: {e}") - sys.exit(1) +boxes = [] +for k, v in config.BOXES.items(): + boxes.append(Box(k, v, config)) # Generate CLIENT_ID from UUID and optional prefix -CLIENT_PREFIX = os.getenv("MQTT_CLIENT_PREFIX", "MQTTClient") -CLIENT_ID = f"{CLIENT_PREFIX}_{uuid.uuid4()}" +CLIENT_ID = f"{config.MQTT_CLIENT_PREFIX}_{uuid.uuid4()}" # Mapping of topics to boxes and topic keys for efficient lookup topic_mapping = {} @@ -100,30 +28,25 @@ def on_connect(client, userdata, flags, reason_code, properties): logger.info("Connected to the broker") # Subscribe to dynamically generated topics for each box and create mappings - for box_name, config in boxes.items(): - label = config.get("label") - if not label: - logger.error(f"[{box_name}] No 'label' defined.") - continue - + for box in boxes: + label = box.id # Generate topics based on configured box-specific prefixes and box label - for topic_key, prefix in box_topic_prefixes.items(): + for topic_key, prefix in config.BOX_TOPIC_PREFIXES.items(): topic = f"{prefix}{label}" client.subscribe(topic) - topic_mapping[topic] = (box_name, topic_key) - logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')") + topic_mapping[topic] = (box, topic_key) + logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')") # Subscribe window topics from box - for window_topic in config.get("windows"): - topic = window_topic.get("topic") - label = window_topic.get("label") + for label, window in box.windows.items(): + topic = window['topic'] topic_key = f"window/{label}" client.subscribe(topic) - topic_mapping[topic] = (box_name, topic_key) - logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')") + topic_mapping[topic] = (box, topic_key) + logger.info(f"[{box.id}] Subscribed to '{topic}' (Key: '{topic_key}')") # Subscribe to central topics and create mappings - for central_key, central_topic in central_topics.items(): + for central_key, central_topic in config.CENTRAL_TOPICS.items(): client.subscribe(central_topic) # Mark central topics with a special key to identify them topic_mapping[central_topic] = ("__central__", central_key) @@ -138,15 +61,16 @@ def on_message(client, userdata, msg): payload = msg.payload.decode() if topic in topic_mapping: - box_name, topic_key = topic_mapping[topic] - if box_name == "__central__": + box, topic_key = topic_mapping[topic] + if box == "__central__": # Central message, process for all boxes logger.info(f"[Central] Processing central message for '{topic_key}': {payload}") - for current_box_name in boxes.keys(): - process_message(current_box_name, topic_key, payload, context) + for b in boxes: + b.handle_message(topic_key, payload) else: # Box-specific message - process_message(box_name, topic_key, payload, context) + logger.info(f"[{box.id}] Processing box-specific message for ‘{topic_key}': {payload}") + box.handle_message(topic_key, payload) else: logger.warning(f"Received unknown topic: '{topic}'") except Exception as e: @@ -166,8 +90,9 @@ def handle_exit_signal(signum, frame): # Initialize the MQTT client and configure callbacks client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=CLIENT_ID, ) +for box in boxes: + box.mqtt_client = client -context['client'] = client client.on_connect = on_connect client.on_message = on_message @@ -182,7 +107,7 @@ signal.signal(signal.SIGINT, handle_exit_signal) # Connect to the broker try: - client.connect(BROKER, PORT, keepalive=60) + client.connect(config.MQTT_BROKER, config.MQTT_PORT, keepalive=60) except Exception as e: logger.error(f"Failed to connect to the broker: {e}") sys.exit(1) diff --git a/src/message_processor.py b/src/message_processor.py deleted file mode 100644 index 793d8db..0000000 --- a/src/message_processor.py +++ /dev/null @@ -1,156 +0,0 @@ -from loguru import logger -import json - - - -CONVERTERS = { - "target_temperature_output": { - "max": lambda x: x["output_temperature"], - "brennenstuhl": lambda x: json.dumps({"current_heating_setpoint":x["output_temperature"]}), - }, - "window_contact_input": { - "max": lambda x: 'closed' if (x.lower() in ('false', 'close', 'closed')) else 'open', - "aqara": lambda x: 'closed' if json.loads(x)["contact"] else 'open', - } -} - -# context -# boxes: structure of boxes -# client: MQTT client -# -# boxes['box_name']['context'] -# store here what ever is require to represent the state of the box - -def prepare_context(box_name, context): - local_context = {} - - local_context['id'] = box_name - local_context['label'] = context['boxes'][box_name]['label'] - local_context['high_temperature'] = context['default_high_temperature'] - local_context['low_temperature'] = context['low_temperature'] - local_context['off_temperature'] = context['off_temperature'] - local_context['maintenance_temperature'] = context['maintenance_temperature'] - - local_context['general_off'] = False - local_context['maintenance_mode'] = False - local_context['overwrite_window'] = False - - local_context['window_state'] = {} - for w in context['boxes'][box_name]['windows']: - local_context['window_state'][w['label']] = 'closed' - - local_context['mode'] = 'high' - - local_context['output_temperature'] = local_context['high_temperature'] - - return local_context - -def process_message(box_name, topic_key, payload, context): - try: - box = context['boxes'][box_name] - local_context = box['context'] - logger.info(f"[{box_name}] Local context before: {local_context}") - logger.info(f"[{box_name}] Processing message for '{topic_key}': {payload}") - - match topic_key.split('/'): - case [ primary_key, sub_key ] if primary_key == 'window': - result = process_window(box_name, context, local_context, sub_key, payload) - case [ primary_key ] if primary_key == 'high_temp': - result = process_high_temp(box_name, context, local_context, payload) - case [ primary_key ] if primary_key == 'cmd': - result = process_cmd(box_name, context, local_context, payload) - case [ primary_key ] if primary_key == 'overwrite_window': - result = process_overwrite_window(box_name, context, local_context, payload) - case [ primary_key ] if primary_key == 'general_off': - result = process_general_off(box_name, context, local_context, payload) - case [ primary_key ] if primary_key == 'maintenance_mode': - result = process_maintenance_mode(box_name, context, local_context, payload) - case [ primary_key ] if primary_key == 'status': - result = process_status(box_name, context, local_context, payload) - case _: - raise Error(f"Unexcepted topic_key: {topic_key}, {payload}") - - if result: - result_message = CONVERTERS["target_temperature_output"][box["output_converter"]](local_context) - publish_topic = box["output_topic"] - context['client'].publish(publish_topic, result_message) - logger.info(f"[{box_name}] Result published on '{publish_topic}': {result_message}") - - context_topic = f"{context['context_topic_prefix']}{box['label']}" - context['client'].publish(context_topic, json.dumps(local_context)) - - logger.info(f"[{box_name}] Local context after: {local_context}") - except Exception as e: - logger.error(f"[{box_name}] Error processing '{topic_key}': {e}") - -def _calculate_output_temperature(local_context): - # maintenance_mode has the highest priority, even higher than general_off - if local_context['maintenance_mode']: - local_context['output_temperature'] = local_context['maintenance_temperature'] - return - # general_off has the next highest priority - if local_context['general_off']: - local_context['output_temperature'] = local_context['off_temperature'] - return - # an open window shuts off the heating - if not local_context['overwrite_window']: - for w in local_context['window_state'].values(): - if w == 'open': - local_context['output_temperature'] = local_context['off_temperature'] - return - # finally evaluate the mode - if local_context['mode'] == 'off': - local_context['output_temperature'] = local_context['off_temperature'] - return - if local_context['mode'] == 'low': - local_context['output_temperature'] = local_context['low_temperature'] - return - if local_context['mode'] == 'high': - local_context['output_temperature'] = local_context['high_temperature'] - return - # if we come here, something serious happened - logger.error(f"Error in calculation of output_temperature: {local_context=}") - return - -def process_status(box_name, context, local_context, payload): - return False - -def process_general_off(box_name, context, local_context, payload): - local_context['general_off'] = (payload.lower() in ('true')) - _calculate_output_temperature(local_context) - return True - -def process_maintenance_mode(box_name, context, local_context, payload): - local_context['maintenance_mode'] = (payload.lower() in ('true')) - _calculate_output_temperature(local_context) - return True - -def process_cmd(box_name, context, local_context, payload): - if payload.lower() in ('high', 'low', 'off'): - local_context['mode'] = payload.lower() - _calculate_output_temperature(local_context) - else: - logger.error(f"Invalid cmd for {box_name} received: {payload}") - return True - -def process_overwrite_window(box_name, context, local_context, payload): - local_context['overwrite_window'] = (payload.lower() in ('true')) - _calculate_output_temperature(local_context) - return True - -def process_high_temp(box_name, context, local_context, payload): - local_context['high_temperature'] = payload - _calculate_output_temperature(local_context) - return True - -def process_window(box_name, context, local_context, sub_key, payload): - # default converter - converter = lambda x:x - for sk in context["boxes"][box_name]["windows"]: - if sk["label"] == sub_key: - converter = CONVERTERS["window_contact_input"][sk["converter"]] - break - local_context['window_state'][sub_key] = converter(payload) - _calculate_output_temperature(local_context) - return True -