Compare commits

..

6 Commits

Author SHA1 Message Date
d3c1ec404a seems to work, client_id with uuid
All checks were successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful
2025-12-08 15:42:53 +01:00
9ba478c34d seems to work
All checks were successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful
2025-12-08 15:37:03 +01:00
15e132b187 messages fix 3 2025-12-08 14:27:50 +01:00
f40887ec37 messages fix 2 2025-12-08 14:27:25 +01:00
507f6f3854 messages fix 2025-12-08 14:25:31 +01:00
f163bb09bf initial 2025-12-08 13:56:48 +01:00
8 changed files with 370 additions and 0 deletions

View File

@@ -11,6 +11,7 @@ matrix:
- abstraction
- rules
- static
- pulsegen
- homekit
steps:

View File

@@ -16,6 +16,7 @@ matrix:
- abstraction
- rules
- static
- pulsegen
steps:
deploy-${APP}:

35
apps/pulsegen/Dockerfile Normal file
View File

@@ -0,0 +1,35 @@
# Pulsegen Dockerfile
# MQTT Pulse Generator Worker
FROM python:3.14-alpine
# Prevent Python from writing .pyc files and enable unbuffered output
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
MQTT_BROKER=172.16.2.16 \
MQTT_PORT=1883
# Create non-root user
RUN addgroup -g 10001 -S app && \
adduser -u 10001 -S app -G app
# Set working directory
WORKDIR /app
# Install Python dependencies
COPY apps/pulsegen/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY apps/__init__.py /app/apps/__init__.py
COPY apps/pulsegen/ /app/apps/pulsegen/
# Change ownership to app user
RUN chown -R app:app /app
# Switch to non-root user
USER app
# Run application
CMD ["python", "-m", "apps.pulsegen.main"]

53
apps/pulsegen/README.md Normal file
View File

@@ -0,0 +1,53 @@
# Pulsegen
MQTT-basierte Pulse-Generator Applikation für Home Automation.
## Funktionen
- MQTT-Kommunikation über `aiomqtt`
- Automatische Reconnect-Logik
- Graceful shutdown (SIGTERM/SIGINT)
- JSON message parsing
- Konfigurierbar über Umgebungsvariablen
## Umgebungsvariablen
- `MQTT_BROKER`: MQTT Broker Hostname (default: `localhost`)
- `MQTT_PORT`: MQTT Broker Port (default: `1883`)
## Entwicklung
Lokal starten:
```bash
cd apps/pulsegen
python -m venv venv
source venv/bin/activate # oder venv\Scripts\activate auf Windows
pip install -r requirements.txt
python main.py
```
## Docker
Build:
```bash
docker build -f apps/pulsegen/Dockerfile -t pulsegen .
```
Run:
```bash
docker run -e MQTT_BROKER=172.16.2.16 -e MQTT_PORT=1883 pulsegen
```
## MQTT Topics
### Subscribed
- `pulsegen/command/#` - Kommandos für pulsegen
- `home/+/+/state` - Device state updates
### Published
- `pulsegen/status` - Status-Updates der Applikation

View File

@@ -0,0 +1 @@
"""Pulsegen - MQTT pulse generator application."""

227
apps/pulsegen/main.py Normal file
View File

