initial
This commit is contained in:
35
apps/pulsegen/Dockerfile
Normal file
35
apps/pulsegen/Dockerfile
Normal file
@@ -0,0 +1,35 @@
|
||||
# Pulsegen Dockerfile
|
||||
# MQTT Pulse Generator Worker
|
||||
|
||||
FROM python:3.14-alpine
|
||||
|
||||
# Prevent Python from writing .pyc files and enable unbuffered output
|
||||
ENV PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONUNBUFFERED=1 \
|
||||
MQTT_BROKER=172.16.2.16 \
|
||||
MQTT_PORT=1883
|
||||
|
||||
|
||||
# Create non-root user
|
||||
RUN addgroup -g 10001 -S app && \
|
||||
adduser -u 10001 -S app -G app
|
||||
|
||||
# Set working directory
|
||||
WORKDIR /app
|
||||
|
||||
# Install Python dependencies
|
||||
COPY apps/pulsegen/requirements.txt /app/requirements.txt
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Copy application code
|
||||
COPY apps/__init__.py /app/apps/__init__.py
|
||||
COPY apps/pulsegen/ /app/apps/pulsegen/
|
||||
|
||||
# Change ownership to app user
|
||||
RUN chown -R app:app /app
|
||||
|
||||
# Switch to non-root user
|
||||
USER app
|
||||
|
||||
# Run application
|
||||
CMD ["python", "-m", "apps.pulsegen.main"]
|
||||
53
apps/pulsegen/README.md
Normal file
53
apps/pulsegen/README.md
Normal file
@@ -0,0 +1,53 @@
|
||||
# Pulsegen
|
||||
|
||||
MQTT-basierte Pulse-Generator Applikation für Home Automation.
|
||||
|
||||
## Funktionen
|
||||
|
||||
- MQTT-Kommunikation über `aiomqtt`
|
||||
- Automatische Reconnect-Logik
|
||||
- Graceful shutdown (SIGTERM/SIGINT)
|
||||
- JSON message parsing
|
||||
- Konfigurierbar über Umgebungsvariablen
|
||||
|
||||
## Umgebungsvariablen
|
||||
|
||||
- `MQTT_BROKER`: MQTT Broker Hostname (default: `localhost`)
|
||||
- `MQTT_PORT`: MQTT Broker Port (default: `1883`)
|
||||
|
||||
## Entwicklung
|
||||
|
||||
Lokal starten:
|
||||
|
||||
```bash
|
||||
cd apps/pulsegen
|
||||
python -m venv venv
|
||||
source venv/bin/activate # oder venv\Scripts\activate auf Windows
|
||||
pip install -r requirements.txt
|
||||
python main.py
|
||||
```
|
||||
|
||||
## Docker
|
||||
|
||||
Build:
|
||||
|
||||
```bash
|
||||
docker build -f apps/pulsegen/Dockerfile -t pulsegen .
|
||||
```
|
||||
|
||||
Run:
|
||||
|
||||
```bash
|
||||
docker run -e MQTT_BROKER=172.16.2.16 -e MQTT_PORT=1883 pulsegen
|
||||
```
|
||||
|
||||
## MQTT Topics
|
||||
|
||||
### Subscribed
|
||||
|
||||
- `pulsegen/command/#` - Kommandos für pulsegen
|
||||
- `home/+/+/state` - Device state updates
|
||||
|
||||
### Published
|
||||
|
||||
- `pulsegen/status` - Status-Updates der Applikation
|
||||
1
apps/pulsegen/__init__.py
Normal file
1
apps/pulsegen/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Pulsegen - MQTT pulse generator application."""
|
||||
156
apps/pulsegen/main.py
Normal file
156
apps/pulsegen/main.py
Normal file
@@ -0,0 +1,156 @@
|
||||
"""Pulsegen - MQTT pulse generator application."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
from typing import Any
|
||||
|
||||
from aiomqtt import Client, Message
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_mqtt_settings() -> tuple[str, int]:
|
||||
"""Get MQTT broker settings from environment variables.
|
||||
|
||||
Returns:
|
||||
tuple: (broker_host, broker_port)
|
||||
"""
|
||||
broker = os.getenv("MQTT_BROKER", "localhost")
|
||||
port = int(os.getenv("MQTT_PORT", "1883"))
|
||||
logger.info(f"MQTT settings: broker={broker}, port={port}")
|
||||
return broker, port
|
||||
|
||||
|
||||
async def handle_message(message: Message) -> None:
|
||||
"""Handle incoming MQTT message.
|
||||
|
||||
Args:
|
||||
message: MQTT message object
|
||||
"""
|
||||
try:
|
||||
payload = message.payload.decode()
|
||||
logger.info(f"Received message on {message.topic}: {payload}")
|
||||
|
||||
# Parse JSON payload if possible
|
||||
try:
|
||||
data = json.loads(payload)
|
||||
logger.debug(f"Parsed JSON: {data}")
|
||||
# TODO: Process message based on topic and data
|
||||
except json.JSONDecodeError:
|
||||
logger.debug(f"Non-JSON payload: {payload}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling message: {e}", exc_info=True)
|
||||
|
||||
|
||||
async def publish_example(client: Client) -> None:
|
||||
"""Example function to publish MQTT messages.
|
||||
|
||||
Args:
|
||||
client: MQTT client instance
|
||||
"""
|
||||
topic = "pulsegen/status"
|
||||
payload = {
|
||||
"status": "running",
|
||||
"timestamp": asyncio.get_event_loop().time()
|
||||
}
|
||||
|
||||
await client.publish(
|
||||
topic=topic,
|
||||
payload=json.dumps(payload),
|
||||
qos=1
|
||||
)
|
||||
logger.info(f"Published to {topic}: {payload}")
|
||||
|
||||
|
||||
async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
|
||||
"""Main MQTT worker loop.
|
||||
|
||||
Connects to MQTT broker, subscribes to topics, and processes messages.
|
||||
|
||||
Args:
|
||||
shutdown_event: Event to signal shutdown
|
||||
"""
|
||||
broker, port = get_mqtt_settings()
|
||||
|
||||
# Topics to subscribe to
|
||||
subscribe_topics = [
|
||||
"pulsegen/command/#",
|
||||
"home/+/+/state",
|
||||
]
|
||||
|
||||
reconnect_interval = 5 # seconds
|
||||
|
||||
while not shutdown_event.is_set():
|
||||
try:
|
||||
logger.info(f"Connecting to MQTT broker {broker}:{port}...")
|
||||
|
||||
async with Client(
|
||||
hostname=broker,
|
||||
port=port,
|
||||
identifier="pulsegen"
|
||||
) as client:
|
||||
logger.info("Connected to MQTT broker")
|
||||
|
||||
# Subscribe to topics
|
||||
for topic in subscribe_topics:
|
||||
await client.subscribe(topic)
|
||||
logger.info(f"Subscribed to {topic}")
|
||||
|
||||
# Publish startup message
|
||||
await publish_example(client)
|
||||
|
||||
# Message loop
|
||||
async with client.messages() as messages:
|
||||
async for message in messages:
|
||||
if shutdown_event.is_set():
|
||||
break
|
||||
await handle_message(message)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("MQTT worker cancelled")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"MQTT error: {e}", exc_info=True)
|
||||
if not shutdown_event.is_set():
|
||||
logger.info(f"Reconnecting in {reconnect_interval} seconds...")
|
||||
await asyncio.sleep(reconnect_interval)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
"""Main application entry point."""
|
||||
logger.info("Starting pulsegen application...")
|
||||
|
||||
# Shutdown event for graceful shutdown
|
||||
shutdown_event = asyncio.Event()
|
||||
|
||||
# Setup signal handlers
|
||||
def signal_handler(sig: int) -> None:
|
||||
logger.info(f"Received signal {sig}, initiating shutdown...")
|
||||
shutdown_event.set()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||
loop.add_signal_handler(sig, lambda s=sig: signal_handler(s))
|
||||
|
||||
# Start MQTT worker
|
||||
worker_task = asyncio.create_task(mqtt_worker(shutdown_event))
|
||||
|
||||
# Wait for shutdown signal
|
||||
await shutdown_event.wait()
|
||||
|
||||
# Wait for worker to finish
|
||||
await worker_task
|
||||
|
||||
logger.info("Pulsegen application stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
1
apps/pulsegen/requirements.txt
Normal file
1
apps/pulsegen/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
aiomqtt==2.3.0
|
||||
Reference in New Issue
Block a user