Compare commits
5 Commits
shellies_r
...
rules
| Author | SHA1 | Date | |
|---|---|---|---|
|
b6b441c0ca
|
|||
|
d3d96ed3e9
|
|||
|
2e2963488b
|
|||
|
7928bc596f
|
|||
|
3874eaed83
|
53
apps/rules/Dockerfile
Normal file
53
apps/rules/Dockerfile
Normal file
@@ -0,0 +1,53 @@
# Rules Engine Dockerfile
# Event-driven automation rules processor with MQTT and Redis

FROM python:3.14-alpine

# Prevent Python from writing .pyc files and enable unbuffered output;
# remaining variables are runtime defaults that can be overridden at deploy time.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    RULES_CONFIG=config/rules.yaml \
    MQTT_BROKER=172.16.2.16 \
    MQTT_PORT=1883 \
    REDIS_HOST=localhost \
    REDIS_PORT=6379 \
    REDIS_DB=8 \
    LOG_LEVEL=INFO

# Create non-root user
RUN addgroup -g 10001 -S app && \
    adduser -u 10001 -S app -G app

# Set working directory
WORKDIR /app

# Install Python dependencies before copying the application code so the
# pip layer is cached across code-only changes. The C toolchain is only
# needed to build wheels, so install it in a virtual apk group and remove
# it afterwards to keep the final image small.
COPY apps/rules/requirements.txt /app/requirements.txt
RUN apk add --no-cache --virtual .build-deps \
        gcc \
        musl-dev \
        linux-headers \
    && pip install --no-cache-dir -r requirements.txt \
    && apk del .build-deps

# Copy application code
COPY apps/__init__.py /app/apps/
COPY apps/rules/ /app/apps/rules/
COPY packages/ /app/packages/
COPY config/ /app/config/

# Change ownership to non-root user
RUN chown -R app:app /app

# Switch to non-root user
USER app

# Expose no ports (MQTT/Redis client only)

# Health check: verify the rules process is alive (pgrep is provided by busybox)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD pgrep -f "apps.rules.main" || exit 1

