seems to work
All checks were successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful

This commit is contained in:
2025-12-08 15:37:03 +01:00
parent 15e132b187
commit 9ba478c34d
4 changed files with 136 additions and 15 deletions

View File

@@ -10,12 +10,21 @@ from typing import Any
from aiomqtt import Client, Message
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
COIL_STATUS_PREFIX = "dt1/di"
COIL_STATUS_TOPIC = f"{COIL_STATUS_PREFIX}/+"
PULSEGEN_COMMAND_PREFIX = "pulsegen/command"
PULSEGEN_COMMAND_TOPIC = f"{PULSEGEN_COMMAND_PREFIX}/+/+"
COIL_COMMAND_PREFIX = "dt1/coil"
PULSEGEN_STATUS_PREFIX = "pulsegen/status"
COIL_STATUS_CACHE: dict[int, bool] = {}
def get_mqtt_settings() -> tuple[str, int]:
"""Get MQTT broker settings from environment variables.
@@ -28,23 +37,87 @@ def get_mqtt_settings() -> tuple[str, int]:
return broker, port
async def handle_message(message: Message) -> None:
async def handle_message(message: Message, client: Client) -> None:
"""Handle incoming MQTT message.
Args:
message: MQTT message object
client: MQTT client instance
"""
try:
payload = message.payload.decode()
logger.info(f"Received message on {message.topic}: {payload}")
# Parse JSON payload if possible
try:
data = json.loads(payload)
logger.debug(f"Parsed JSON: {data}")
# TODO: Process message based on topic and data
except json.JSONDecodeError:
logger.debug(f"Non-JSON payload: {payload}")
topic = str(message.topic)
match topic.split("/"):
case [prefix, di, coil_id] if f"{prefix}/{di}" == COIL_STATUS_PREFIX:
try:
coil_num = int(coil_id)
except ValueError:
logger.debug(f"Invalid coil id in topic: {topic}")
return
state = payload.lower() in ("1", "true", "on")
COIL_STATUS_CACHE[coil_num] = state
logger.info(f"Updated coil {coil_num} status to {state}")
logger.info(f"Publishing pulsegen status for coil {coil_num}: {state}")
await client.publish(
topic=f"{PULSEGEN_STATUS_PREFIX}/{coil_num}",
payload="on" if state else "off",
qos=1,
retain=True,
)
case [prefix, command, coil_in_id, coil_out_id] if f"{prefix}/{command}" == PULSEGEN_COMMAND_PREFIX:
try:
coil_in_id = int(coil_in_id)
coil_out_id = int(coil_out_id)
except ValueError:
logger.debug(f"Invalid coil id in topic: {topic}")
return
try:
coil_state = COIL_STATUS_CACHE[coil_in_id]
except KeyError:
logger.debug(f"Coil {coil_in_id} status unknown, cannot process command")
return
cmd = payload.lower() in ("1", "true", "on")
if cmd == coil_state:
logger.info(f"Coil {coil_in_id} already in desired state {cmd}, ignoring command")
return
logger.info(f"Received pulsegen command on {topic}: {coil_in_id=}, {coil_out_id=}, {cmd=}")
coil_cmd_topic = f"{COIL_COMMAND_PREFIX}/{coil_out_id}"
logger.info(f"Sending raising edge command: topic={coil_cmd_topic}")
await client.publish(
topic=coil_cmd_topic,
payload="1",
qos=1,
retain=False,
)
await asyncio.sleep(0.2)
logger.info(f"Sending falling edge command: topic={coil_cmd_topic}")
await client.publish(
topic=coil_cmd_topic,
payload="0",
qos=1,
retain=False,
)
case _:
logger.debug(f"Ignoring message on unrelated topic: {topic}")
except Exception as e:
logger.debug(f"Exception when handling payload: {e}")
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
@@ -80,11 +153,6 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
"""
broker, port = get_mqtt_settings()
# Topics to subscribe to
subscribe_topics = [
"pulsegen/command/+/+",
"dt1/di/+"
]
reconnect_interval = 5 # seconds
@@ -100,7 +168,7 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
logger.info("Connected to MQTT broker")
# Subscribe to topics
for topic in subscribe_topics:
for topic in [PULSEGEN_COMMAND_TOPIC, COIL_STATUS_TOPIC]:
await client.subscribe(topic)
logger.info(f"Subscribed to {topic}")
@@ -112,7 +180,7 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
if shutdown_event.is_set():
break
try:
await handle_message(message)
await handle_message(message, client)
except Exception as e:
logger.error(f"Error in message handler: {e}", exc_info=True)