diff --git a/apps/pulsegen/Dockerfile b/apps/pulsegen/Dockerfile new file mode 100644 index 0000000..33566e0 --- /dev/null +++ b/apps/pulsegen/Dockerfile @@ -0,0 +1,35 @@ +# Pulsegen Dockerfile +# MQTT Pulse Generator Worker + +FROM python:3.14-alpine + +# Prevent Python from writing .pyc files and enable unbuffered output +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + MQTT_BROKER=172.16.2.16 \ + MQTT_PORT=1883 + + +# Create non-root user +RUN addgroup -g 10001 -S app && \ + adduser -u 10001 -S app -G app + +# Set working directory +WORKDIR /app + +# Install Python dependencies +COPY apps/pulsegen/requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY apps/__init__.py /app/apps/__init__.py +COPY apps/pulsegen/ /app/apps/pulsegen/ + +# Change ownership to app user +RUN chown -R app:app /app + +# Switch to non-root user +USER app + +# Run application +CMD ["python", "-m", "apps.pulsegen.main"] diff --git a/apps/pulsegen/README.md b/apps/pulsegen/README.md new file mode 100644 index 0000000..c530217 --- /dev/null +++ b/apps/pulsegen/README.md @@ -0,0 +1,53 @@ +# Pulsegen + +MQTT-basierte Pulse-Generator Applikation für Home Automation. + +## Funktionen + +- MQTT-Kommunikation über `aiomqtt` +- Automatische Reconnect-Logik +- Graceful shutdown (SIGTERM/SIGINT) +- JSON message parsing +- Konfigurierbar über Umgebungsvariablen + +## Umgebungsvariablen + +- `MQTT_BROKER`: MQTT Broker Hostname (default: `localhost`) +- `MQTT_PORT`: MQTT Broker Port (default: `1883`) + +## Entwicklung + +Lokal starten: + +```bash +cd apps/pulsegen +python -m venv venv +source venv/bin/activate # oder venv\Scripts\activate auf Windows +pip install -r requirements.txt +python main.py +``` + +## Docker + +Build: + +```bash +docker build -f apps/pulsegen/Dockerfile -t pulsegen . +``` + +Run: + +```bash +docker run -e MQTT_BROKER=172.16.2.16 -e MQTT_PORT=1883 pulsegen +``` + +## MQTT Topics + +### Subscribed + +- `pulsegen/command/#` - Kommandos für pulsegen +- `home/+/+/state` - Device state updates + +### Published + +- `pulsegen/status` - Status-Updates der Applikation diff --git a/apps/pulsegen/__init__.py b/apps/pulsegen/__init__.py new file mode 100644 index 0000000..1213349 --- /dev/null +++ b/apps/pulsegen/__init__.py @@ -0,0 +1 @@ +"""Pulsegen - MQTT pulse generator application.""" diff --git a/apps/pulsegen/main.py b/apps/pulsegen/main.py new file mode 100644 index 0000000..d45dfc0 --- /dev/null +++ b/apps/pulsegen/main.py @@ -0,0 +1,156 @@ +"""Pulsegen - MQTT pulse generator application.""" + +import asyncio +import json +import logging +import os +import signal +from typing import Any + +from aiomqtt import Client, Message + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def get_mqtt_settings() -> tuple[str, int]: + """Get MQTT broker settings from environment variables. + + Returns: + tuple: (broker_host, broker_port) + """ + broker = os.getenv("MQTT_BROKER", "localhost") + port = int(os.getenv("MQTT_PORT", "1883")) + logger.info(f"MQTT settings: broker={broker}, port={port}") + return broker, port + + +async def handle_message(message: Message) -> None: + """Handle incoming MQTT message. + + Args: + message: MQTT message object + """ + try: + payload = message.payload.decode() + logger.info(f"Received message on {message.topic}: {payload}") + + # Parse JSON payload if possible + try: + data = json.loads(payload) + logger.debug(f"Parsed JSON: {data}") + # TODO: Process message based on topic and data + except json.JSONDecodeError: + logger.debug(f"Non-JSON payload: {payload}") + + except Exception as e: + logger.error(f"Error handling message: {e}", exc_info=True) + + +async def publish_example(client: Client) -> None: + """Example function to publish MQTT messages. + + Args: + client: MQTT client instance + """ + topic = "pulsegen/status" + payload = { + "status": "running", + "timestamp": asyncio.get_event_loop().time() + } + + await client.publish( + topic=topic, + payload=json.dumps(payload), + qos=1 + ) + logger.info(f"Published to {topic}: {payload}") + + +async def mqtt_worker(shutdown_event: asyncio.Event) -> None: + """Main MQTT worker loop. + + Connects to MQTT broker, subscribes to topics, and processes messages. + + Args: + shutdown_event: Event to signal shutdown + """ + broker, port = get_mqtt_settings() + + # Topics to subscribe to + subscribe_topics = [ + "pulsegen/command/#", + "home/+/+/state", + ] + + reconnect_interval = 5 # seconds + + while not shutdown_event.is_set(): + try: + logger.info(f"Connecting to MQTT broker {broker}:{port}...") + + async with Client( + hostname=broker, + port=port, + identifier="pulsegen" + ) as client: + logger.info("Connected to MQTT broker") + + # Subscribe to topics + for topic in subscribe_topics: + await client.subscribe(topic) + logger.info(f"Subscribed to {topic}") + + # Publish startup message + await publish_example(client) + + # Message loop + async with client.messages() as messages: + async for message in messages: + if shutdown_event.is_set(): + break + await handle_message(message) + + except asyncio.CancelledError: + logger.info("MQTT worker cancelled") + break + except Exception as e: + logger.error(f"MQTT error: {e}", exc_info=True) + if not shutdown_event.is_set(): + logger.info(f"Reconnecting in {reconnect_interval} seconds...") + await asyncio.sleep(reconnect_interval) + + +async def main() -> None: + """Main application entry point.""" + logger.info("Starting pulsegen application...") + + # Shutdown event for graceful shutdown + shutdown_event = asyncio.Event() + + # Setup signal handlers + def signal_handler(sig: int) -> None: + logger.info(f"Received signal {sig}, initiating shutdown...") + shutdown_event.set() + + loop = asyncio.get_event_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, lambda s=sig: signal_handler(s)) + + # Start MQTT worker + worker_task = asyncio.create_task(mqtt_worker(shutdown_event)) + + # Wait for shutdown signal + await shutdown_event.wait() + + # Wait for worker to finish + await worker_task + + logger.info("Pulsegen application stopped") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/apps/pulsegen/requirements.txt b/apps/pulsegen/requirements.txt new file mode 100644 index 0000000..c93a3c9 --- /dev/null +++ b/apps/pulsegen/requirements.txt @@ -0,0 +1 @@ +aiomqtt==2.3.0