# Run the rules engine
CMD ["python", "-m", "apps.rules.main"]
371
apps/rules/RULE_INTERFACE.md
Normal file
371
apps/rules/RULE_INTERFACE.md
Normal file
@@ -0,0 +1,371 @@
|
||||
# Rule Interface Documentation
|
||||
|
||||
## Overview
|
||||
|
||||
The rule interface provides a clean abstraction for implementing automation rules. Rules respond to device state changes and can publish commands, persist state, and log diagnostics.
|
||||
|
||||
## Core Components
|
||||
|
||||
### 1. RuleDescriptor
|
||||
|
||||
Configuration data for a rule instance (loaded from `rules.yaml`):
|
||||
|
||||
```python
|
||||
RuleDescriptor(
|
||||
id="window_setback_wohnzimmer", # Unique rule ID
|
||||
name="Fensterabsenkung Wohnzimmer", # Optional display name
|
||||
type="window_setback@1.0", # Rule type + version
|
||||
targets={ # Rule-specific targets
|
||||
"rooms": ["Wohnzimmer"],
|
||||
"contacts": ["kontakt_wohnzimmer_..."],
|
||||
"thermostats": ["thermostat_wohnzimmer"]
|
||||
},
|
||||
params={ # Rule-specific parameters
|
||||
"eco_target": 16.0,
|
||||
"open_min_secs": 20
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
### 2. RedisState
|
||||
|
||||
Async state persistence with automatic reconnection and retry logic:
|
||||
|
||||
```python
|
||||
# Initialize (done by rule engine)
|
||||
redis_state = RedisState("redis://172.23.1.116:6379/8")
|
||||
|
||||
# Simple key-value with TTL
|
||||
await ctx.redis.set("rules:my_rule:temp", "22.5", ttl_secs=3600)
|
||||
value = await ctx.redis.get("rules:my_rule:temp") # Returns "22.5" or None
|
||||
|
||||
# Hash storage (for multiple related values)
|
||||
await ctx.redis.hset("rules:my_rule:sensors", "bedroom", "open")
|
||||
await ctx.redis.hset("rules:my_rule:sensors", "kitchen", "closed")
|
||||
value = await ctx.redis.hget("rules:my_rule:sensors", "bedroom") # "open"
|
||||
|
||||
# TTL management
|
||||
await ctx.redis.expire("rules:my_rule:temp", 7200) # Extend to 2 hours
|
||||
|
||||
# JSON helpers (for complex data)
|
||||
import json
|
||||
data = {"temp": 22.5, "humidity": 45}
|
||||
await ctx.redis.set("rules:my_rule:data", ctx.redis._dumps(data))
|
||||
stored = await ctx.redis.get("rules:my_rule:data")
|
||||
parsed = ctx.redis._loads(stored) if stored else None
|
||||
```
|
||||
|
||||
**Key Conventions:**
|
||||
- Use prefix `rules:{rule_id}:` for all keys
|
||||
- Example: `rules:window_setback_wohnzimmer:thermo:device_123:previous`
|
||||
- TTL recommended for temporary state (previous temperatures, timers)
|
||||
|
||||
**Robustness Features:**
|
||||
- Automatic retry with exponential backoff (default: 3 retries)
|
||||
- Connection pooling (max 10 connections)
|
||||
- Automatic reconnection on Redis restart
|
||||
- Health checks every 30 seconds
|
||||
- All operations wait and retry, no exceptions on temporary outages
|
||||
|
||||
### 3. MQTTClient
|
||||
|
||||
Async MQTT client with event normalization and command publishing:
|
||||
|
||||
```python
|
||||
# Initialize (done by rule engine)
|
||||
mqtt_client = MQTTClient(
|
||||
broker="172.16.2.16",
|
||||
port=1883,
|
||||
client_id="rule_engine"
|
||||
)
|
||||
|
||||
# Subscribe and receive normalized events
|
||||
async for event in mqtt_client.connect():
|
||||
# Event structure:
|
||||
# {
|
||||
# "topic": "home/contact/sensor_1/state",
|
||||
# "type": "state",
|
||||
# "cap": "contact", # Capability (contact, thermostat, etc.)
|
||||
# "device_id": "sensor_1",
|
||||
# "payload": {"contact": "open"},
|
||||
# "ts": "2025-11-11T10:30:45.123456"
|
||||
# }
|
||||
|
||||
if event['cap'] == 'contact':
|
||||
handle_contact(event)
|
||||
elif event['cap'] == 'thermostat':
|
||||
handle_thermostat(event)
|
||||
|
||||
# Publish commands (within async context)
|
||||
await mqtt_client.publish_set_thermostat("thermostat_id", 22.5)
|
||||
```
|
||||
|
||||
**Subscriptions:**
|
||||
- `home/contact/+/state` - All contact sensor state changes
|
||||
- `home/thermostat/+/state` - All thermostat state changes
|
||||
|
||||
**Publishing:**
|
||||
- Topic: `home/thermostat/{device_id}/set`
|
||||
- Payload: `{"type":"thermostat","payload":{"target":22.5}}`
|
||||
- QoS: 1 (at least once delivery)
|
||||
|
||||
**Robustness:**
|
||||
- Automatic reconnection with exponential backoff
|
||||
- Connection logging (connect/disconnect events)
|
||||
- Clean session handling
|
||||
|
||||
### 4. MQTTPublisher (Legacy)
|
||||
|
||||
Simplified wrapper around MQTTClient for backward compatibility:
|
||||
|
||||
```python
|
||||
# Set thermostat temperature
|
||||
await ctx.mqtt.publish_set_thermostat("thermostat_wohnzimmer", 21.5)
|
||||
```
|
||||
|
||||
### 5. RuleContext
|
||||
|
||||
Runtime context provided to rules:
|
||||
|
||||
```python
|
||||
class RuleContext:
|
||||
logger # Logger instance
|
||||
mqtt # MQTTPublisher
|
||||
redis # RedisState
|
||||
now() -> datetime # Current timestamp
|
||||
```
|
||||
|
||||
### 6. Rule Abstract Base Class
|
||||
|
||||
All rules extend this:
|
||||
|
||||
```python
|
||||
class MyRule(Rule):
|
||||
async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None:
|
||||
# Event structure:
|
||||
# {
|
||||
# "topic": "home/contact/device_id/state",
|
||||
# "type": "state",
|
||||
# "cap": "contact",
|
||||
# "device_id": "kontakt_wohnzimmer",
|
||||
# "payload": {"contact": "open"},
|
||||
# "ts": "2025-11-11T10:30:45.123456"
|
||||
# }
|
||||
|
||||
device_id = evt['device_id']
|
||||
cap = evt['cap']
|
||||
|
||||
if cap == 'contact':
|
||||
contact_state = evt['payload'].get('contact')
|
||||
# ... implement logic
|
||||
```
|
||||
|
||||
## Implementing a New Rule
|
||||
|
||||
### Step 1: Create Rule Class
|
||||
|
||||
```python
|
||||
from packages.rule_interface import Rule, RuleDescriptor, RuleContext
|
||||
from typing import Any
|
||||
|
||||
class MyCustomRule(Rule):
|
||||
"""My custom automation rule."""
|
||||
|
||||
async def on_event(
|
||||
self,
|
||||
evt: dict[str, Any],
|
||||
desc: RuleDescriptor,
|
||||
ctx: RuleContext
|
||||
) -> None:
|
||||
"""Process device state changes."""
|
||||
|
||||
# 1. Extract event data
|
||||
device_id = evt['device_id']
|
||||
cap = evt['cap']
|
||||
payload = evt['payload']
|
||||
|
||||
# 2. Filter to relevant devices
|
||||
if device_id not in desc.targets.get('my_devices', []):
|
||||
return
|
||||
|
||||
# 3. Implement logic
|
||||
if cap == 'contact':
|
||||
if payload.get('contact') == 'open':
|
||||
# Do something
|
||||
await ctx.mqtt.publish_set_thermostat(
|
||||
'some_thermostat',
|
||||
desc.params.get('temp', 20.0)
|
||||
)
|
||||
|
||||
# 4. Persist state if needed
|
||||
state_key = f"rule:{desc.id}:device:{device_id}:state"
|
||||
await ctx.redis.set(state_key, payload.get('contact'))
|
||||
```
|
||||
|
||||
### Step 2: Register in RULE_IMPLEMENTATIONS
|
||||
|
||||
```python
|
||||
# In your rule module (e.g., my_custom_rule.py)
|
||||
RULE_IMPLEMENTATIONS = {
|
||||
'my_custom@1.0': MyCustomRule,
|
||||
}
|
||||
```
|
||||
|
||||
### Step 3: Configure in rules.yaml
|
||||
|
||||
```yaml
|
||||
rules:
|
||||
- id: my_custom_living_room
|
||||
name: My Custom Rule for Living Room
|
||||
type: my_custom@1.0
|
||||
targets:
|
||||
my_devices:
|
||||
- device_1
|
||||
- device_2
|
||||
params:
|
||||
temp: 22.0
|
||||
duration_secs: 300
|
||||
```
|
||||
|
||||
## Best Practices
|
||||
|
||||
### Idempotency
|
||||
|
||||
Rules MUST be idempotent - processing the same event multiple times should be safe:
|
||||
|
||||
```python
|
||||
# Good: Idempotent
|
||||
async def on_event(self, evt, desc, ctx):
|
||||
if evt['payload'].get('contact') == 'open':
|
||||
await ctx.mqtt.publish_set_thermostat('thermo', 16.0)
|
||||
|
||||
# Bad: Not idempotent (increments counter)
|
||||
async def on_event(self, evt, desc, ctx):
|
||||
counter = await ctx.redis.get('counter') or '0'
|
||||
await ctx.redis.set('counter', str(int(counter) + 1))
|
||||
```
|
||||
|
||||
### Error Handling
|
||||
|
||||
Handle errors gracefully - the engine will catch and log exceptions:
|
||||
|
||||
```python
|
||||
async def on_event(self, evt, desc, ctx):
|
||||
try:
|
||||
await ctx.mqtt.publish_set_thermostat('thermo', 16.0)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"Failed to set thermostat: {e}")
|
||||
# Don't raise - let event processing continue
|
||||
```
|
||||
|
||||
### State Keys
|
||||
|
||||
Use consistent naming for Redis keys:
|
||||
|
||||
```python
|
||||
# Pattern: rule:{rule_id}:{category}:{device_id}:{field}
|
||||
state_key = f"rule:{desc.id}:contact:{device_id}:state"
|
||||
ts_key = f"rule:{desc.id}:contact:{device_id}:ts"
|
||||
prev_key = f"rule:{desc.id}:thermo:{device_id}:previous"
|
||||
```
|
||||
|
||||
### Logging
|
||||
|
||||
Use appropriate log levels:
|
||||
|
||||
```python
|
||||
ctx.logger.debug("Detailed diagnostic info")
|
||||
ctx.logger.info("Normal operation milestones")
|
||||
ctx.logger.warning("Unexpected but handled situations")
|
||||
ctx.logger.error("Errors that prevent operation")
|
||||
```
|
||||
|
||||
## Event Structure Reference
|
||||
|
||||
### Contact Sensor Event
|
||||
|
||||
```python
|
||||
{
|
||||
"topic": "home/contact/kontakt_wohnzimmer/state",
|
||||
"type": "state",
|
||||
"cap": "contact",
|
||||
"device_id": "kontakt_wohnzimmer",
|
||||
"payload": {
|
||||
"contact": "open" # or "closed"
|
||||
},
|
||||
"ts": "2025-11-11T10:30:45.123456"
|
||||
}
|
||||
```
|
||||
|
||||
### Thermostat Event
|
||||
|
||||
```python
|
||||
{
|
||||
"topic": "home/thermostat/thermostat_wohnzimmer/state",
|
||||
"type": "state",
|
||||
"cap": "thermostat",
|
||||
"device_id": "thermostat_wohnzimmer",
|
||||
"payload": {
|
||||
"target": 21.0,
|
||||
"current": 20.5,
|
||||
"mode": "heat"
|
||||
},
|
||||
"ts": "2025-11-11T10:30:45.123456"
|
||||
}
|
||||
```
|
||||
|
||||
## Testing Rules
|
||||
|
||||
Rules can be tested independently of the engine:
|
||||
|
||||
```python
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
from packages.my_custom_rule import MyCustomRule
|
||||
from packages.rule_interface import RuleDescriptor, RuleContext
from datetime import datetime
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_my_rule():
|
||||
# Setup
|
||||
rule = MyCustomRule()
|
||||
|
||||
desc = RuleDescriptor(
|
||||
id="test_rule",
|
||||
type="my_custom@1.0",
|
||||
targets={"my_devices": ["device_1"]},
|
||||
params={"temp": 22.0}
|
||||
)
|
||||
|
||||
# Mock context
|
||||
ctx = RuleContext(
|
||||
logger=MagicMock(),
|
||||
mqtt_publisher=AsyncMock(),
|
||||
redis_state=AsyncMock(),
|
||||
now_fn=lambda: datetime.now()
|
||||
)
|
||||
|
||||
# Test event
|
||||
evt = {
|
||||
"device_id": "device_1",
|
||||
"cap": "contact",
|
||||
"payload": {"contact": "open"},
|
||||
"ts": "2025-11-11T10:30:45.123456"
|
||||
}
|
||||
|
||||
# Execute
|
||||
await rule.on_event(evt, desc, ctx)
|
||||
|
||||
# Assert
|
||||
ctx.mqtt.publish_set_thermostat.assert_called_once_with('some_thermostat', 22.0)
|
||||
```
|
||||
|
||||
## Extension Points
|
||||
|
||||
The interface is designed to be extended without modifying the engine:
|
||||
|
||||
1. **New rule types**: Just implement `Rule` and register in `RULE_IMPLEMENTATIONS`
|
||||
2. **New MQTT commands**: Extend `MQTTPublisher` with new methods
|
||||
3. **New state backends**: Implement `RedisState` interface with different storage
|
||||
4. **Custom context**: Extend `RuleContext` with additional utilities
|
||||
|
||||
The engine only depends on the abstract interfaces, not specific implementations.
|
||||
15
apps/rules/impl/__init__.py
Normal file
15
apps/rules/impl/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""
|
||||
Rule Implementations Package
|
||||
|
||||
This package contains all rule implementation modules.
|
||||
|
||||
Naming Convention:
|
||||
- Module name: snake_case matching the rule type name
|
||||
Example: window_setback.py for type 'window_setback@1.0'
|
||||
|
||||
- Class name: PascalCase + 'Rule' suffix
|
||||
Example: WindowSetbackRule
|
||||
|
||||
The rule engine uses load_rule() from rule_interface to dynamically
|
||||
import modules from this package based on the 'type' field in rules.yaml.
|
||||
"""
|
||||
256
apps/rules/impl/window_setback.py
Normal file
256
apps/rules/impl/window_setback.py
Normal file
@@ -0,0 +1,256 @@
|
||||
"""
|
||||
Example Rule Implementation: Window Setback
|
||||
|
||||
Demonstrates how to implement a Rule using the rule_interface.
|
||||
This rule lowers thermostat temperature when a window is opened.
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
from apps.rules.rule_interface import Rule, RuleDescriptor, RuleContext
|
||||
|
||||
|
||||
class WindowSetbackObjects(BaseModel):
    """Validated object structure for the window setback rule.

    Built from the descriptor's ``objects`` mapping in ``setup()``;
    pydantic enforces that both device lists are present and non-empty.
    """
    # Contact sensor device IDs whose open/closed state drives the rule.
    contacts: list[str] = Field(..., min_length=1, description="Contact sensors to monitor")
    # Thermostat device IDs whose target temperature is adjusted.
    thermostats: list[str] = Field(..., min_length=1, description="Thermostats to control")
|
||||
|
||||
class WindowSetbackRule(Rule):
    """
    Window setback automation rule.

    When a window/door contact opens, set thermostats to eco temperature.
    When closed for a minimum duration, restore previous target temperature.

    Configuration:
        objects:
            contacts: List of contact sensor device IDs to monitor (required, min 1)
            thermostats: List of thermostat device IDs to control (required, min 1)
        params:
            eco_target: Temperature to set when window opens (default: 16.0)
            open_min_secs: Minimum seconds window must be open before triggering (default: 20)
            close_min_secs: Minimum seconds window must be closed before restoring (default: 20)
            previous_target_ttl_secs: How long to remember previous temperature (default: 86400)

    State storage (Redis keys):
        rule:{rule_id}:contact:{device_id}:state -> "open" | "closed"
        rule:{rule_id}:contact:{device_id}:ts -> ISO timestamp of last change
        rule:{rule_id}:thermo:{device_id}:current_target -> Current target temp (updated on every STATE)
        rule:{rule_id}:thermo:{device_id}:previous -> Previous target temp (saved on window open, deleted on restore)

    Logic:
        1. Thermostat STATE events → update current_target in Redis
        2. Window opens → copy current_target to previous, then set to eco_target
        3. Window closes → restore from previous, then delete previous key

    NOTE(review): open_min_secs/close_min_secs are documented params but the
    open/close handlers below act immediately — debounce timers are not yet
    implemented (see the note in _on_window_closed).
    """

    def __init__(self):
        super().__init__()
        # Per-rule-id cache of the pydantic-validated objects structure,
        # populated by setup(); kept so validation happens once per rule.
        self._validated_objects: dict[str, WindowSetbackObjects] = {}

    async def setup(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
        """Validate the descriptor's objects structure during setup.

        Raises:
            ValueError: If ``desc.objects`` does not satisfy
                WindowSetbackObjects (missing or empty device lists).
        """
        try:
            validated = WindowSetbackObjects(**desc.objects)
            self._validated_objects[desc.id] = validated
            ctx.logger.info(
                f"Rule {desc.id} validated: {len(validated.contacts)} contacts, "
                f"{len(validated.thermostats)} thermostats"
            )
        except ValidationError as e:
            # Re-raise as ValueError so the engine reports a config error.
            raise ValueError(
                f"Invalid objects configuration for rule {desc.id}: {e}"
            ) from e

    def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
        """
        Return MQTT topics to subscribe to.

        Subscribe to:
        - Contact sensor state changes (to detect window open/close)
        - Thermostat state changes (to track current target temperature)
        """
        topics = []

        # Subscribe to contact sensors
        contacts = desc.objects.get('contacts', [])
        for contact_id in contacts:
            topics.append(f"home/contact/{contact_id}/state")

        # Subscribe to thermostats to track their current target temperature
        thermostats = desc.objects.get('thermostats', [])
        for thermo_id in thermostats:
            topics.append(f"home/thermostat/{thermo_id}/state")

        return topics

    async def on_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """
        Process contact sensor or thermostat state changes.

        Logic:
        1. If contact opened → remember current thermostat targets, set to eco
        2. If contact closed for min_secs → restore previous targets
        3. If thermostat target changed → update stored previous value
        """
        device_id = evt['device_id']
        cap = evt['cap']
        payload = evt['payload']

        # Only process events for devices in our objects
        target_contacts = desc.objects.get('contacts', [])
        target_thermostats = desc.objects.get('thermostats', [])

        if cap == 'contact' and device_id in target_contacts:
            await self._handle_contact_event(evt, desc, ctx)

        elif cap == 'thermostat' and device_id in target_thermostats:
            await self._handle_thermostat_event(evt, desc, ctx)

    async def _handle_contact_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """Handle contact sensor state change.

        Persists the new state and timestamp to Redis, then triggers the
        open/close transition handlers.
        """
        device_id = evt['device_id']
        contact_state = evt['payload'].get('contact')  # "open" or "closed"
        # Fall back to engine time if the event carries no timestamp.
        event_ts = evt.get('ts', ctx.now().isoformat())

        if not contact_state:
            ctx.logger.warning(f"Contact event missing 'contact' field: {evt}")
            return

        # Store current state and timestamp
        state_key = f"rule:{desc.id}:contact:{device_id}:state"
        ts_key = f"rule:{desc.id}:contact:{device_id}:ts"

        await ctx.redis.set(state_key, contact_state)
        await ctx.redis.set(ts_key, event_ts)

        if contact_state == 'open':
            await self._on_window_opened(desc, ctx)
        elif contact_state == 'closed':
            await self._on_window_closed(desc, ctx)

    async def _on_window_opened(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
        """
        Window opened - save current temperatures, then set thermostats to eco.

        Important: We must save the current target BEFORE setting to eco,
        otherwise we'll save the eco temperature instead of the original.
        """
        eco_target = desc.params.get('eco_target', 16.0)
        target_thermostats = desc.objects.get('thermostats', [])
        ttl_secs = desc.params.get('previous_target_ttl_secs', 86400)

        ctx.logger.info(
            f"Rule {desc.id}: Window opened, setting {len(target_thermostats)} "
            f"thermostats to eco temperature {eco_target}°C"
        )

        # FIRST: Save current target temperatures as "previous" (before we change them!)
        for thermo_id in target_thermostats:
            current_key = f"rule:{desc.id}:thermo:{thermo_id}:current_target"
            current_temp_str = await ctx.redis.get(current_key)

            if current_temp_str:
                # Save current as previous (with TTL so stale values expire)
                prev_key = f"rule:{desc.id}:thermo:{thermo_id}:previous"
                await ctx.redis.set(prev_key, current_temp_str, ttl_secs=ttl_secs)
                ctx.logger.debug(
                    f"Saved previous target for {thermo_id}: {current_temp_str}°C"
                )
            else:
                # No STATE event seen yet for this thermostat; nothing to save.
                ctx.logger.warning(
                    f"No current target found for {thermo_id}, cannot save previous"
                )

        # THEN: Set all thermostats to eco temperature
        for thermo_id in target_thermostats:
            try:
                await ctx.mqtt.publish_set_thermostat(thermo_id, eco_target)
                ctx.logger.debug(f"Set {thermo_id} to {eco_target}°C")
            except Exception as e:
                # Best-effort per thermostat: log and continue with the rest.
                ctx.logger.error(f"Failed to set {thermo_id}: {e}")

    async def _on_window_closed(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
        """
        Window closed - restore previous temperatures.

        Note: This is simplified. A production implementation would check
        close_min_secs and use a timer/scheduler.
        """
        target_thermostats = desc.objects.get('thermostats', [])

        ctx.logger.info(
            f"Rule {desc.id}: Window closed, restoring {len(target_thermostats)} "
            f"thermostats to previous temperatures"
        )

        # Restore previous temperatures
        for thermo_id in target_thermostats:
            prev_key = f"rule:{desc.id}:thermo:{thermo_id}:previous"
            prev_temp_str = await ctx.redis.get(prev_key)

            if prev_temp_str:
                try:
                    prev_temp = float(prev_temp_str)
                    await ctx.mqtt.publish_set_thermostat(thermo_id, prev_temp)
                    ctx.logger.debug(f"Restored {thermo_id} to {prev_temp}°C")

                    # Delete the previous key after restoring so a repeated
                    # close event cannot restore a stale value twice.
                    await ctx.redis.delete(prev_key)
                except Exception as e:
                    ctx.logger.error(f"Failed to restore {thermo_id}: {e}")
            else:
                # Either never saved (no open seen) or the TTL expired.
                ctx.logger.warning(
                    f"No previous target found for {thermo_id}, cannot restore"
                )

    async def _handle_thermostat_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """
        Handle thermostat state change - track current target temperature.

        This keeps a record of the thermostat's current target, so we can
        save it as "previous" when a window opens.

        Important: We store in "current_target", NOT "previous". The "previous"
        key is only written when a window opens, to avoid race conditions.
        """
        device_id = evt['device_id']
        payload = evt['payload']
        current_target = payload.get('target')

        if current_target is None:
            return  # No target in this state update

        # Store current target (always update, even if it's the eco temperature)
        current_key = f"rule:{desc.id}:thermo:{device_id}:current_target"
        ttl_secs = desc.params.get('previous_target_ttl_secs', 86400)

        await ctx.redis.set(current_key, str(current_target), ttl_secs=ttl_secs)

        ctx.logger.debug(
            f"Rule {desc.id}: Updated current target for {device_id}: {current_target}°C"
        )
|
||||
|
||||
|
||||
# Rule registry - maps rule type to implementation class.
# The engine resolves the 'type' field from rules.yaml (e.g.
# 'window_setback@1.0') against this mapping when loading rules.
RULE_IMPLEMENTATIONS = {
    'window_setback@1.0': WindowSetbackRule,
}
|
||||
@@ -1,83 +1,374 @@
|
||||
"""Rules main entry point."""
|
||||
"""
|
||||
Rules Engine
|
||||
|
||||
Loads rules configuration, subscribes to MQTT events, and dispatches events
|
||||
to registered rule implementations.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from typing import NoReturn
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apps.rules.rules_config import load_rules_config
|
||||
from apps.rules.rule_interface import (
|
||||
RuleDescriptor,
|
||||
RuleContext,
|
||||
MQTTClient,
|
||||
RedisState,
|
||||
load_rule
|
||||
)
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global scheduler instance
|
||||
scheduler: BackgroundScheduler | None = None
|
||||
|
||||
|
||||
def rule_tick() -> None:
|
||||
"""Example job that runs every minute.
|
||||
|
||||
This is a placeholder for actual rule evaluation logic.
|
||||
class RuleEngine:
|
||||
"""
|
||||
Rule engine that loads rules, subscribes to MQTT events,
|
||||
and dispatches them to registered rule implementations.
|
||||
"""
|
||||
logger.info("Rule tick")
|
||||
|
||||
|
||||
def shutdown_handler(signum: int, frame: object) -> NoReturn:
|
||||
"""Handle shutdown signals gracefully.
|
||||
def __init__(
|
||||
self,
|
||||
rules_config_path: str,
|
||||
mqtt_broker: str,
|
||||
mqtt_port: int,
|
||||
redis_url: str
|
||||
):
|
||||
"""
|
||||
Initialize rule engine.
|
||||
|
||||
Args:
|
||||
signum: Signal number
|
||||
frame: Current stack frame
|
||||
rules_config_path: Path to rules.yaml
|
||||
mqtt_broker: MQTT broker hostname/IP
|
||||
mqtt_port: MQTT broker port
|
||||
redis_url: Redis connection URL
|
||||
"""
|
||||
logger.info(f"Received signal {signum}, shutting down...")
|
||||
if scheduler:
|
||||
scheduler.shutdown(wait=True)
|
||||
logger.info("Scheduler stopped")
|
||||
sys.exit(0)
|
||||
self.rules_config_path = rules_config_path
|
||||
self.mqtt_broker = mqtt_broker
|
||||
self.mqtt_port = mqtt_port
|
||||
self.redis_url = redis_url
|
||||
|
||||
# Will be initialized in setup()
|
||||
self.rule_descriptors: list[RuleDescriptor] = []
|
||||
self.rules: dict[str, Any] = {} # rule_id -> Rule instance
|
||||
self.mqtt_client: MQTTClient | None = None
|
||||
self.redis_state: RedisState | None = None
|
||||
self.context: RuleContext | None = None
|
||||
self._mqtt_topics: list[str] = [] # Topics to subscribe to
|
||||
|
||||
# For graceful shutdown
|
||||
self._shutdown_event = asyncio.Event()
|
||||
|
||||
async def setup(self) -> None:
|
||||
"""
|
||||
Load configuration and instantiate rules.
|
||||
|
||||
Raises:
|
||||
ImportError: If rule implementation not found
|
||||
ValueError: If configuration is invalid
|
||||
"""
|
||||
logger.info(f"Loading rules configuration from {self.rules_config_path}")
|
||||
|
||||
# Load rules configuration
|
||||
config = load_rules_config(self.rules_config_path)
|
||||
self.rule_descriptors = config.rules
|
||||
|
||||
logger.info(f"Loaded {len(self.rule_descriptors)} rule(s) from configuration")
|
||||
|
||||
# Instantiate each rule
|
||||
for desc in self.rule_descriptors:
|
||||
if not desc.enabled:
|
||||
logger.info(f" - {desc.id} (type: {desc.type}) [DISABLED]")
|
||||
continue
|
||||
|
||||
try:
|
||||
rule_instance = load_rule(desc)
|
||||
self.rules[desc.id] = rule_instance
|
||||
logger.info(f" - {desc.id} (type: {desc.type})")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load rule {desc.id} (type: {desc.type}): {e}")
|
||||
raise
|
||||
|
||||
enabled_count = len(self.rules)
|
||||
total_count = len(self.rule_descriptors)
|
||||
disabled_count = total_count - enabled_count
|
||||
logger.info(f"Successfully loaded {enabled_count} rule implementation(s) ({disabled_count} disabled)")
|
||||
|
||||
# Call setup on each rule for validation
|
||||
for rule_id, rule_instance in self.rules.items():
|
||||
desc = next((d for d in self.rule_descriptors if d.id == rule_id), None)
|
||||
if desc:
|
||||
try:
|
||||
ctx = RuleContext(
|
||||
logger=logger,
|
||||
mqtt_publisher=self.mqtt_client,
|
||||
redis_state=self.redis_state
|
||||
)
|
||||
await rule_instance.setup(desc, ctx)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to setup rule {rule_id}: {e}")
|
||||
raise
|
||||
|
||||
# Collect MQTT subscriptions from all enabled rules
|
||||
all_topics = set()
|
||||
for rule_id, rule_instance in self.rules.items():
|
||||
desc = next((d for d in self.rule_descriptors if d.id == rule_id), None)
|
||||
if desc:
|
||||
try:
|
||||
topics = rule_instance.get_subscriptions(desc)
|
||||
all_topics.update(topics)
|
||||
logger.debug(f"Rule {rule_id} subscribes to {len(topics)} topic(s)")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get subscriptions for rule {rule_id}: {e}")
|
||||
raise
|
||||
|
||||
logger.info(f"Total MQTT subscriptions needed: {len(all_topics)}")
|
||||
|
||||
# Create unique client ID to avoid conflicts
|
||||
import uuid
|
||||
import os
|
||||
|
||||
client_id_base = "rule_engine"
|
||||
client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6]
|
||||
unique_client_id = f"{client_id_base}-{client_suffix}"
|
||||
|
||||
# Initialize MQTT client
|
||||
self.mqtt_client = MQTTClient(
|
||||
broker=self.mqtt_broker,
|
||||
port=self.mqtt_port,
|
||||
client_id=unique_client_id
|
||||
)
|
||||
self.mqtt_client.set_logger(logger)
|
||||
|
||||
# Store topics for connection
|
||||
self._mqtt_topics = list(all_topics)
|
||||
|
||||
# Initialize Redis state
|
||||
self.redis_state = RedisState(self.redis_url)
|
||||
|
||||
# Create MQTT publisher wrapper for RuleContext
|
||||
from apps.rules.rule_interface import MQTTPublisher
|
||||
mqtt_publisher = MQTTPublisher(mqtt_client=self.mqtt_client)
|
||||
|
||||
# Create rule context
|
||||
self.context = RuleContext(
|
||||
logger=logger,
|
||||
mqtt_publisher=mqtt_publisher,
|
||||
redis_state=self.redis_state,
|
||||
now_fn=datetime.now
|
||||
)
|
||||
|
||||
def _filter_rules_for_event(self, event: dict[str, Any]) -> list[tuple[str, RuleDescriptor]]:
|
||||
"""
|
||||
Filter rules that should receive this event.
|
||||
|
||||
Rules match if the event's device_id is in the rule's objects.
|
||||
|
||||
Args:
|
||||
event: Normalized MQTT event
|
||||
|
||||
Returns:
|
||||
List of (rule_id, descriptor) tuples that should process this event
|
||||
"""
|
||||
matching_rules = []
|
||||
device_id = event.get('device_id')
|
||||
cap = event.get('cap')
|
||||
|
||||
if not device_id or not cap:
|
||||
return matching_rules
|
||||
|
||||
logger.debug(f"Filtering for cap={cap}, device_id={device_id}")
|
||||
|
||||
# Only check enabled rules (rules in self.rules dict)
|
||||
for rule_id, rule_instance in self.rules.items():
|
||||
desc = next((d for d in self.rule_descriptors if d.id == rule_id), None)
|
||||
if not desc:
|
||||
continue
|
||||
|
||||
objects = desc.objects
|
||||
|
||||
# Check if this device is in the rule's objects
|
||||
matched = False
|
||||
|
||||
if cap == 'contact' and objects.get('contacts'):
|
||||
logger.debug(f"Rule {rule_id}: checking contacts {objects.get('contacts')}")
|
||||
if device_id in objects.get('contacts', []):
|
||||
matched = True
|
||||
|
||||
elif cap == 'thermostat' and objects.get('thermostats'):
|
||||
logger.debug(f"Rule {rule_id}: checking thermostats {objects.get('thermostats')}")
|
||||
if device_id in objects.get('thermostats', []):
|
||||
matched = True
|
||||
|
||||
elif cap == 'light' and objects.get('lights'):
|
||||
logger.debug(f"Rule {rule_id}: checking lights {objects.get('lights')}")
|
||||
if device_id in objects.get('lights', []):
|
||||
matched = True
|
||||
|
||||
elif cap == 'relay' and objects.get('relays'):
|
||||
logger.debug(f"Rule {rule_id}: checking relays {objects.get('relays')}")
|
||||
if device_id in objects.get('relays', []):
|
||||
matched = True
|
||||
|
||||
if matched:
|
||||
matching_rules.append((rule_id, desc))
|
||||
|
||||
return matching_rules
|
||||
|
||||
async def _dispatch_event(self, event: dict[str, Any]) -> None:
    """
    Dispatch event to matching rules.

    Each matching rule's on_event() is awaited one at a time, in order,
    so rules never race each other on shared state; a failing rule does
    not stop delivery to the remaining rules.

    Args:
        event: Normalized MQTT event
    """
    logger.debug(f"Received event: {event}")

    targets = self._filter_rules_for_event(event)
    if not targets:
        # Nobody subscribed to this device/capability combination.
        logger.debug(f"No matching rules for {event.get('cap')}/{event.get('device_id')}")
        return

    logger.info(
        f"Event {event['cap']}/{event['device_id']}: "
        f"{len(targets)} matching rule(s)"
    )

    # Sequential (not gathered) on purpose: preserves ordering and avoids
    # concurrent mutation of shared rule state.
    for rule_id, desc in targets:
        rule = self.rules.get(rule_id)
        if rule is None:
            logger.warning(f"Rule instance not found for {rule_id}")
            continue

        try:
            await rule.on_event(event, desc, self.context)
        except Exception as e:
            # Log and carry on with the other rules.
            logger.error(
                f"Error in rule {rule_id} processing event "
                f"{event['cap']}/{event['device_id']}: {e}",
                exc_info=True
            )
|
||||
async def run(self) -> None:
    """
    Main event loop - subscribe to MQTT and process events.

    Consumes the async event stream produced by the MQTT client and
    dispatches every event, until a shutdown signal or cancellation.
    """
    logger.info("Starting event processing loop")

    try:
        stream = self.mqtt_client.connect(topics=self._mqtt_topics)
        async for incoming in stream:
            # Bail out between events once shutdown has been requested.
            if self._shutdown_event.is_set():
                logger.info("Shutdown signal received, stopping event loop")
                break
            await self._dispatch_event(incoming)
    except asyncio.CancelledError:
        logger.info("Event loop cancelled")
        raise
    except Exception as e:
        logger.error(f"Fatal error in event loop: {e}", exc_info=True)
        raise
|
||||
|
||||
async def shutdown(self) -> None:
    """Graceful shutdown - close connections."""
    logger.info("Shutting down rule engine...")
    # Signal the event loop first so it stops consuming events.
    self._shutdown_event.set()

    redis_conn = self.redis_state
    if redis_conn:
        await redis_conn.close()
        logger.info("Redis connection closed")

    logger.info("Shutdown complete")
|
||||
|
||||
|
||||
async def main_async() -> None:
    """Async main function.

    Reads connection settings from the environment, constructs and sets up
    the RuleEngine, installs loop-level signal handlers for graceful
    shutdown, then runs the engine until completion or cancellation.
    """
    # Read configuration from environment
    rules_config = os.getenv('RULES_CONFIG', 'config/rules.yaml')
    mqtt_broker = os.getenv('MQTT_BROKER', '172.16.2.16')
    mqtt_port = int(os.getenv('MQTT_PORT', '1883'))
    # NOTE(review): this fallback ('172.23.1.116') differs from the
    # Dockerfile default (REDIS_HOST=localhost) - confirm which is intended.
    redis_host = os.getenv('REDIS_HOST', '172.23.1.116')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    redis_db = int(os.getenv('REDIS_DB', '8'))
    redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'

    # Startup banner with the effective configuration.
    logger.info("=" * 60)
    logger.info("Rules Engine Starting")
    logger.info("=" * 60)
    logger.info(f"Config: {rules_config}")
    logger.info(f"MQTT: {mqtt_broker}:{mqtt_port}")
    logger.info(f"Redis: {redis_url}")
    logger.info("=" * 60)

    # Initialize engine
    engine = RuleEngine(
        rules_config_path=rules_config,
        mqtt_broker=mqtt_broker,
        mqtt_port=mqtt_port,
        redis_url=redis_url
    )

    # Load rules; a broken configuration is fatal at startup.
    try:
        await engine.setup()
    except Exception as e:
        logger.error(f"Failed to setup engine: {e}", exc_info=True)
        sys.exit(1)

    # Setup signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()
    main_task = None

    def signal_handler():
        # Closure reads main_task late-bound: by the time a signal can
        # fire, main_task has been assigned below.
        logger.info("Received shutdown signal")
        engine._shutdown_event.set()
        if main_task and not main_task.done():
            main_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    # Run engine
    try:
        main_task = asyncio.create_task(engine.run())
        await main_task
    except asyncio.CancelledError:
        # Expected path when signal_handler cancels the task.
        logger.info("Main task cancelled")
    finally:
        # Always close connections, regardless of how the task ended.
        await engine.shutdown()
|
||||
|
||||
|
||||
def main() -> None:
    """Run the rules application.

    Entry point: registers signal handlers, starts the background
    scheduler with the periodic rule_tick job, then runs the async
    rule engine until interrupted.
    """
    global scheduler

    logger.info("Rules engine starting...")

    # Register signal handlers
    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    # Initialize scheduler
    scheduler = BackgroundScheduler()

    # Periodic job - runs every minute
    scheduler.add_job(
        rule_tick,
        'interval',
        minutes=1,
        id='rule_tick',
        name='Rule Tick Job'
    )

    # Start scheduler
    scheduler.start()
    logger.info("Scheduler started with rule_tick job (every 1 minute)")

    # Run initial tick immediately
    rule_tick()

    # BUG FIX: the previous version kept the process alive with
    # `while True: time.sleep(1)` and placed `asyncio.run(main_async())`
    # AFTER that infinite loop, so the async rule engine was unreachable
    # dead code (apparent merge residue). main_async() itself blocks
    # until shutdown, so it replaces the sleep loop. The duplicated
    # KeyboardInterrupt log lines are also collapsed, and the scheduler
    # is now stopped on every exit path.
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        scheduler.shutdown(wait=True)
        logger.info("Scheduler stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
5
apps/rules/requirements.txt
Normal file
5
apps/rules/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
# Rules Engine Dependencies
|
||||
pydantic>=2.0
|
||||
redis>=5.0.1
|
||||
aiomqtt>=2.0.1
|
||||
pyyaml>=6.0.1
|
||||
742
apps/rules/rule_interface.py
Normal file
742
apps/rules/rule_interface.py
Normal file
@@ -0,0 +1,742 @@
|
||||
"""
|
||||
Rule Interface and Context Objects
|
||||
|
||||
Provides the core abstractions for implementing automation rules:
|
||||
- RuleDescriptor: Configuration data for a rule instance
|
||||
- RedisState: State persistence interface
|
||||
- RuleContext: Runtime context provided to rules
|
||||
- Rule: Abstract base class for all rule implementations
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from typing import Any, Awaitable, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class RuleDescriptor(BaseModel):
    """
    Configuration descriptor for a rule instance.

    This is the validated representation of a rule from rules.yaml.
    The engine loads these and passes them to rule implementations.

    The 'objects' field is intentionally flexible (dict) to allow different
    rule types to define their own object structures.
    """

    # Unique key; the engine keys its rule-instance mapping on this value.
    id: str = Field(..., description="Unique identifier for this rule instance")
    name: Optional[str] = Field(None, description="Optional human-readable name")
    # Format 'name@version'; load_rule() splits on '@' to resolve the module.
    type: str = Field(..., description="Rule type with version (e.g., 'window_setback@1.0')")
    enabled: bool = Field(default=True, description="Whether this rule is enabled")
    # Keys used by the engine's event filter: 'contacts', 'thermostats',
    # 'lights', 'relays' (each mapping to a list of device ids).
    objects: dict[str, Any] = Field(
        default_factory=dict,
        description="Objects this rule monitors or controls (structure varies by rule type)"
    )
    params: dict[str, Any] = Field(
        default_factory=dict,
        description="Rule-specific parameters"
    )
|
||||
|
||||
|
||||
class RedisState:
    """
    Async Redis-backed state persistence for rules with automatic reconnection.

    Provides a simple key-value and hash storage interface for rules to persist
    state across restarts. All operations are asynchronous and include retry logic
    for robustness against temporary Redis outages.

    Key Convention:
        - Callers should use keys like: f"rules:{rule_id}:contact:{device_id}"
        - This class does NOT enforce key prefixes - caller controls the full key
    """

    def __init__(self, url: str, max_retries: int = 3, retry_delay: float = 0.5):
        """
        Initialize RedisState with connection URL.

        Args:
            url: Redis connection URL (e.g., 'redis://172.23.1.116:6379/8')
            max_retries: Maximum number of retry attempts for operations (default: 3)
            retry_delay: Initial delay between retries in seconds, uses exponential backoff (default: 0.5)

        Note:
            Connection is lazy - actual connection happens on first operation.
            Uses connection pooling with automatic reconnection on failure.
        """
        self._url = url
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._redis: Optional[Any] = None  # redis.asyncio.Redis instance

    async def _get_client(self):
        """
        Get or create Redis client with connection pool.

        Lazy initialization ensures we don't connect until first use.
        Uses decode_responses=True for automatic UTF-8 decoding.
        """
        if self._redis is None:
            # Imported lazily so the module loads without redis installed.
            import redis.asyncio as aioredis
            self._redis = await aioredis.from_url(
                self._url,
                decode_responses=True,  # Automatic UTF-8 decode
                encoding='utf-8',
                max_connections=10,  # Connection pool size
                socket_connect_timeout=5,
                socket_keepalive=True,
                health_check_interval=30  # Auto-check connection health
            )
        return self._redis

    async def _execute_with_retry(self, operation, *args, **kwargs):
        """
        Execute Redis operation with exponential backoff retry.

        Handles temporary connection failures gracefully by retrying
        with exponential backoff. On permanent failure, raises the
        original exception.

        Args:
            operation: Async callable (Redis method)
            *args, **kwargs: Arguments to pass to operation

        Returns:
            Result of the operation

        Raises:
            RuntimeError: If max_retries is configured <= 0 (no attempt made)
            Exception: If all retries are exhausted
        """
        import asyncio

        last_exception: Optional[Exception] = None
        for attempt in range(self._max_retries):
            try:
                client = await self._get_client()
                return await operation(client, *args, **kwargs)
            except Exception as e:
                last_exception = e
                if attempt < self._max_retries - 1:
                    # Exponential backoff: 0.5s, 1s, 2s, ...
                    delay = self._retry_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
                    # Reset client to force reconnection.
                    if self._redis:
                        try:
                            await self._redis.close()
                        except Exception:
                            # Best effort: the connection is being
                            # discarded anyway (was a bare `except:`).
                            pass
                    self._redis = None

        # All retries exhausted. Guard against `raise None` when the loop
        # body never ran (max_retries <= 0).
        if last_exception is None:
            raise RuntimeError(
                f"RedisState misconfigured: max_retries={self._max_retries}, no attempt made"
            )
        raise last_exception

    # JSON helpers for complex data structures
    def _dumps(self, obj: Any) -> str:
        """Serialize Python object to JSON string."""
        import json
        return json.dumps(obj, ensure_ascii=False)

    def _loads(self, s: str) -> Any:
        """Deserialize JSON string to Python object."""
        import json
        return json.loads(s)

    async def get(self, key: str) -> Optional[str]:
        """
        Get a string value by key.

        Args:
            key: Redis key (e.g., "rules:my_rule:contact:sensor_1")

        Returns:
            String value or None if key doesn't exist

        Example:
            >>> state = RedisState("redis://localhost:6379/0")
            >>> await state.set("rules:r1:temp", "22.5")
            >>> temp = await state.get("rules:r1:temp")
            >>> print(temp)  # "22.5"
        """
        async def _get(client, k):
            return await client.get(k)

        return await self._execute_with_retry(_get, key)

    async def set(self, key: str, value: str, ttl_secs: Optional[int] = None) -> None:
        """
        Set a string value with optional TTL.

        Args:
            key: Redis key
            value: String value to store
            ttl_secs: Optional time-to-live in seconds. If None, key persists indefinitely.

        Example:
            >>> state = RedisState("redis://localhost:6379/0")
            >>> # Store with 1 hour TTL
            >>> await state.set("rules:r1:previous_temp", "20.0", ttl_secs=3600)
        """
        async def _set(client, k, v, ttl):
            if ttl is not None:
                await client.setex(k, ttl, v)
            else:
                await client.set(k, v)

        await self._execute_with_retry(_set, key, value, ttl_secs)

    async def hget(self, key: str, field: str) -> Optional[str]:
        """
        Get a hash field value.

        Args:
            key: Redis hash key
            field: Field name within the hash

        Returns:
            String value or None if field doesn't exist

        Example:
            >>> state = RedisState("redis://localhost:6379/0")
            >>> await state.hset("rules:r1:device_states", "sensor_1", "open")
            >>> value = await state.hget("rules:r1:device_states", "sensor_1")
            >>> print(value)  # "open"
        """
        async def _hget(client, k, f):
            return await client.hget(k, f)

        return await self._execute_with_retry(_hget, key, field)

    async def hset(self, key: str, field: str, value: str) -> None:
        """
        Set a hash field value.

        Args:
            key: Redis hash key
            field: Field name within the hash
            value: String value to store

        Example:
            >>> state = RedisState("redis://localhost:6379/0")
            >>> await state.hset("rules:r1:sensors", "bedroom", "open")
            >>> await state.hset("rules:r1:sensors", "kitchen", "closed")
        """
        async def _hset(client, k, f, v):
            await client.hset(k, f, v)

        await self._execute_with_retry(_hset, key, field, value)

    async def expire(self, key: str, ttl_secs: int) -> None:
        """
        Set or update TTL on an existing key.

        Args:
            key: Redis key
            ttl_secs: Time-to-live in seconds

        Example:
            >>> state = RedisState("redis://localhost:6379/0")
            >>> await state.set("rules:r1:temp", "22.5")
            >>> await state.expire("rules:r1:temp", 3600)  # Expire in 1 hour
        """
        async def _expire(client, k, ttl):
            await client.expire(k, ttl)

        await self._execute_with_retry(_expire, key, ttl_secs)

    async def close(self) -> None:
        """
        Close Redis connection and cleanup resources.

        Should be called when shutting down the application.
        """
        if self._redis:
            await self._redis.close()
            self._redis = None
|
||||
|
||||
|
||||
class MQTTClient:
    """
    Async MQTT client for rule engine with event normalization and publishing.

    Subscribes to device state topics, normalizes events to a consistent format,
    and provides high-level publishing methods for device commands.

    Event Normalization:
        All incoming MQTT messages are parsed into a normalized event structure:
        {
            "topic": "home/contact/sensor_1/state",
            "type": "state",
            "cap": "contact",  # Capability type (contact, thermostat, light, etc.)
            "device_id": "sensor_1",
            "payload": {"contact": "open"},
            "ts": "2025-11-11T10:30:45.123456"
        }
    """

    def __init__(
        self,
        broker: str,
        port: int = 1883,
        client_id: str = "rule_engine",
        reconnect_interval: int = 5,
        max_reconnect_delay: int = 300
    ):
        """
        Initialize MQTT client.

        Args:
            broker: MQTT broker hostname or IP
            port: MQTT broker port (default: 1883)
            client_id: Unique client ID for this connection
            reconnect_interval: Initial reconnect delay in seconds (default: 5)
            max_reconnect_delay: Maximum reconnect delay in seconds (default: 300)
        """
        self._broker = broker
        self._port = port
        self._client_id = client_id
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_delay = max_reconnect_delay
        # aiomqtt.Client, set while inside connect()'s `async with` block.
        # NOTE(review): never cleared on disconnect, so publishing between a
        # drop and the next reconnect can hit a stale client - confirm.
        self._client = None
        self._logger = None  # Set externally

    def set_logger(self, logger):
        """Set logger instance for connection status messages."""
        self._logger = logger

    def _log(self, level: str, msg: str):
        """Internal logging helper; falls back to print() if no logger is set."""
        if self._logger:
            getattr(self._logger, level)(msg)
        else:
            print(f"[{level.upper()}] {msg}")

    async def connect(self, topics: Optional[list[str]] = None):
        """
        Connect to MQTT broker with automatic reconnection.

        This is an async generator: it manages the connection, subscribes,
        and yields normalized events; it automatically reconnects with
        exponential backoff if the connection is lost.

        Args:
            topics: List of MQTT topics to subscribe to. If None, subscribes to nothing.

        Yields:
            Normalized event dicts (see _normalize_event).
        """
        # Imported lazily so the module loads without aiomqtt installed.
        import aiomqtt
        from aiomqtt import Client

        if topics is None:
            topics = []

        reconnect_delay = self._reconnect_interval

        # Outer loop = one iteration per connection attempt.
        while True:
            try:
                self._log("info", f"Connecting to MQTT broker {self._broker}:{self._port} (client_id={self._client_id})")

                async with Client(
                    hostname=self._broker,
                    port=self._port,
                    identifier=self._client_id,
                ) as client:
                    self._client = client
                    self._log("info", f"Connected to MQTT broker {self._broker}:{self._port}")

                    # Subscribe to provided topics
                    if topics:
                        for topic in topics:
                            await client.subscribe(topic)
                        self._log("info", f"Subscribed to {len(topics)} topic(s): {', '.join(topics[:5])}{'...' if len(topics) > 5 else ''}")

                    # Reset reconnect delay on successful connection
                    reconnect_delay = self._reconnect_interval

                    # Process messages - this is a generator that yields messages
                    async for message in client.messages:
                        yield self._normalize_event(message)

            except aiomqtt.MqttError as e:
                self._log("error", f"MQTT connection error: {e}")
                self._log("info", f"Reconnecting in {reconnect_delay} seconds...")

                import asyncio
                await asyncio.sleep(reconnect_delay)

                # Exponential backoff
                reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay)

    def _normalize_event(self, message) -> dict[str, Any]:
        """
        Normalize MQTT message to standard event format.

        Parses topic to extract capability type and device_id,
        adds timestamp, and structures payload.

        Args:
            message: aiomqtt.Message instance

        Returns:
            Normalized event dictionary

        Example:
            Topic: home/contact/sensor_bedroom/state
            Payload: {"contact": "open"}

            Returns:
            {
                "topic": "home/contact/sensor_bedroom/state",
                "type": "state",
                "cap": "contact",
                "device_id": "sensor_bedroom",
                "payload": {"contact": "open"},
                "ts": "2025-11-11T10:30:45.123456"
            }
        """
        from datetime import datetime
        import json

        topic = str(message.topic)
        topic_parts = topic.split('/')

        # Parse topic: home/{capability}/{device_id}/state
        if len(topic_parts) >= 4 and topic_parts[0] == 'home' and topic_parts[3] == 'state':
            cap = topic_parts[1]  # contact, thermostat, light, etc.
            device_id = topic_parts[2]
        else:
            # Fallback for unexpected topic format
            cap = "unknown"
            device_id = topic_parts[-2] if len(topic_parts) >= 2 else "unknown"

        # Parse payload; non-JSON payloads are wrapped under a "raw" key
        # instead of raising.
        try:
            payload = json.loads(message.payload.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            payload = {"raw": message.payload.decode('utf-8', errors='replace')}

        # Generate timestamp.
        # NOTE(review): naive local time, not timezone-aware - confirm
        # whether consumers expect UTC.
        ts = datetime.now().isoformat()

        return {
            "topic": topic,
            "type": "state",
            "cap": cap,
            "device_id": device_id,
            "payload": payload,
            "ts": ts
        }

    async def publish_set_thermostat(self, device_id: str, target: float) -> None:
        """
        Publish thermostat target temperature command.

        Publishes to: home/thermostat/{device_id}/set
        QoS: 1 (at least once delivery)

        Args:
            device_id: Thermostat device identifier
            target: Target temperature in degrees Celsius

        Raises:
            RuntimeError: If called before connect() has established a client.

        Example:
            >>> mqtt = MQTTClient("172.16.2.16", 1883)
            >>> await mqtt.publish_set_thermostat("thermostat_wohnzimmer", 22.5)

            Published to: home/thermostat/thermostat_wohnzimmer/set
            Payload: {"type":"thermostat","payload":{"target":22.5}}
        """
        import json

        if self._client is None:
            raise RuntimeError("MQTT client not connected. Call connect() first.")

        topic = f"home/thermostat/{device_id}/set"
        payload = {
            "type": "thermostat",
            "payload": {
                "target": target
            }
        }

        payload_str = json.dumps(payload)

        await self._client.publish(
            topic,
            payload=payload_str.encode('utf-8'),
            qos=1  # At least once delivery
        )

        self._log("debug", f"Published SET to {topic}: {payload_str}")
|
||||
|
||||
|
||||
# Legacy alias for backward compatibility
|
||||
class MQTTPublisher:
    """
    Legacy MQTT publishing interface - DEPRECATED.

    Thin delegating wrapper around MQTTClient, retained only so that
    existing documentation and call sites keep working. New code should
    use MQTTClient directly.
    """

    def __init__(self, mqtt_client):
        """
        Initialize MQTT publisher.

        Args:
            mqtt_client: MQTTClient instance
        """
        self._mqtt = mqtt_client

    async def publish_set_thermostat(self, device_id: str, target: float) -> None:
        """
        Publish a thermostat target temperature command.

        Args:
            device_id: Thermostat device identifier
            target: Target temperature in degrees Celsius
        """
        # Pure delegation - the wrapped client does the actual publish.
        client = self._mqtt
        await client.publish_set_thermostat(device_id, target)
|
||||
|
||||
|
||||
class RuleContext:
    """
    Runtime context handed to rules while they process events.

    Bundles the external dependencies a rule implementation needs:
    a logger for diagnostics, an MQTT publisher for device commands,
    a Redis state store for persistence, and an injectable clock
    (useful for deterministic tests).
    """

    def __init__(
        self,
        logger,
        mqtt_publisher: MQTTPublisher,
        redis_state: RedisState,
        now_fn=None
    ):
        """
        Initialize rule context.

        Args:
            logger: Logger instance (e.g., logging.Logger)
            mqtt_publisher: MQTTPublisher instance for device commands
            redis_state: RedisState instance for persistence
            now_fn: Optional callable returning current datetime (defaults to datetime.now)
        """
        self.logger = logger
        self.mqtt = mqtt_publisher
        self.redis = redis_state
        # Fall back to the wall clock when no clock is injected.
        self._now_fn = datetime.now if not now_fn else now_fn

    def now(self) -> datetime:
        """
        Return the current timestamp via the injected clock.

        Returns:
            Current datetime (timezone-aware if now_fn provides it)
        """
        return self._now_fn()
|
||||
|
||||
|
||||
class Rule(ABC):
    """
    Abstract base class for all automation rule implementations.

    Rules implement event-driven automation logic. The engine calls on_event()
    for each relevant device state change, passing the event data, rule configuration,
    and runtime context.

    Implementations must be idempotent - processing the same event multiple times
    should produce the same result.

    Example implementation (note: desc.objects is a plain dict, so lists
    are read with .get(), not attribute access):

        class WindowSetbackRule(Rule):
            def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
                # Subscribe to contact sensor state topics
                topics = []
                for contact_id in desc.objects.get('contacts') or []:
                    topics.append(f"home/contact/{contact_id}/state")
                return topics

            async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None:
                device_id = evt['device_id']
                cap = evt['cap']

                if cap == 'contact':
                    contact_state = evt['payload'].get('contact')
                    if contact_state == 'open':
                        # Window opened - set thermostats to eco
                        for thermo_id in desc.objects.get('thermostats') or []:
                            eco_temp = desc.params.get('eco_target', 16.0)
                            await ctx.mqtt.publish_set_thermostat(thermo_id, eco_temp)
    """

    @abstractmethod
    def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
        """
        Return list of MQTT topics this rule needs to subscribe to.

        Called once during rule engine setup. The rule examines its configuration
        (desc.objects) and returns the specific state topics it needs to monitor.

        Args:
            desc: Rule configuration from rules.yaml

        Returns:
            List of MQTT topic patterns/strings to subscribe to

        Example:
            For a window setback rule monitoring 2 contacts:
            ['home/contact/sensor_bedroom/state', 'home/contact/sensor_kitchen/state']
        """
        pass

    @abstractmethod
    async def on_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """
        Process a device state change event.

        This method is called by the rule engine whenever a device state changes
        that is relevant to this rule. The implementation should examine the event
        and take appropriate actions (e.g., publish MQTT commands, update state).

        MUST be idempotent: Processing the same event multiple times should be safe.

        Args:
            evt: Event dictionary with the following structure:
                {
                    "topic": "home/contact/device_id/state",  # MQTT topic
                    "type": "state",                          # Message type
                    "cap": "contact",                         # Capability type
                    "device_id": "kontakt_wohnzimmer",        # Device identifier
                    "payload": {"contact": "open"},           # Capability-specific payload
                    "ts": "2025-11-11T10:30:45.123456"        # ISO timestamp
                }

            desc: Rule configuration from rules.yaml

            ctx: Runtime context with logger, MQTT, Redis, and timestamp utilities

        Returns:
            None

        Raises:
            Exception: Implementation may raise exceptions for errors.
                       The engine will log them but continue processing.
        """
        pass
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Dynamic Rule Loading
|
||||
# ============================================================================
|
||||
|
||||
import importlib
|
||||
import re
|
||||
from typing import Type
|
||||
|
||||
# Cache for loaded rule classes (per process)
|
||||
_RULE_CLASS_CACHE: dict[str, Type[Rule]] = {}
|
||||
|
||||
|
||||
def load_rule(desc: RuleDescriptor) -> Rule:
    """
    Dynamically load and instantiate a rule based on its type descriptor.

    Convention:
        - Rule type format: 'name@version' (e.g., 'window_setback@1.0')
        - Module path: apps.rules.impl.{name}
        - Class name: PascalCase version of name + 'Rule'
          Example: 'window_setback' → 'WindowSetbackRule'

    Args:
        desc: Rule descriptor from rules.yaml

    Returns:
        Instantiated Rule object

    Raises:
        ValueError: If type format is invalid
        ImportError: If rule module cannot be found
        AttributeError: If rule class cannot be found in module
        TypeError: If the resolved class is not a Rule subclass

    Examples:
        >>> desc = RuleDescriptor(
        ...     id="test_rule",
        ...     type="window_setback@1.0",
        ...     objects={},
        ...     params={}
        ... )
        >>> rule = load_rule(desc)
        >>> isinstance(rule, Rule)
        True
    """
    rule_type = desc.type

    # Check cache first (classes are cached per process; a fresh instance
    # is still created on every call).
    if rule_type in _RULE_CLASS_CACHE:
        rule_class = _RULE_CLASS_CACHE[rule_type]
        return rule_class()

    # Parse type: 'name@version'
    if '@' not in rule_type:
        raise ValueError(
            f"Invalid rule type '{rule_type}': must be in format 'name@version' "
            f"(e.g., 'window_setback@1.0')"
        )

    # version is currently unused for module resolution; kept in the type
    # string to allow future side-by-side rule versions.
    name, _version = rule_type.split('@', 1)

    # Validate name (alphanumeric and underscores only)
    if not re.match(r'^[a-z][a-z0-9_]*$', name):
        raise ValueError(
            f"Invalid rule name '{name}': must start with lowercase letter "
            f"and contain only lowercase letters, numbers, and underscores"
        )

    # Convert snake_case to PascalCase for class name
    # Example: 'window_setback' → 'WindowSetbackRule'
    class_name = ''.join(word.capitalize() for word in name.split('_')) + 'Rule'

    # Construct module path
    module_path = f'apps.rules.impl.{name}'

    # Try to import the module
    try:
        module = importlib.import_module(module_path)
    except ImportError as e:
        raise ImportError(
            f"Cannot load rule type '{rule_type}': module '{module_path}' not found.\n"
            f"Hint: Create file 'apps/rules/impl/{name}.py' with class '{class_name}'.\n"
            f"Original error: {e}"
        ) from e

    # Try to get the class from the module
    try:
        rule_class = getattr(module, class_name)
    except AttributeError as e:
        # Comprehension variable renamed so it no longer shadows `name`.
        raise AttributeError(
            f"Cannot load rule type '{rule_type}': class '{class_name}' not found in module '{module_path}'.\n"
            f"Hint: Define 'class {class_name}(Rule):' in 'apps/rules/impl/{name}.py'.\n"
            f"Available classes in module: {[attr for attr in dir(module) if not attr.startswith('_')]}"
        ) from e

    # Validate that it's a Rule subclass
    if not issubclass(rule_class, Rule):
        raise TypeError(
            f"Class '{class_name}' in '{module_path}' is not a subclass of Rule. "
            f"Ensure it inherits from apps.rules.rule_interface.Rule"
        )

    # Cache the class
    _RULE_CLASS_CACHE[rule_type] = rule_class

    # Instantiate and return
    return rule_class()
|
||||
122
apps/rules/rules_config.py
Normal file
122
apps/rules/rules_config.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""
|
||||
Rules Configuration Schema and Loader
|
||||
|
||||
Provides Pydantic models for validating rules.yaml configuration.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
import yaml
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
|
||||
class Rule(BaseModel):
    """Single rule configuration.

    Config-schema counterpart of this entry in rules.yaml; distinct from
    the `Rule` ABC in apps/rules/rule_interface.py.
    """
    id: str = Field(..., description="Unique rule identifier")
    name: Optional[str] = Field(None, description="Optional human-readable name")
    type: str = Field(..., description="Rule type (e.g., 'window_setback@1.0')")
    enabled: bool = Field(default=True, description="Whether this rule is enabled")
    objects: dict[str, Any] = Field(default_factory=dict, description="Objects this rule monitors or controls")
    params: dict[str, Any] = Field(default_factory=dict, description="Rule-specific parameters")

    @field_validator('id')
    @classmethod
    def validate_id(cls, v: str) -> str:
        # Reject empty/whitespace-only ids; normalize surrounding whitespace.
        if not v or not v.strip():
            raise ValueError("Rule ID cannot be empty")
        return v.strip()

    @field_validator('type')
    @classmethod
    def validate_type(cls, v: str) -> str:
        # The '@' separator is required because load_rule() splits on it.
        if not v or not v.strip():
            raise ValueError("Rule type cannot be empty")
        if '@' not in v:
            raise ValueError(f"Rule type must include version (e.g., 'window_setback@1.0'), got: {v}")
        return v.strip()
|
||||
|
||||
|
||||
class RulesConfig(BaseModel):
    """Root configuration object: the validated contents of rules.yaml."""
    rules: list[Rule] = Field(..., description="List of all rules")

    @field_validator('rules')
    @classmethod
    def validate_unique_ids(cls, rules: list[Rule]) -> list[Rule]:
        """Ensure all rule IDs are unique.

        Raises:
            ValueError: If two or more rules share the same ID.
        """
        # Single O(n) pass instead of list.count() per element (O(n^2));
        # also avoids shadowing the builtin `id`.
        seen: set[str] = set()
        duplicates: set[str] = set()
        for rule in rules:
            if rule.id in seen:
                duplicates.add(rule.id)
            seen.add(rule.id)
        if duplicates:
            raise ValueError(f"Duplicate rule IDs found: {duplicates}")
        return rules
|
||||
|
||||
|
||||
def load_rules_config(config_path: str | Path = "config/rules.yaml") -> RulesConfig:
    """
    Load and validate rules configuration from YAML file.

    Args:
        config_path: Path to rules.yaml file

    Returns:
        Validated RulesConfig object

    Raises:
        FileNotFoundError: If config file doesn't exist
        ValueError: If YAML is invalid, empty, not a mapping, or fails validation
    """
    config_path = Path(config_path)

    if not config_path.exists():
        raise FileNotFoundError(f"Rules configuration not found: {config_path}")

    with open(config_path, 'r', encoding='utf-8') as f:
        try:
            data = yaml.safe_load(f)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML in {config_path}: {e}") from e

    if not data:
        raise ValueError(f"Empty configuration file: {config_path}")

    # safe_load may return a list or scalar for a non-mapping document.
    # Without this check, `'rules' not in data` would do a substring test on a
    # string document and RulesConfig(**data) would raise an uncaught TypeError.
    if not isinstance(data, dict):
        raise ValueError(
            f"Invalid configuration in {config_path}: expected a mapping at the "
            f"top level, got {type(data).__name__}"
        )

    if 'rules' not in data:
        raise ValueError(
            f"Missing 'rules:' key in {config_path}. "
            "Configuration must start with 'rules:' followed by a list of rule definitions."
        )

    try:
        return RulesConfig(**data)
    except Exception as e:
        # Re-raise pydantic/shape errors uniformly as ValueError for callers.
        raise ValueError(f"Configuration validation failed: {e}") from e
|
||||
|
||||
|
||||
def get_rule_by_id(config: RulesConfig, rule_id: str) -> Rule | None:
    """Return the rule whose ID equals ``rule_id``, or None if no rule matches."""
    return next((rule for rule in config.rules if rule.id == rule_id), None)
|
||||
|
||||
|
||||
def get_rules_by_type(config: RulesConfig, rule_type: str) -> list[Rule]:
    """Return every rule whose type string equals ``rule_type`` (may be empty)."""
    matches: list[Rule] = []
    for rule in config.rules:
        if rule.type == rule_type:
            matches.append(rule)
    return matches
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: load the default config and print a human-readable
    # summary of every rule (enabled flag, id, optional name, type, objects).
    try:
        config = load_rules_config()
        print(f"✅ Loaded {len(config.rules)} rules:")
        for rule in config.rules:
            name = f" ({rule.name})" if rule.name else ""
            enabled = "✓" if rule.enabled else "✗"
            print(f" [{enabled}] {rule.id}{name}: {rule.type}")
            if rule.objects:
                # For list-valued entries show the element count, otherwise the value itself.
                obj_summary = ", ".join(f"{k}: {len(v) if isinstance(v, list) else v}"
                                        for k, v in rule.objects.items())
                print(f" Objects: {obj_summary}")
    except Exception as e:
        # Broad catch is intentional: this is a CLI diagnostic, not library code.
        print(f"❌ Configuration error: {e}")
|
||||
@@ -59,13 +59,9 @@ docker run --rm -p 8010:8010 \
|
||||
simulator:dev
|
||||
```
|
||||
|
||||
**Mit Docker Network (optional):**
|
||||
```bash
|
||||
docker run --rm -p 8010:8010 \
|
||||
--name simulator \
|
||||
-e MQTT_BROKER=172.23.1.102 \
|
||||
simulator:dev
|
||||
```
|
||||
**Note for finch/nerdctl users:**
|
||||
- finch binds ports to `127.0.0.1` by default
|
||||
- The web interface will be accessible at `http://127.0.0.1:8010`
|
||||
|
||||
#### Environment Variables
|
||||
|
||||
|
||||
@@ -37,17 +37,6 @@ docker build -t ui:dev -f apps/ui/Dockerfile .
|
||||
|
||||
#### Run Container
|
||||
|
||||
**Linux Server (empfohlen):**
|
||||
```bash
|
||||
# Mit Docker Network für Container-to-Container Kommunikation
|
||||
docker run --rm -p 8002:8002 \
|
||||
-e UI_PORT=8002 \
|
||||
-e API_BASE=http://172.19.1.11:8001 \
|
||||
-e BASE_PATH=/ \
|
||||
ui:dev
|
||||
```
|
||||
|
||||
**macOS mit finch/nerdctl:**
|
||||
```bash
|
||||
docker run --rm -p 8002:8002 \
|
||||
--add-host=host.docker.internal:host-gateway \
|
||||
@@ -57,11 +46,10 @@ docker run --rm -p 8002:8002 \
|
||||
ui:dev
|
||||
```
|
||||
|
||||
**Hinweise:**
|
||||
- **Linux**: Verwende Docker Network und Service-Namen (`http://api:8001`)
|
||||
- **macOS/finch**: Verwende `host.docker.internal` mit `--add-host` flag
|
||||
- Die UI macht Server-Side API-Aufrufe beim Rendern der Seite
|
||||
- Browser-seitige Realtime-Updates (SSE) gehen direkt vom Browser zur API
|
||||
**Note for finch/nerdctl users:**
|
||||
- finch binds ports to `127.0.0.1` by default (not `0.0.0.0`)
|
||||
- Use `--add-host=host.docker.internal:host-gateway` to allow container-to-host communication
|
||||
- Set `API_BASE=http://host.docker.internal:8001` to reach the API container
|
||||
|
||||
#### Environment Variables
|
||||
|
||||
|
||||
94
config/rules.yaml
Normal file
94
config/rules.yaml
Normal file
@@ -0,0 +1,94 @@
|
||||
# Rules Configuration
|
||||
# Auto-generated from devices.yaml
|
||||
|
||||
rules:
|
||||
- id: window_setback_esszimmer
|
||||
enabled: false
|
||||
name: Fensterabsenkung Esszimmer
|
||||
type: window_setback@1.0
|
||||
objects:
|
||||
contacts:
|
||||
- kontakt_esszimmer_strasse_links
|
||||
- kontakt_esszimmer_strasse_rechts
|
||||
thermostats:
|
||||
- thermostat_esszimmer
|
||||
params:
|
||||
eco_target: 16.0
|
||||
open_min_secs: 20
|
||||
close_min_secs: 20
|
||||
previous_target_ttl_secs: 86400
|
||||
- id: window_setback_kueche
|
||||
enabled: false
|
||||
name: Fensterabsenkung Küche
|
||||
type: window_setback@1.0
|
||||
objects:
|
||||
contacts:
|
||||
- kontakt_kueche_garten_fenster
|
||||
- kontakt_kueche_garten_tuer
|
||||
- kontakt_kueche_strasse_links
|
||||
- kontakt_kueche_strasse_rechts
|
||||
thermostats:
|
||||
- thermostat_kueche
|
||||
params:
|
||||
eco_target: 16.0
|
||||
open_min_secs: 20
|
||||
close_min_secs: 20
|
||||
previous_target_ttl_secs: 86400
|
||||
- id: window_setback_patty
|
||||
enabled: false
|
||||
name: Fensterabsenkung Arbeitszimmer Patty
|
||||
type: window_setback@1.0
|
||||
objects:
|
||||
contacts:
|
||||
- kontakt_patty_garten_links
|
||||
- kontakt_patty_garten_rechts
|
||||
- kontakt_patty_strasse
|
||||
thermostats:
|
||||
- thermostat_patty
|
||||
params:
|
||||
eco_target: 16.0
|
||||
open_min_secs: 20
|
||||
close_min_secs: 20
|
||||
previous_target_ttl_secs: 86400
|
||||
- id: window_setback_schlafzimmer
|
||||
enabled: false
|
||||
name: Fensterabsenkung Schlafzimmer
|
||||
type: window_setback@1.0
|
||||
objects:
|
||||
contacts:
|
||||
- kontakt_schlafzimmer_strasse
|
||||
thermostats:
|
||||
- thermostat_schlafzimmer
|
||||
params:
|
||||
eco_target: 16.0
|
||||
open_min_secs: 20
|
||||
close_min_secs: 20
|
||||
previous_target_ttl_secs: 86400
|
||||
- id: window_setback_wohnzimmer
|
||||
enabled: false
|
||||
name: Fensterabsenkung Wohnzimmer
|
||||
type: window_setback@1.0
|
||||
objects:
|
||||
contacts:
|
||||
- kontakt_wohnzimmer_garten_links
|
||||
- kontakt_wohnzimmer_garten_rechts
|
||||
thermostats:
|
||||
- thermostat_wohnzimmer
|
||||
params:
|
||||
eco_target: 16.0
|
||||
open_min_secs: 20
|
||||
close_min_secs: 20
|
||||
previous_target_ttl_secs: 86400
|
||||
- id: window_setback_wolfgang
|
||||
enabled: true
|
||||
name: Fensterabsenkung Arbeitszimmer Wolfgang
|
||||
type: window_setback@1.0
|
||||
objects:
|
||||
contacts:
|
||||
- kontakt_wolfgang_garten
|
||||
thermostats:
|
||||
- thermostat_wolfgang
|
||||
params:
|
||||
eco_target: 16.0
|
||||
open_min_secs: 20
|
||||
close_min_secs: 20
|
||||
62
docker-compose.yaml
Normal file
62
docker-compose.yaml
Normal file
@@ -0,0 +1,62 @@
|
||||
version: "3.9"
|
||||
|
||||
x-environment: &default-env
|
||||
MQTT_BROKER: "172.23.1.102"
|
||||
MQTT_PORT: 1883
|
||||
REDIS_HOST: "172.23.1.116"
|
||||
REDIS_PORT: 6379
|
||||
REDIS_DB: 8
|
||||
|
||||
services:
|
||||
|
||||
ui:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: apps/ui/Dockerfile
|
||||
container_name: ui
|
||||
environment:
|
||||
UI_PORT: 8002
|
||||
API_BASE: "http://172.19.1.11:8001"
|
||||
BASE_PATH: "/"
|
||||
ports:
|
||||
- "8002:8002"
|
||||
depends_on:
|
||||
- api
|
||||
|
||||
api:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: apps/api/Dockerfile
|
||||
container_name: api
|
||||
environment:
|
||||
<<: *default-env
|
||||
REDIS_CHANNEL: "ui:updates"
|
||||
volumes:
|
||||
- ./config:/app/config:ro
|
||||
ports:
|
||||
- "8001:8001"
|
||||
depends_on:
|
||||
- abstraction
|
||||
|
||||
abstraction:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: apps/abstraction/Dockerfile
|
||||
container_name: abstraction
|
||||
environment:
|
||||
<<: *default-env
|
||||
volumes:
|
||||
- ./config:/app/config:ro
|
||||
|
||||
rules:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: apps/rules/Dockerfile
|
||||
container_name: rules
|
||||
environment:
|
||||
<<: *default-env
|
||||
RULES_CONFIG: "/app/config/rules.yaml"
|
||||
volumes:
|
||||
- ./config:/app/config:ro
|
||||
depends_on:
|
||||
- abstraction
|
||||
@@ -1,15 +0,0 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
# Placeholder for future services
|
||||
# Example:
|
||||
# api:
|
||||
# build:
|
||||
# context: ..
|
||||
# dockerfile: apps/api/Dockerfile
|
||||
# ports:
|
||||
# - "8000:8000"
|
||||
|
||||
placeholder:
|
||||
image: alpine:latest
|
||||
command: echo "Docker Compose placeholder - add your services here"
|
||||
Reference in New Issue
Block a user