All checks were successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful
242 lines
8.0 KiB
Python
242 lines
8.0 KiB
Python
"""Pulsegen - MQTT pulse generator application."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from aiomqtt import Client, Message
|
|
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
COIL_STATUS_PREFIX = "dt1/di"
|
|
COIL_STATUS_TOPIC = f"{COIL_STATUS_PREFIX}/+"
|
|
PULSEGEN_COMMAND_PREFIX = "pulsegen/command"
|
|
PULSEGEN_COMMAND_TOPIC = f"{PULSEGEN_COMMAND_PREFIX}/+/+"
|
|
COIL_COMMAND_PREFIX = "dt1/coil"
|
|
PULSEGEN_STATUS_PREFIX = "pulsegen/status"
|
|
|
|
COIL_STATUS_CACHE: dict[int, bool] = {}
|
|
|
|
def get_mqtt_settings() -> tuple[str, int]:
|
|
"""Get MQTT broker settings from environment variables.
|
|
|
|
Returns:
|
|
tuple: (broker_host, broker_port)
|
|
"""
|
|
broker = os.getenv("MQTT_BROKER", "localhost")
|
|
port = int(os.getenv("MQTT_PORT", "1883"))
|
|
logger.info(f"MQTT settings: broker={broker}, port={port}")
|
|
return broker, port
|
|
|
|
|
|
async def handle_message(message: Message, client: Client) -> None:
|
|
"""Handle incoming MQTT message.
|
|
|
|
Args:
|
|
message: MQTT message object
|
|
client: MQTT client instance
|
|
"""
|
|
try:
|
|
payload = message.payload.decode()
|
|
logger.info(f"Received message on {message.topic}: {payload}")
|
|
|
|
try:
|
|
topic = str(message.topic)
|
|
|
|
match topic.split("/"):
|
|
case [prefix, di, coil_id] if f"{prefix}/{di}" == COIL_STATUS_PREFIX:
|
|
try:
|
|
coil_num = int(coil_id)
|
|
except ValueError:
|
|
logger.debug(f"Invalid coil id in topic: {topic}")
|
|
return
|
|
|
|
state = payload.lower() in ("1", "true", "on")
|
|
COIL_STATUS_CACHE[coil_num] = state
|
|
logger.info(f"Updated coil {coil_num} status to {state}")
|
|
|
|
logger.info(f"Publishing pulsegen status for coil {coil_num}: {state}")
|
|
await client.publish(
|
|
topic=f"{PULSEGEN_STATUS_PREFIX}/{coil_num}",
|
|
payload="on" if state else "off",
|
|
qos=1,
|
|
retain=True,
|
|
)
|
|
|
|
case [prefix, command, coil_in_id, coil_out_id] if f"{prefix}/{command}" == PULSEGEN_COMMAND_PREFIX:
|
|
try:
|
|
coil_in_id = int(coil_in_id)
|
|
coil_out_id = int(coil_out_id)
|
|
except ValueError:
|
|
logger.debug(f"Invalid coil id in topic: {topic}")
|
|
return
|
|
|
|
try:
|
|
coil_state = COIL_STATUS_CACHE[coil_in_id]
|
|
except KeyError:
|
|
logger.debug(f"Coil {coil_in_id} status unknown, cannot process command")
|
|
return
|
|
|
|
cmd = payload.lower() in ("1", "true", "on")
|
|
|
|
if cmd == coil_state:
|
|
logger.info(f"Coil {coil_in_id} already in desired state {cmd}, ignoring command")
|
|
return
|
|
|
|
logger.info(f"Received pulsegen command on {topic}: {coil_in_id=}, {coil_out_id=}, {cmd=}")
|
|
|
|
|
|
coil_cmd_topic = f"{COIL_COMMAND_PREFIX}/{coil_out_id}"
|
|
|
|
logger.info(f"Sending raising edge command: topic={coil_cmd_topic}")
|
|
await client.publish(
|
|
topic=coil_cmd_topic,
|
|
payload="1",
|
|
qos=1,
|
|
retain=False,
|
|
)
|
|
|
|
await asyncio.sleep(0.2)
|
|
|
|
logger.info(f"Sending falling edge command: topic={coil_cmd_topic}")
|
|
await client.publish(
|
|
topic=coil_cmd_topic,
|
|
payload="0",
|
|
qos=1,
|
|
retain=False,
|
|
)
|
|
|
|
case _:
|
|
logger.debug(f"Ignoring message on unrelated topic: {topic}")
|
|
except Exception as e:
|
|
logger.debug(f"Exception when handling payload: {e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling message: {e}", exc_info=True)
|
|
|
|
|
|
async def publish_example(client: Client) -> None:
|
|
"""Example function to publish MQTT messages.
|
|
|
|
Args:
|
|
client: MQTT client instance
|
|
"""
|
|
topic = "pulsegen/status"
|
|
payload = {
|
|
"status": "running",
|
|
"timestamp": asyncio.get_event_loop().time()
|
|
}
|
|
|
|
await client.publish(
|
|
topic=topic,
|
|
payload=json.dumps(payload),
|
|
qos=1
|
|
)
|
|
logger.info(f"Published to {topic}: {payload}")
|
|
|
|
|
|
async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
|
|
"""Main MQTT worker loop.
|
|
|
|
Connects to MQTT broker, subscribes to topics, and processes messages.
|
|
|
|
Args:
|
|
shutdown_event: Event to signal shutdown
|
|
"""
|
|
broker, port = get_mqtt_settings()
|
|
|
|
|
|
reconnect_interval = 5 # seconds
|
|
|
|
while not shutdown_event.is_set():
|
|
try:
|
|
logger.info(f"Connecting to MQTT broker {broker}:{port}...")
|
|
|
|
async with Client(
|
|
hostname=broker,
|
|
port=port,
|
|
identifier=f"pulsegen-{uuid.uuid4()}",
|
|
) as client:
|
|
logger.info("Connected to MQTT broker")
|
|
|
|
# Subscribe to topics
|
|
for topic in [PULSEGEN_COMMAND_TOPIC, COIL_STATUS_TOPIC]:
|
|
await client.subscribe(topic)
|
|
logger.info(f"Subscribed to {topic}")
|
|
|
|
# Publish startup message
|
|
await publish_example(client)
|
|
|
|
# Message loop with timeout to allow shutdown check
|
|
async for message in client.messages:
|
|
if shutdown_event.is_set():
|
|
logger.info("Shutdown event detected, breaking message loop")
|
|
break
|
|
try:
|
|
await handle_message(message, client)
|
|
except Exception as e:
|
|
logger.error(f"Error in message handler: {e}", exc_info=True)
|
|
|
|
# If we exit the loop due to shutdown, break the reconnect loop too
|
|
if shutdown_event.is_set():
|
|
break
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("MQTT worker cancelled")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"MQTT error: {e}", exc_info=True)
|
|
if not shutdown_event.is_set():
|
|
logger.info(f"Reconnecting in {reconnect_interval} seconds...")
|
|
await asyncio.sleep(reconnect_interval)
|
|
|
|
|
|
async def main() -> None:
|
|
"""Main application entry point."""
|
|
logger.info("Starting pulsegen application...")
|
|
|
|
# Shutdown event for graceful shutdown
|
|
shutdown_event = asyncio.Event()
|
|
|
|
# Setup signal handlers
|
|
def signal_handler(sig: int) -> None:
|
|
logger.info(f"Received signal {sig}, initiating shutdown...")
|
|
shutdown_event.set()
|
|
|
|
loop = asyncio.get_event_loop()
|
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
|
loop.add_signal_handler(sig, lambda s=sig: signal_handler(s))
|
|
|
|
# Start MQTT worker
|
|
worker_task = asyncio.create_task(mqtt_worker(shutdown_event))
|
|
|
|
# Wait for shutdown signal
|
|
await shutdown_event.wait()
|
|
|
|
# Give worker a moment to finish gracefully
|
|
logger.info("Waiting for MQTT worker to finish...")
|
|
try:
|
|
await asyncio.wait_for(worker_task, timeout=5.0)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("MQTT worker did not finish in time, cancelling...")
|
|
worker_task.cancel()
|
|
try:
|
|
await worker_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
logger.info("Pulsegen application stopped")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|