@@ -0,0 +1,227 @@
"""Pulsegen - MQTT pulse generator application."""
import asyncio
import json
import logging
import os
import signal
import uuid
from typing import Any
from aiomqtt import Client, Message
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
COIL_STATUS_PREFIX = "dt1/di"
COIL_STATUS_TOPIC = f"{COIL_STATUS_PREFIX}/+"
PULSEGEN_COMMAND_PREFIX = "pulsegen/command"
PULSEGEN_COMMAND_TOPIC = f"{PULSEGEN_COMMAND_PREFIX}/+/+"
COIL_COMMAND_PREFIX = "dt1/coil"
PULSEGEN_STATUS_PREFIX = "pulsegen/status"
COIL_STATUS_CACHE: dict[int, bool] = {}
def get_mqtt_settings() -> tuple[str, int]:
"""Get MQTT broker settings from environment variables.
Returns:
tuple: (broker_host, broker_port)
"""
broker = os.getenv("MQTT_BROKER", "localhost")
port = int(os.getenv("MQTT_PORT", "1883"))
logger.info(f"MQTT settings: broker={broker}, port={port}")
return broker, port
async def handle_message(message: Message, client: Client) -> None:
"""Handle incoming MQTT message.
Args:
message: MQTT message object
client: MQTT client instance
"""
try:
payload = message.payload.decode()
logger.info(f"Received message on {message.topic}: {payload}")
try:
topic = str(message.topic)
match topic.split("/"):
case [prefix, di, coil_id] if f"{prefix}/{di}" == COIL_STATUS_PREFIX:
try:
coil_num = int(coil_id)
except ValueError:
logger.debug(f"Invalid coil id in topic: {topic}")
return
state = payload.lower() in ("1", "true", "on")
COIL_STATUS_CACHE[coil_num] = state
logger.info(f"Updated coil {coil_num} status to {state}")
logger.info(f"Publishing pulsegen status for coil {coil_num}: {state}")
await client.publish(
topic=f"{PULSEGEN_STATUS_PREFIX}/{coil_num}",
payload="on" if state else "off",
qos=1,
retain=True,
)
case [prefix, command, coil_in_id, coil_out_id] if f"{prefix}/{command}" == PULSEGEN_COMMAND_PREFIX:
try:
coil_in_id = int(coil_in_id)
coil_out_id = int(coil_out_id)
except ValueError:
logger.debug(f"Invalid coil id in topic: {topic}")
return
try:
coil_state = COIL_STATUS_CACHE[coil_in_id]
except KeyError:
logger.debug(f"Coil {coil_in_id} status unknown, cannot process command")
return
cmd = payload.lower() in ("1", "true", "on")
if cmd == coil_state:
logger.info(f"Coil {coil_in_id} already in desired state {cmd}, ignoring command")
return
logger.info(f"Received pulsegen command on {topic}: {coil_in_id=}, {coil_out_id=}, {cmd=}")
coil_cmd_topic = f"{COIL_COMMAND_PREFIX}/{coil_out_id}"
logger.info(f"Sending raising edge command: topic={coil_cmd_topic}")
await client.publish(
topic=coil_cmd_topic,
payload="1",
qos=1,
retain=False,
)
await asyncio.sleep(0.2)
logger.info(f"Sending falling edge command: topic={coil_cmd_topic}")
await client.publish(
topic=coil_cmd_topic,
payload="0",
qos=1,
retain=False,
)
case _:
logger.debug(f"Ignoring message on unrelated topic: {topic}")
except Exception as e:
logger.debug(f"Exception when handling payload: {e}")
except Exception as e:
logger.error(f"Error handling message: {e}", exc_info=True)
async def publish_example(client: Client) -> None:
"""Example function to publish MQTT messages.
Args:
client: MQTT client instance
"""
topic = "pulsegen/status"
payload = {
"status": "running",
"timestamp": asyncio.get_event_loop().time()
}
await client.publish(
topic=topic,
payload=json.dumps(payload),
qos=1
)
logger.info(f"Published to {topic}: {payload}")
async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
"""Main MQTT worker loop.
Connects to MQTT broker, subscribes to topics, and processes messages.
Args:
shutdown_event: Event to signal shutdown
"""
broker, port = get_mqtt_settings()
reconnect_interval = 5 # seconds
while not shutdown_event.is_set():
try:
logger.info(f"Connecting to MQTT broker {broker}:{port}...")
async with Client(
hostname=broker,
port=port,
identifier=f"pulsegen-{uuid.uuid4()}",
) as client:
logger.info("Connected to MQTT broker")
# Subscribe to topics
for topic in [PULSEGEN_COMMAND_TOPIC, COIL_STATUS_TOPIC]:
await client.subscribe(topic)
logger.info(f"Subscribed to {topic}")
# Publish startup message
await publish_example(client)
# Message loop
async for message in client.messages:
if shutdown_event.is_set():
break
try:
await handle_message(message, client)
except Exception as e:
logger.error(f"Error in message handler: {e}", exc_info=True)
except asyncio.CancelledError:
logger.info("MQTT worker cancelled")
break
except Exception as e:
logger.error(f"MQTT error: {e}", exc_info=True)
if not shutdown_event.is_set():
logger.info(f"Reconnecting in {reconnect_interval} seconds...")
await asyncio.sleep(reconnect_interval)
async def main() -> None:
"""Main application entry point."""
logger.info("Starting pulsegen application...")
# Shutdown event for graceful shutdown
shutdown_event = asyncio.Event()
# Setup signal handlers
def signal_handler(sig: int) -> None:
logger.info(f"Received signal {sig}, initiating shutdown...")
shutdown_event.set()
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: signal_handler(s))
# Start MQTT worker
worker_task = asyncio.create_task(mqtt_worker(shutdown_event))
# Wait for shutdown signal
await shutdown_event.wait()
# Wait for worker to finish
await worker_task
logger.info("Pulsegen application stopped")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1 @@
aiomqtt==2.3.0

View File

@@ -0,0 +1,51 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: pulsegen
namespace: homea2
labels:
app: pulsegen
component: home-automation
spec:
replicas: 1
selector:
matchLabels:
app: pulsegen
template:
metadata:
annotations:
reloader.stakater.com/auto: "true"
configmap.reloader.stakater.com/reload: "home-automation-environment"
labels:
app: pulsegen
component: home-automation
spec:
containers:
- name: pulsegen
image: %IMAGE%
env:
- name: MQTT_BROKER
valueFrom:
configMapKeyRef:
name: home-automation-environment
key: SHARED_MQTT_BROKER
- name: MQTT_PORT
valueFrom:
configMapKeyRef:
name: home-automation-environment
key: SHARED_MQTT_PORT
resources:
limits:
cpu: 1000m
memory: 1Gi
requests:
cpu: 200m
memory: 256Mi
livenessProbe:
exec:
command:
- /bin/sh
- -c
- "ps aux | grep -v grep | grep python"
initialDelaySeconds: 30
periodSeconds: 10