there is an issue concerning output_topic
This commit is contained in:
36
env
36
env
@ -1,17 +1,7 @@
|
|||||||
export MQTT_BROKER="172.23.1.102"
|
export MQTT_BROKER="172.23.1.102"
|
||||||
export MQTT_PORT=1883
|
export MQTT_PORT=1883
|
||||||
export MQTT_BOXES='{
|
export MQTT_CLIENT_PREFIX="MyMQTTClient"
|
||||||
"box1": {
|
|
||||||
"label": "living_room",
|
|
||||||
"windows": [ "street_side", "garden_side" ]
|
|
||||||
},
|
|
||||||
"box2": {
|
|
||||||
"label": "kitchen",
|
|
||||||
"windows": [ "street_side", "garden_side", "garden_door" ]
|
|
||||||
}
|
|
||||||
}'
|
|
||||||
export MQTT_BOX_TOPIC_PREFIXES='{
|
export MQTT_BOX_TOPIC_PREFIXES='{
|
||||||
"window": "heating/sensor/window/",
|
|
||||||
"high_temp": "heating/config/high_temp/",
|
"high_temp": "heating/config/high_temp/",
|
||||||
"cmd": "heating/command/"
|
"cmd": "heating/command/"
|
||||||
}'
|
}'
|
||||||
@ -19,5 +9,25 @@ export MQTT_CENTRAL_TOPICS='{
|
|||||||
"general_off": "heating/system/general_off",
|
"general_off": "heating/system/general_off",
|
||||||
"maintenance_mode": "heating/system/maintenance_mode"
|
"maintenance_mode": "heating/system/maintenance_mode"
|
||||||
}'
|
}'
|
||||||
export MQTT_CLIENT_PREFIX="MyMQTTClient"
|
export OFF_TEMPERATURE="5.0"
|
||||||
export MQTT_PUBLISH_PREFIX="output/"
|
export LOW_TEMPERATURE="15.0"
|
||||||
|
export MAINTENANCE_TEMPERATURE="30.0"
|
||||||
|
export BOXES='{
|
||||||
|
"box1": {
|
||||||
|
"label": "living_room",
|
||||||
|
"windows": [
|
||||||
|
{ "topic": "street_side_topic", "label": "street_side" },
|
||||||
|
{ "topic": "garden_side_topic", "label": "garden_side" }
|
||||||
|
],
|
||||||
|
"output_topic": "output/living_room"
|
||||||
|
},
|
||||||
|
"box2": {
|
||||||
|
"label": "kitchen",
|
||||||
|
"windows": [
|
||||||
|
{ "topic": "street_side_topic", "label": "street_side" },
|
||||||
|
{ "topic": "garden_side_topic", "label": "garden_side" },
|
||||||
|
{ "topic": "garden_door_topic", "label": "garden_door" }
|
||||||
|
],
|
||||||
|
"output_topic": "output/kitchen"
|
||||||
|
}
|
||||||
|
}'
|
||||||
|
48
src/main.py
48
src/main.py
@ -11,10 +11,12 @@ from message_processor import process_message # Import the moved function
|
|||||||
# MQTT configuration parameters
|
# MQTT configuration parameters
|
||||||
BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable
|
BROKER = os.getenv("MQTT_BROKER") # Read broker from environment variable
|
||||||
PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set
|
PORT = int(os.getenv("MQTT_PORT", 1883)) # Default port if not set
|
||||||
BOXES_CONFIG = os.getenv("MQTT_BOXES") # JSON configuration of boxes from environment variable
|
BOXES_CONFIG = os.getenv("BOXES") # JSON configuration of boxes from environment variable
|
||||||
BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes
|
BOX_TOPIC_PREFIXES_CONFIG = os.getenv("MQTT_BOX_TOPIC_PREFIXES") # JSON configuration of box-specific topic prefixes
|
||||||
CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics
|
CENTRAL_TOPICS_CONFIG = os.getenv("MQTT_CENTRAL_TOPICS") # JSON configuration of central topics
|
||||||
PUBLISH_PREFIX = os.getenv("MQTT_PUBLISH_PREFIX", "output/") # Globales Publish-Präfix
|
OFF_TEMPERATURE = os.getenv("OFF_TEMPERATURE", "5.0")
|
||||||
|
LOW_TEMPERATURE = os.getenv("LOW_TEMPERATURE", "15.0")
|
||||||
|
MAINTENANCE_TEMPERATURE = os.getenv("MAINTENANCE_TEMPERATURE", "30.0")
|
||||||
|
|
||||||
# Check if required environment variables are set
|
# Check if required environment variables are set
|
||||||
missing_vars = []
|
missing_vars = []
|
||||||
@ -26,8 +28,6 @@ if not BOX_TOPIC_PREFIXES_CONFIG:
|
|||||||
missing_vars.append('MQTT_BOX_TOPIC_PREFIXES')
|
missing_vars.append('MQTT_BOX_TOPIC_PREFIXES')
|
||||||
if not CENTRAL_TOPICS_CONFIG:
|
if not CENTRAL_TOPICS_CONFIG:
|
||||||
missing_vars.append('MQTT_CENTRAL_TOPICS')
|
missing_vars.append('MQTT_CENTRAL_TOPICS')
|
||||||
if not PUBLISH_PREFIX:
|
|
||||||
missing_vars.append('MQTT_PUBLISH_PREFIX')
|
|
||||||
|
|
||||||
if missing_vars:
|
if missing_vars:
|
||||||
logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}")
|
logger.error(f"Error: The following environment variables are not set: {', '.join(missing_vars)}")
|
||||||
@ -36,8 +36,10 @@ if missing_vars:
|
|||||||
# context for box operations
|
# context for box operations
|
||||||
context = {}
|
context = {}
|
||||||
|
|
||||||
# publish topic prefix for boxes
|
# configuration values for boxes
|
||||||
context['publish'] = PUBLISH_PREFIX
|
context['off_temperature'] = OFF_TEMPERATURE
|
||||||
|
context['low_temperature'] = LOW_TEMPERATURE
|
||||||
|
context['maintenance_temperature'] = MAINTENANCE_TEMPERATURE
|
||||||
|
|
||||||
# Load box configurations from JSON
|
# Load box configurations from JSON
|
||||||
try:
|
try:
|
||||||
@ -49,6 +51,8 @@ try:
|
|||||||
# add context dict to each box in the list
|
# add context dict to each box in the list
|
||||||
for box_name, config in boxes.items():
|
for box_name, config in boxes.items():
|
||||||
config['context'] = {}
|
config['context'] = {}
|
||||||
|
|
||||||
|
logger.info(f"{boxes=}")
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"Error parsing JSON configuration for boxes: {e}")
|
logger.error(f"Error parsing JSON configuration for boxes: {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@ -58,7 +62,7 @@ try:
|
|||||||
box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG)
|
box_topic_prefixes = json.loads(BOX_TOPIC_PREFIXES_CONFIG)
|
||||||
|
|
||||||
# Validation: Check if the required keys are present
|
# Validation: Check if the required keys are present
|
||||||
required_keys = {'window', 'high_temp', 'cmd'}
|
required_keys = {'high_temp', 'cmd'}
|
||||||
missing_keys = required_keys - box_topic_prefixes.keys()
|
missing_keys = required_keys - box_topic_prefixes.keys()
|
||||||
|
|
||||||
if missing_keys:
|
if missing_keys:
|
||||||
@ -96,23 +100,19 @@ def on_connect(client, userdata, flags, rc):
|
|||||||
|
|
||||||
# Generate topics based on configured box-specific prefixes and box label
|
# Generate topics based on configured box-specific prefixes and box label
|
||||||
for topic_key, prefix in box_topic_prefixes.items():
|
for topic_key, prefix in box_topic_prefixes.items():
|
||||||
if topic_key == 'window':
|
topic = f"{prefix}{label}"
|
||||||
windows = config.get("windows", [])
|
client.subscribe(topic)
|
||||||
if not windows:
|
topic_mapping[topic] = (box_name, topic_key)
|
||||||
logger.warning(f"[{box_name}] No 'windows' defined.")
|
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")
|
||||||
continue
|
|
||||||
for window in windows:
|
# Subscribe window topics from box
|
||||||
# Form the new topic_key by combining 'window' and the window name
|
for window_topic in config.get("windows"):
|
||||||
enhanced_topic_key = f"{topic_key}/{window}"
|
topic = window_topic.get("topic")
|
||||||
topic = f"{prefix}{label}/{window}"
|
label = window_topic.get("label")
|
||||||
client.subscribe(topic)
|
topic_key = f"window/{label}"
|
||||||
topic_mapping[topic] = (box_name, enhanced_topic_key)
|
client.subscribe(topic)
|
||||||
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{enhanced_topic_key}')")
|
topic_mapping[topic] = (box_name, topic_key)
|
||||||
else:
|
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")
|
||||||
topic = f"{prefix}{label}"
|
|
||||||
client.subscribe(topic)
|
|
||||||
topic_mapping[topic] = (box_name, topic_key)
|
|
||||||
logger.info(f"[{box_name}] Subscribed to '{topic}' (Key: '{topic_key}')")
|
|
||||||
|
|
||||||
# Subscribe to central topics and create mappings
|
# Subscribe to central topics and create mappings
|
||||||
for central_key, central_topic in central_topics.items():
|
for central_key, central_topic in central_topics.items():
|
||||||
|
@ -9,29 +9,43 @@ from loguru import logger
|
|||||||
# store here what ever is require to represent the state of the box
|
# store here what ever is require to represent the state of the box
|
||||||
|
|
||||||
def process_message(box_name, topic_key, payload, context):
|
def process_message(box_name, topic_key, payload, context):
|
||||||
|
logger.info(f"{context=}")
|
||||||
try:
|
try:
|
||||||
logger.info(f"[{box_name}] Processing message for '{topic_key}': {payload}")
|
logger.info(f"[{box_name}] Processing message for '{topic_key}': {payload}")
|
||||||
|
|
||||||
match topic_key.split('/'):
|
match topic_key.split('/'):
|
||||||
case [ primary_key, sub_key ] if primary_key == 'window':
|
case [ primary_key, sub_key ] if primary_key == 'window':
|
||||||
pass
|
result = process_window(box_name, context, sub_key, payload)
|
||||||
case [ primary_key ] if primary_key == 'high_temp':
|
case [ primary_key ] if primary_key == 'high_temp':
|
||||||
pass
|
result = process_high_temp(box_name, context, payload)
|
||||||
case [ primary_key ] if primary_key == 'cmd':
|
case [ primary_key ] if primary_key == 'cmd':
|
||||||
pass
|
result = process_cmd(box_name, context, payload)
|
||||||
case [ primary_key ] if primary_key == 'general_off':
|
case [ primary_key ] if primary_key == 'general_off':
|
||||||
pass
|
result = process_general_off(box_name, context, payload)
|
||||||
case [ primary_key ] if primary_key == 'maintenance_mode':
|
case [ primary_key ] if primary_key == 'maintenance_mode':
|
||||||
pass
|
result = process_maintenance_mode(box_name, context, payload)
|
||||||
case _:
|
case _:
|
||||||
raise Error(f"Unexcepted topic_key: {topic_key}, {payload}")
|
raise Error(f"Unexcepted topic_key: {topic_key}, {payload}")
|
||||||
|
|
||||||
# Example processing: Implement your specific logic here
|
if result:
|
||||||
# For example, perform calculations or store data.
|
publish_topic = context["boxes"]["output_topic"]
|
||||||
result = f"Processed by {box_name} for '{topic_key}': {payload}"
|
context['client'].publish(publish_topic, result)
|
||||||
|
logger.info(f"[{box_name}] Result published on '{publish_topic}': {result}")
|
||||||
publish_topic = f"{context['publish']}/{box_name}"
|
|
||||||
context['client'].publish(publish_topic, result)
|
|
||||||
logger.info(f"[{box_name}] Result published on '{publish_topic}': {result}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[{box_name}] Error processing '{topic_key}': {e}")
|
logger.error(f"[{box_name}] Error processing '{topic_key}': {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def process_general_off(box_name, context, payload):
|
||||||
|
return "general off"
|
||||||
|
|
||||||
|
def process_maintenance_mode(box_name, context, payload):
|
||||||
|
return "maintenance mode"
|
||||||
|
|
||||||
|
def process_cmd(box_name, context, payload):
|
||||||
|
return f"cmd: {payload}"
|
||||||
|
|
||||||
|
def process_high_temp(box_name, context, payload):
|
||||||
|
return f"high_temp: {payload}"
|
||||||
|
|
||||||
|
def process_window(box_name, context, sub_key, payload):
|
||||||
|
return f"window: {sub_key}, {payload}"
|
||||||
|
Reference in New Issue
Block a user