Files
home-automation/apps/pulsegen/main.py
2025-12-08 14:27:25 +01:00

159 lines
4.5 KiB
Python

"""Pulsegen - MQTT pulse generator application."""
import asyncio
import json
import logging
import os
import signal
from typing import Any
from aiomqtt import Client, Message
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def get_mqtt_settings() -> tuple[str, int]:
"""Get MQTT broker settings from environment variables.
Returns:
tuple: (broker_host, broker_port)
"""
broker = os.getenv("MQTT_BROKER", "localhost")
port = int(os.getenv("MQTT_PORT", "1883"))
logger.info(f"MQTT settings: broker={broker}, port={port}")
return broker, port
async def handle_message(message: Message) -> None:
"""Handle incoming MQTT message.
Args:
message: MQTT message object
"""
try:
payload = message.payload.decode()
logger.info(f"Received message on {message.topic}: {payload}")
# Parse JSON payload if possible
try:
data = json.loads(payload)
logger.debug(f"Parsed JSON: {data}")
# TODO: Process message based on topic and data
except json.JSONDecodeError:
logger.debug(f"Non-JSON payload: {payload}")
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
async def publish_example(client: Client) -> None:
"""Example function to publish MQTT messages.
Args:
client: MQTT client instance
"""
topic = "pulsegen/status"
payload = {
"status": "running",
"timestamp": asyncio.get_event_loop().time()
}
await client.publish(
topic=topic,
payload=json.dumps(payload),
qos=1
)
logger.info(f"Published to {topic}: {payload}")
async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
"""Main MQTT worker loop.
Connects to MQTT broker, subscribes to topics, and processes messages.
Args:
shutdown_event: Event to signal shutdown
"""
broker, port = get_mqtt_settings()
# Topics to subscribe to
subscribe_topics = [
"pulsegen/command/+/+",
"dt1/di/+
]
reconnect_interval = 5 # seconds
while not shutdown_event.is_set():
try:
logger.info(f"Connecting to MQTT broker {broker}:{port}...")
async with Client(
hostname=broker,
port=port,
identifier="pulsegen"
) as client:
logger.info("Connected to MQTT broker")
# Subscribe to topics
for topic in subscribe_topics:
await client.subscribe(topic)
logger.info(f"Subscribed to {topic}")
# Publish startup message
await publish_example(client)
# Message loop
async for message in client.messages:
if shutdown_event.is_set():
break
try:
await handle_message(message)
except Exception as e:
logger.error(f"Error in message handler: {e}", exc_info=True)
except asyncio.CancelledError:
logger.info("MQTT worker cancelled")
break
except Exception as e:
logger.error(f"MQTT error: {e}", exc_info=True)
if not shutdown_event.is_set():
logger.info(f"Reconnecting in {reconnect_interval} seconds...")
await asyncio.sleep(reconnect_interval)
async def main() -> None:
"""Main application entry point."""
logger.info("Starting pulsegen application...")
# Shutdown event for graceful shutdown
shutdown_event = asyncio.Event()
# Setup signal handlers
def signal_handler(sig: int) -> None:
logger.info(f"Received signal {sig}, initiating shutdown...")
shutdown_event.set()
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: signal_handler(s))
# Start MQTT worker
worker_task = asyncio.create_task(mqtt_worker(shutdown_event))
# Wait for shutdown signal
await shutdown_event.wait()
# Wait for worker to finish
await worker_task
logger.info("Pulsegen application stopped")
if __name__ == "__main__":
asyncio.run(main())