From 9ba478c34d50646e593bfef009a338ad8dce6796 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 8 Dec 2025 15:37:03 +0100 Subject: [PATCH] seems to work --- .woodpecker/build.yml | 1 + .woodpecker/deploy.yml | 1 + apps/pulsegen/main.py | 98 ++++++++++++++++++++++++----- deployment/pulsegen-deployment.yaml | 51 +++++++++++++++ 4 files changed, 136 insertions(+), 15 deletions(-) create mode 100644 deployment/pulsegen-deployment.yaml diff --git a/.woodpecker/build.yml b/.woodpecker/build.yml index bb928fd..9a6406b 100644 --- a/.woodpecker/build.yml +++ b/.woodpecker/build.yml @@ -11,6 +11,7 @@ matrix: - abstraction - rules - static + - pulsegen - homekit steps: diff --git a/.woodpecker/deploy.yml b/.woodpecker/deploy.yml index fb504d4..5c4e44b 100644 --- a/.woodpecker/deploy.yml +++ b/.woodpecker/deploy.yml @@ -16,6 +16,7 @@ matrix: - abstraction - rules - static + - pulsegen steps: deploy-${APP}: diff --git a/apps/pulsegen/main.py b/apps/pulsegen/main.py index 11b084a..99a5406 100644 --- a/apps/pulsegen/main.py +++ b/apps/pulsegen/main.py @@ -10,12 +10,21 @@ from typing import Any from aiomqtt import Client, Message logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) +COIL_STATUS_PREFIX = "dt1/di" +COIL_STATUS_TOPIC = f"{COIL_STATUS_PREFIX}/+" +PULSEGEN_COMMAND_PREFIX = "pulsegen/command" +PULSEGEN_COMMAND_TOPIC = f"{PULSEGEN_COMMAND_PREFIX}/+/+" +COIL_COMMAND_PREFIX = "dt1/coil" +PULSEGEN_STATUS_PREFIX = "pulsegen/status" + +COIL_STATUS_CACHE: dict[int, bool] = {} + def get_mqtt_settings() -> tuple[str, int]: """Get MQTT broker settings from environment variables. @@ -28,23 +37,87 @@ def get_mqtt_settings() -> tuple[str, int]: return broker, port -async def handle_message(message: Message) -> None: +async def handle_message(message: Message, client: Client) -> None: """Handle incoming MQTT message. Args: message: MQTT message object + client: MQTT client instance """ try: payload = message.payload.decode() logger.info(f"Received message on {message.topic}: {payload}") - # Parse JSON payload if possible try: - data = json.loads(payload) - logger.debug(f"Parsed JSON: {data}") - # TODO: Process message based on topic and data - except json.JSONDecodeError: - logger.debug(f"Non-JSON payload: {payload}") + topic = str(message.topic) + + match topic.split("/"): + case [prefix, di, coil_id] if f"{prefix}/{di}" == COIL_STATUS_PREFIX: + try: + coil_num = int(coil_id) + except ValueError: + logger.debug(f"Invalid coil id in topic: {topic}") + return + + state = payload.lower() in ("1", "true", "on") + COIL_STATUS_CACHE[coil_num] = state + logger.info(f"Updated coil {coil_num} status to {state}") + + logger.info(f"Publishing pulsegen status for coil {coil_num}: {state}") + await client.publish( + topic=f"{PULSEGEN_STATUS_PREFIX}/{coil_num}", + payload="on" if state else "off", + qos=1, + retain=True, + ) + + case [prefix, command, coil_in_id, coil_out_id] if f"{prefix}/{command}" == PULSEGEN_COMMAND_PREFIX: + try: + coil_in_id = int(coil_in_id) + coil_out_id = int(coil_out_id) + except ValueError: + logger.debug(f"Invalid coil id in topic: {topic}") + return + + try: + coil_state = COIL_STATUS_CACHE[coil_in_id] + except KeyError: + logger.debug(f"Coil {coil_in_id} status unknown, cannot process command") + return + + cmd = payload.lower() in ("1", "true", "on") + + if cmd == coil_state: + logger.info(f"Coil {coil_in_id} already in desired state {cmd}, ignoring command") + return + + logger.info(f"Received pulsegen command on {topic}: {coil_in_id=}, {coil_out_id=}, {cmd=}") + + + coil_cmd_topic = f"{COIL_COMMAND_PREFIX}/{coil_out_id}" + + logger.info(f"Sending raising edge command: topic={coil_cmd_topic}") + await client.publish( + topic=coil_cmd_topic, + payload="1", + qos=1, + retain=False, + ) + + await asyncio.sleep(0.2) + + logger.info(f"Sending falling edge command: topic={coil_cmd_topic}") + await client.publish( + topic=coil_cmd_topic, + payload="0", + qos=1, + retain=False, + ) + + case _: + logger.debug(f"Ignoring message on unrelated topic: {topic}") + except Exception as e: + logger.debug(f"Exception when handling payload: {e}") except Exception as e: logger.error(f"Error handling message: {e}", exc_info=True) @@ -80,11 +153,6 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None: """ broker, port = get_mqtt_settings() - # Topics to subscribe to - subscribe_topics = [ - "pulsegen/command/+/+", - "dt1/di/+" - ] reconnect_interval = 5 # seconds @@ -100,7 +168,7 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None: logger.info("Connected to MQTT broker") # Subscribe to topics - for topic in subscribe_topics: + for topic in [PULSEGEN_COMMAND_TOPIC, COIL_STATUS_TOPIC]: await client.subscribe(topic) logger.info(f"Subscribed to {topic}") @@ -112,7 +180,7 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None: if shutdown_event.is_set(): break try: - await handle_message(message) + await handle_message(message, client) except Exception as e: logger.error(f"Error in message handler: {e}", exc_info=True) diff --git a/deployment/pulsegen-deployment.yaml b/deployment/pulsegen-deployment.yaml new file mode 100644 index 0000000..8ee96f1 --- /dev/null +++ b/deployment/pulsegen-deployment.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pulsegen + namespace: homea2 + labels: + app: pulsegen + component: home-automation +spec: + replicas: 1 + selector: + matchLabels: + app: pulsegen + template: + metadata: + annotations: + reloader.stakater.com/auto: "true" + configmap.reloader.stakater.com/reload: "home-automation-environment" + labels: + app: pulsegen + component: home-automation + spec: + containers: + - name: pulsegen + image: %IMAGE% + env: + - name: MQTT_BROKER + valueFrom: + configMapKeyRef: + name: home-automation-environment + key: SHARED_MQTT_BROKER + - name: MQTT_PORT + valueFrom: + configMapKeyRef: + name: home-automation-environment + key: SHARED_MQTT_PORT + resources: + limits: + cpu: 1000m + memory: 1Gi + requests: + cpu: 200m + memory: 256Mi + livenessProbe: + exec: + command: + - /bin/sh + - -c + - "ps aux | grep -v grep | grep python" + initialDelaySeconds: 30 + periodSeconds: 10