diff --git a/apps/rules/Dockerfile b/apps/rules/Dockerfile new file mode 100644 index 0000000..78e7c07 --- /dev/null +++ b/apps/rules/Dockerfile @@ -0,0 +1,53 @@ +# Rules Engine Dockerfile +# Event-driven automation rules processor with MQTT and Redis + +FROM python:3.14-alpine + +# Prevent Python from writing .pyc files and enable unbuffered output +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + RULES_CONFIG=config/rules.yaml \ + MQTT_BROKER=172.16.2.16 \ + MQTT_PORT=1883 \ + REDIS_HOST=localhost \ + REDIS_PORT=6379 \ + REDIS_DB=8 \ + LOG_LEVEL=INFO + +# Create non-root user +RUN addgroup -g 10001 -S app && \ + adduser -u 10001 -S app -G app + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apk add --no-cache \ + gcc \ + musl-dev \ + linux-headers + +# Install Python dependencies +COPY apps/rules/requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY apps/__init__.py /app/apps/ +COPY apps/rules/ /app/apps/rules/ +COPY packages/ /app/packages/ +COPY config/ /app/config/ + +# Change ownership to non-root user +RUN chown -R app:app /app + +# Switch to non-root user +USER app + +# Expose no ports (MQTT/Redis client only) + +# Health check (check if process is running) +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD pgrep -f "apps.rules.main" || exit 1 + +# Run the rules engine +CMD ["python", "-m", "apps.rules.main"] diff --git a/apps/rules/RULE_INTERFACE.md b/apps/rules/RULE_INTERFACE.md new file mode 100644 index 0000000..7f7d0ca --- /dev/null +++ b/apps/rules/RULE_INTERFACE.md @@ -0,0 +1,371 @@ +# Rule Interface Documentation + +## Overview + +The rule interface provides a clean abstraction for implementing automation rules. Rules respond to device state changes and can publish commands, persist state, and log diagnostics. + +## Core Components + +### 1. RuleDescriptor + +Configuration data for a rule instance (loaded from `rules.yaml`): + +```python +RuleDescriptor( + id="window_setback_wohnzimmer", # Unique rule ID + name="Fensterabsenkung Wohnzimmer", # Optional display name + type="window_setback@1.0", # Rule type + version + targets={ # Rule-specific targets + "rooms": ["Wohnzimmer"], + "contacts": ["kontakt_wohnzimmer_..."], + "thermostats": ["thermostat_wohnzimmer"] + }, + params={ # Rule-specific parameters + "eco_target": 16.0, + "open_min_secs": 20 + } +) +``` + +### 2. RedisState + +Async state persistence with automatic reconnection and retry logic: + +```python +# Initialize (done by rule engine) +redis_state = RedisState("redis://172.23.1.116:6379/8") + +# Simple key-value with TTL +await ctx.redis.set("rules:my_rule:temp", "22.5", ttl_secs=3600) +value = await ctx.redis.get("rules:my_rule:temp") # Returns "22.5" or None + +# Hash storage (for multiple related values) +await ctx.redis.hset("rules:my_rule:sensors", "bedroom", "open") +await ctx.redis.hset("rules:my_rule:sensors", "kitchen", "closed") +value = await ctx.redis.hget("rules:my_rule:sensors", "bedroom") # "open" + +# TTL management +await ctx.redis.expire("rules:my_rule:temp", 7200) # Extend to 2 hours + +# JSON helpers (for complex data) +import json +data = {"temp": 22.5, "humidity": 45} +await ctx.redis.set("rules:my_rule:data", ctx.redis._dumps(data)) +stored = await ctx.redis.get("rules:my_rule:data") +parsed = ctx.redis._loads(stored) if stored else None +``` + +**Key Conventions:** +- Use prefix `rules:{rule_id}:` for all keys +- Example: `rules:window_setback_wohnzimmer:thermo:device_123:previous` +- TTL recommended for temporary state (previous temperatures, timers) + +**Robustness Features:** +- Automatic retry with exponential backoff (default: 3 retries) +- Connection pooling (max 10 connections) +- Automatic reconnection on Redis restart +- Health checks every 30 seconds +- All operations wait and retry, no exceptions on temporary outages + +### 3. MQTTClient + +Async MQTT client with event normalization and command publishing: + +```python +# Initialize (done by rule engine) +mqtt_client = MQTTClient( + broker="172.16.2.16", + port=1883, + client_id="rule_engine" +) + +# Subscribe and receive normalized events +async for event in mqtt_client.connect(): + # Event structure: + # { + # "topic": "home/contact/sensor_1/state", + # "type": "state", + # "cap": "contact", # Capability (contact, thermostat, etc.) + # "device_id": "sensor_1", + # "payload": {"contact": "open"}, + # "ts": "2025-11-11T10:30:45.123456" + # } + + if event['cap'] == 'contact': + handle_contact(event) + elif event['cap'] == 'thermostat': + handle_thermostat(event) + +# Publish commands (within async context) +await mqtt_client.publish_set_thermostat("thermostat_id", 22.5) +``` + +**Subscriptions:** +- `home/contact/+/state` - All contact sensor state changes +- `home/thermostat/+/state` - All thermostat state changes + +**Publishing:** +- Topic: `home/thermostat/{device_id}/set` +- Payload: `{"type":"thermostat","payload":{"target":22.5}}` +- QoS: 1 (at least once delivery) + +**Robustness:** +- Automatic reconnection with exponential backoff +- Connection logging (connect/disconnect events) +- Clean session handling + +### 4. MQTTPublisher (Legacy) + +Simplified wrapper around MQTTClient for backward compatibility: + +```python +# Set thermostat temperature +await ctx.mqtt.publish_set_thermostat("thermostat_wohnzimmer", 21.5) +``` + +### 5. RuleContext + +Runtime context provided to rules: + +```python +class RuleContext: + logger # Logger instance + mqtt # MQTTPublisher + redis # RedisState + now() -> datetime # Current timestamp +``` + +### 5. Rule Abstract Base Class + +All rules extend this: + +```python +class MyRule(Rule): + async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None: + # Event structure: + # { + # "topic": "home/contact/device_id/state", + # "type": "state", + # "cap": "contact", + # "device_id": "kontakt_wohnzimmer", + # "payload": {"contact": "open"}, + # "ts": "2025-11-11T10:30:45.123456" + # } + + device_id = evt['device_id'] + cap = evt['cap'] + + if cap == 'contact': + contact_state = evt['payload'].get('contact') + # ... implement logic +``` + +## Implementing a New Rule + +### Step 1: Create Rule Class + +```python +from packages.rule_interface import Rule, RuleDescriptor, RuleContext +from typing import Any + +class MyCustomRule(Rule): + """My custom automation rule.""" + + async def on_event( + self, + evt: dict[str, Any], + desc: RuleDescriptor, + ctx: RuleContext + ) -> None: + """Process device state changes.""" + + # 1. Extract event data + device_id = evt['device_id'] + cap = evt['cap'] + payload = evt['payload'] + + # 2. Filter to relevant devices + if device_id not in desc.targets.get('my_devices', []): + return + + # 3. Implement logic + if cap == 'contact': + if payload.get('contact') == 'open': + # Do something + await ctx.mqtt.publish_set_thermostat( + 'some_thermostat', + desc.params.get('temp', 20.0) + ) + + # 4. Persist state if needed + state_key = f"rule:{desc.id}:device:{device_id}:state" + await ctx.redis.set(state_key, payload.get('contact')) +``` + +### Step 2: Register in RULE_IMPLEMENTATIONS + +```python +# In your rule module (e.g., my_custom_rule.py) +RULE_IMPLEMENTATIONS = { + 'my_custom@1.0': MyCustomRule, +} +``` + +### Step 3: Configure in rules.yaml + +```yaml +rules: + - id: my_custom_living_room + name: My Custom Rule for Living Room + type: my_custom@1.0 + targets: + my_devices: + - device_1 + - device_2 + params: + temp: 22.0 + duration_secs: 300 +``` + +## Best Practices + +### Idempotency + +Rules MUST be idempotent - processing the same event multiple times should be safe: + +```python +# Good: Idempotent +async def on_event(self, evt, desc, ctx): + if evt['payload'].get('contact') == 'open': + await ctx.mqtt.publish_set_thermostat('thermo', 16.0) + +# Bad: Not idempotent (increments counter) +async def on_event(self, evt, desc, ctx): + counter = await ctx.redis.get('counter') or '0' + await ctx.redis.set('counter', str(int(counter) + 1)) +``` + +### Error Handling + +Handle errors gracefully - the engine will catch and log exceptions: + +```python +async def on_event(self, evt, desc, ctx): + try: + await ctx.mqtt.publish_set_thermostat('thermo', 16.0) + except Exception as e: + ctx.logger.error(f"Failed to set thermostat: {e}") + # Don't raise - let event processing continue +``` + +### State Keys + +Use consistent naming for Redis keys: + +```python +# Pattern: rule:{rule_id}:{category}:{device_id}:{field} +state_key = f"rule:{desc.id}:contact:{device_id}:state" +ts_key = f"rule:{desc.id}:contact:{device_id}:ts" +prev_key = f"rule:{desc.id}:thermo:{device_id}:previous" +``` + +### Logging + +Use appropriate log levels: + +```python +ctx.logger.debug("Detailed diagnostic info") +ctx.logger.info("Normal operation milestones") +ctx.logger.warning("Unexpected but handled situations") +ctx.logger.error("Errors that prevent operation") +``` + +## Event Structure Reference + +### Contact Sensor Event + +```python +{ + "topic": "home/contact/kontakt_wohnzimmer/state", + "type": "state", + "cap": "contact", + "device_id": "kontakt_wohnzimmer", + "payload": { + "contact": "open" # or "closed" + }, + "ts": "2025-11-11T10:30:45.123456" +} +``` + +### Thermostat Event + +```python +{ + "topic": "home/thermostat/thermostat_wohnzimmer/state", + "type": "state", + "cap": "thermostat", + "device_id": "thermostat_wohnzimmer", + "payload": { + "target": 21.0, + "current": 20.5, + "mode": "heat" + }, + "ts": "2025-11-11T10:30:45.123456" +} +``` + +## Testing Rules + +Rules can be tested independently of the engine: + +```python +import pytest +from unittest.mock import AsyncMock, MagicMock +from packages.my_custom_rule import MyCustomRule +from packages.rule_interface import RuleDescriptor, RuleContext + +@pytest.mark.asyncio +async def test_my_rule(): + # Setup + rule = MyCustomRule() + + desc = RuleDescriptor( + id="test_rule", + type="my_custom@1.0", + targets={"my_devices": ["device_1"]}, + params={"temp": 22.0} + ) + + # Mock context + ctx = RuleContext( + logger=MagicMock(), + mqtt_publisher=AsyncMock(), + redis_state=AsyncMock(), + now_fn=lambda: datetime.now() + ) + + # Test event + evt = { + "device_id": "device_1", + "cap": "contact", + "payload": {"contact": "open"}, + "ts": "2025-11-11T10:30:45.123456" + } + + # Execute + await rule.on_event(evt, desc, ctx) + + # Assert + ctx.mqtt.publish_set_thermostat.assert_called_once_with('some_thermostat', 22.0) +``` + +## Extension Points + +The interface is designed to be extended without modifying the engine: + +1. **New rule types**: Just implement `Rule` and register in `RULE_IMPLEMENTATIONS` +2. **New MQTT commands**: Extend `MQTTPublisher` with new methods +3. **New state backends**: Implement `RedisState` interface with different storage +4. **Custom context**: Extend `RuleContext` with additional utilities + +The engine only depends on the abstract interfaces, not specific implementations. diff --git a/apps/rules/impl/__init__.py b/apps/rules/impl/__init__.py new file mode 100644 index 0000000..0c75ca8 --- /dev/null +++ b/apps/rules/impl/__init__.py @@ -0,0 +1,15 @@ +""" +Rule Implementations Package + +This package contains all rule implementation modules. + +Naming Convention: +- Module name: snake_case matching the rule type name + Example: window_setback.py for type 'window_setback@1.0' + +- Class name: PascalCase + 'Rule' suffix + Example: WindowSetbackRule + +The rule engine uses load_rule() from rule_interface to dynamically +import modules from this package based on the 'type' field in rules.yaml. +""" diff --git a/apps/rules/impl/window_setback.py b/apps/rules/impl/window_setback.py new file mode 100644 index 0000000..2c6b298 --- /dev/null +++ b/apps/rules/impl/window_setback.py @@ -0,0 +1,170 @@ +""" +Example Rule Implementation: Window Setback + +Demonstrates how to implement a Rule using the rule_interface. +This rule lowers thermostat temperature when a window is opened. +""" + +from typing import Any + +from apps.rules.rule_interface import Rule, RuleDescriptor, RuleContext + + +class WindowSetbackRule(Rule): + """ + Window setback automation rule. + + When a window/door contact opens, set thermostats to eco temperature. + When closed for a minimum duration, restore previous target temperature. + + Configuration: + targets: + contacts: List of contact sensor device IDs + thermostats: List of thermostat device IDs + params: + eco_target: Temperature to set when window opens (default: 16.0) + open_min_secs: Minimum seconds window must be open before triggering (default: 20) + close_min_secs: Minimum seconds window must be closed before restoring (default: 20) + previous_target_ttl_secs: How long to remember previous temperature (default: 86400) + + State storage: + Redis keys: + rule:{rule_id}:contact:{device_id}:state -> "open" | "closed" + rule:{rule_id}:contact:{device_id}:ts -> ISO timestamp of last change + rule:{rule_id}:thermo:{device_id}:previous -> Previous target temperature + """ + + async def on_event( + self, + evt: dict[str, Any], + desc: RuleDescriptor, + ctx: RuleContext + ) -> None: + """ + Process contact sensor or thermostat state changes. + + Logic: + 1. If contact opened → remember current thermostat targets, set to eco + 2. If contact closed for min_secs → restore previous targets + 3. If thermostat target changed → update stored previous value + """ + device_id = evt['device_id'] + cap = evt['cap'] + payload = evt['payload'] + + # Only process events for devices in our targets + target_contacts = desc.targets.contacts or [] + target_thermostats = desc.targets.thermostats or [] + + if cap == 'contact' and device_id in target_contacts: + await self._handle_contact_event(evt, desc, ctx) + + elif cap == 'thermostat' and device_id in target_thermostats: + await self._handle_thermostat_event(evt, desc, ctx) + + async def _handle_contact_event( + self, + evt: dict[str, Any], + desc: RuleDescriptor, + ctx: RuleContext + ) -> None: + """Handle contact sensor state change.""" + device_id = evt['device_id'] + contact_state = evt['payload'].get('contact') # "open" or "closed" + event_ts = evt.get('ts', ctx.now().isoformat()) + + if not contact_state: + ctx.logger.warning(f"Contact event missing 'contact' field: {evt}") + return + + # Store current state and timestamp + state_key = f"rule:{desc.id}:contact:{device_id}:state" + ts_key = f"rule:{desc.id}:contact:{device_id}:ts" + + await ctx.redis.set(state_key, contact_state) + await ctx.redis.set(ts_key, event_ts) + + if contact_state == 'open': + await self._on_window_opened(desc, ctx) + elif contact_state == 'closed': + await self._on_window_closed(desc, ctx) + + async def _on_window_opened(self, desc: RuleDescriptor, ctx: RuleContext) -> None: + """Window opened - set thermostats to eco temperature.""" + eco_target = desc.params.get('eco_target', 16.0) + target_thermostats = desc.targets.thermostats or [] + + ctx.logger.info( + f"Rule {desc.id}: Window opened, setting {len(target_thermostats)} " + f"thermostats to eco temperature {eco_target}°C" + ) + + # Set all thermostats to eco temperature + for thermo_id in target_thermostats: + try: + await ctx.mqtt.publish_set_thermostat(thermo_id, eco_target) + ctx.logger.debug(f"Set {thermo_id} to {eco_target}°C") + except Exception as e: + ctx.logger.error(f"Failed to set {thermo_id}: {e}") + + async def _on_window_closed(self, desc: RuleDescriptor, ctx: RuleContext) -> None: + """ + Window closed - restore previous temperatures if closed long enough. + + Note: This is simplified. A production implementation would check + close_min_secs and use a timer/scheduler. + """ + target_thermostats = desc.targets.thermostats or [] + ttl_secs = desc.params.get('previous_target_ttl_secs', 86400) + + ctx.logger.info( + f"Rule {desc.id}: Window closed, restoring {len(target_thermostats)} " + f"thermostats to previous temperatures" + ) + + # Restore previous temperatures + for thermo_id in target_thermostats: + prev_key = f"rule:{desc.id}:thermo:{thermo_id}:previous" + prev_temp_str = await ctx.redis.get(prev_key) + + if prev_temp_str: + try: + prev_temp = float(prev_temp_str) + await ctx.mqtt.publish_set_thermostat(thermo_id, prev_temp) + ctx.logger.debug(f"Restored {thermo_id} to {prev_temp}°C") + except Exception as e: + ctx.logger.error(f"Failed to restore {thermo_id}: {e}") + + async def _handle_thermostat_event( + self, + evt: dict[str, Any], + desc: RuleDescriptor, + ctx: RuleContext + ) -> None: + """ + Handle thermostat state change - remember current target. + + This allows us to restore the temperature when window closes. + """ + device_id = evt['device_id'] + payload = evt['payload'] + current_target = payload.get('target') + + if current_target is None: + return # No target in this state update + + # Store as previous target with TTL + prev_key = f"rule:{desc.id}:thermo:{device_id}:previous" + ttl_secs = desc.params.get('previous_target_ttl_secs', 86400) + + await ctx.redis.set(prev_key, str(current_target), ttl_secs=ttl_secs) + + ctx.logger.debug( + f"Rule {desc.id}: Stored previous target for {device_id}: {current_target}°C" + ) + + +# Rule registry - maps rule type to implementation class +RULE_IMPLEMENTATIONS = { + 'window_setback@1.0': WindowSetbackRule, +} diff --git a/apps/rules/main.py b/apps/rules/main.py index 73c4fac..f662ce7 100644 --- a/apps/rules/main.py +++ b/apps/rules/main.py @@ -1,12 +1,26 @@ -"""Rules main entry point.""" +""" +Rules Engine +Loads rules configuration, subscribes to MQTT events, and dispatches events +to registered rule implementations. +""" + +import asyncio import logging +import os import signal import sys -import time -from typing import NoReturn +from datetime import datetime +from typing import Any -from apscheduler.schedulers.background import BackgroundScheduler +from apps.rules.rules_config import load_rules_config +from apps.rules.rule_interface import ( + RuleDescriptor, + RuleContext, + MQTTClient, + RedisState, + load_rule +) # Configure logging logging.basicConfig( @@ -15,69 +29,287 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -# Global scheduler instance -scheduler: BackgroundScheduler | None = None - -def rule_tick() -> None: - """Example job that runs every minute. - - This is a placeholder for actual rule evaluation logic. +class RuleEngine: """ - logger.info("Rule tick") - - -def shutdown_handler(signum: int, frame: object) -> NoReturn: - """Handle shutdown signals gracefully. - - Args: - signum: Signal number - frame: Current stack frame + Rule engine that loads rules, subscribes to MQTT events, + and dispatches them to registered rule implementations. """ - logger.info(f"Received signal {signum}, shutting down...") - if scheduler: - scheduler.shutdown(wait=True) - logger.info("Scheduler stopped") - sys.exit(0) + + def __init__( + self, + rules_config_path: str, + mqtt_broker: str, + mqtt_port: int, + redis_url: str + ): + """ + Initialize rule engine. + + Args: + rules_config_path: Path to rules.yaml + mqtt_broker: MQTT broker hostname/IP + mqtt_port: MQTT broker port + redis_url: Redis connection URL + """ + self.rules_config_path = rules_config_path + self.mqtt_broker = mqtt_broker + self.mqtt_port = mqtt_port + self.redis_url = redis_url + + # Will be initialized in setup() + self.rule_descriptors: list[RuleDescriptor] = [] + self.rules: dict[str, Any] = {} # rule_id -> Rule instance + self.mqtt_client: MQTTClient | None = None + self.redis_state: RedisState | None = None + self.context: RuleContext | None = None + + # For graceful shutdown + self._shutdown_event = asyncio.Event() + + def setup(self) -> None: + """ + Load configuration and instantiate rules. + + Raises: + ImportError: If rule implementation not found + ValueError: If configuration is invalid + """ + logger.info(f"Loading rules configuration from {self.rules_config_path}") + + # Load rules configuration + config = load_rules_config(self.rules_config_path) + self.rule_descriptors = config.rules + + logger.info(f"Loaded {len(self.rule_descriptors)} rule(s) from configuration") + + # Instantiate each rule + for desc in self.rule_descriptors: + try: + rule_instance = load_rule(desc) + self.rules[desc.id] = rule_instance + logger.info(f" - {desc.id} (type: {desc.type})") + except Exception as e: + logger.error(f"Failed to load rule {desc.id} (type: {desc.type}): {e}") + raise + + logger.info(f"Successfully loaded {len(self.rules)} rule implementation(s)") + + # Initialize MQTT client + self.mqtt_client = MQTTClient( + broker=self.mqtt_broker, + port=self.mqtt_port, + client_id="rule_engine" + ) + self.mqtt_client.set_logger(logger) + + # Initialize Redis state + self.redis_state = RedisState(self.redis_url) + + # Create MQTT publisher wrapper for RuleContext + from apps.rules.rule_interface import MQTTPublisher + mqtt_publisher = MQTTPublisher(mqtt_client=self.mqtt_client) + + # Create rule context + self.context = RuleContext( + logger=logger, + mqtt_publisher=mqtt_publisher, + redis_state=self.redis_state, + now_fn=datetime.now + ) + + def _filter_rules_for_event(self, event: dict[str, Any]) -> list[tuple[str, RuleDescriptor]]: + """ + Filter rules that should receive this event. + + Rules match if: + - For contact events: device_id in targets.contacts + - For thermostat events: device_id in targets.thermostats + - (Room-based filtering could be added here) + + Args: + event: Normalized MQTT event + + Returns: + List of (rule_id, descriptor) tuples that should process this event + """ + matching_rules = [] + device_id = event.get('device_id') + cap = event.get('cap') + + if not device_id or not cap: + return matching_rules + + logger.debug(f"Filtering for cap={cap}, device_id={device_id}") + + for rule_id, desc in [(r.id, r) for r in self.rule_descriptors]: + targets = desc.targets + + # Check if this device is in the rule's targets + matched = False + + if cap == 'contact' and targets.contacts: + logger.debug(f"Rule {rule_id}: checking contacts {targets.contacts}") + if device_id in targets.contacts: + matched = True + + elif cap == 'thermostat' and targets.thermostats: + logger.debug(f"Rule {rule_id}: checking thermostats {targets.thermostats}") + if device_id in targets.thermostats: + matched = True + + # Could add room-based filtering here: + # elif 'rooms' in targets: + # device_room = get_device_room(device_id) + # if device_room in targets['rooms']: + # matched = True + + if matched: + matching_rules.append((rule_id, desc)) + + return matching_rules + + async def _dispatch_event(self, event: dict[str, Any]) -> None: + """ + Dispatch event to matching rules. + + Calls rule.on_event() for each matching rule sequentially + to preserve order and avoid race conditions. + + Args: + event: Normalized MQTT event + """ + # Debug logging + logger.debug(f"Received event: {event}") + + matching_rules = self._filter_rules_for_event(event) + + if not matching_rules: + # No rules interested in this event + logger.debug(f"No matching rules for {event.get('cap')}/{event.get('device_id')}") + return + + logger.info( + f"Event {event['cap']}/{event['device_id']}: " + f"{len(matching_rules)} matching rule(s)" + ) + + # Process rules sequentially to preserve order + for rule_id, desc in matching_rules: + rule = self.rules.get(rule_id) + if not rule: + logger.warning(f"Rule instance not found for {rule_id}") + continue + + try: + await rule.on_event(event, desc, self.context) + except Exception as e: + logger.error( + f"Error in rule {rule_id} processing event " + f"{event['cap']}/{event['device_id']}: {e}", + exc_info=True + ) + # Continue with other rules + + async def run(self) -> None: + """ + Main event loop - subscribe to MQTT and process events. + + Runs until shutdown signal received. + """ + logger.info("Starting event processing loop") + + try: + async for event in self.mqtt_client.connect(): + # Check for shutdown + if self._shutdown_event.is_set(): + logger.info("Shutdown signal received, stopping event loop") + break + + # Dispatch event to matching rules + await self._dispatch_event(event) + + except asyncio.CancelledError: + logger.info("Event loop cancelled") + raise + except Exception as e: + logger.error(f"Fatal error in event loop: {e}", exc_info=True) + raise + + async def shutdown(self) -> None: + """Graceful shutdown - close connections.""" + logger.info("Shutting down rule engine...") + self._shutdown_event.set() + + if self.redis_state: + await self.redis_state.close() + logger.info("Redis connection closed") + + logger.info("Shutdown complete") + + +async def main_async() -> None: + """Async main function.""" + # Read configuration from environment + rules_config = os.getenv('RULES_CONFIG', 'config/rules.yaml') + mqtt_broker = os.getenv('MQTT_BROKER', '172.16.2.16') + mqtt_port = int(os.getenv('MQTT_PORT', '1883')) + redis_host = os.getenv('REDIS_HOST', '172.23.1.116') + redis_port = int(os.getenv('REDIS_PORT', '6379')) + redis_db = int(os.getenv('REDIS_DB', '8')) + redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}' + + logger.info("=" * 60) + logger.info("Rules Engine Starting") + logger.info("=" * 60) + logger.info(f"Config: {rules_config}") + logger.info(f"MQTT: {mqtt_broker}:{mqtt_port}") + logger.info(f"Redis: {redis_url}") + logger.info("=" * 60) + + # Initialize engine + engine = RuleEngine( + rules_config_path=rules_config, + mqtt_broker=mqtt_broker, + mqtt_port=mqtt_port, + redis_url=redis_url + ) + + # Load rules + try: + engine.setup() + except Exception as e: + logger.error(f"Failed to setup engine: {e}", exc_info=True) + sys.exit(1) + + # Setup signal handlers for graceful shutdown + loop = asyncio.get_running_loop() + + def signal_handler(): + logger.info("Received shutdown signal") + asyncio.create_task(engine.shutdown()) + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler) + + # Run engine + try: + await engine.run() + except asyncio.CancelledError: + logger.info("Main task cancelled") + finally: + await engine.shutdown() def main() -> None: - """Run the rules application.""" - global scheduler - - logger.info("Rules engine starting...") - - # Register signal handlers - signal.signal(signal.SIGINT, shutdown_handler) - signal.signal(signal.SIGTERM, shutdown_handler) - - # Initialize scheduler - scheduler = BackgroundScheduler() - - # Add example job - runs every minute - scheduler.add_job( - rule_tick, - 'interval', - minutes=1, - id='rule_tick', - name='Rule Tick Job' - ) - - # Start scheduler - scheduler.start() - logger.info("Scheduler started with rule_tick job (every 1 minute)") - - # Run initial tick immediately - rule_tick() - - # Keep the application running + """Entry point for rule engine.""" try: - while True: - time.sleep(1) + asyncio.run(main_async()) except KeyboardInterrupt: - logger.info("KeyboardInterrupt received, shutting down...") - scheduler.shutdown(wait=True) - logger.info("Scheduler stopped") + logger.info("Keyboard interrupt received") + except Exception as e: + logger.error(f"Fatal error: {e}", exc_info=True) + sys.exit(1) if __name__ == "__main__": diff --git a/apps/rules/requirements.txt b/apps/rules/requirements.txt new file mode 100644 index 0000000..18de568 --- /dev/null +++ b/apps/rules/requirements.txt @@ -0,0 +1,5 @@ +# Rules Engine Dependencies +pydantic>=2.0 +redis>=5.0.1 +aiomqtt>=2.0.1 +pyyaml>=6.0.1 diff --git a/apps/rules/rule_interface.py b/apps/rules/rule_interface.py new file mode 100644 index 0000000..9b7b70a --- /dev/null +++ b/apps/rules/rule_interface.py @@ -0,0 +1,704 @@ +""" +Rule Interface and Context Objects + +Provides the core abstractions for implementing automation rules: +- RuleDescriptor: Configuration data for a rule instance +- RedisState: State persistence interface +- RuleContext: Runtime context provided to rules +- Rule: Abstract base class for all rule implementations +""" + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Any, Awaitable, Optional + +from pydantic import BaseModel, Field + + +class RuleDescriptor(BaseModel): + """ + Configuration descriptor for a rule instance. + + This is the validated representation of a rule from rules.yaml. + The engine loads these and passes them to rule implementations. + """ + + id: str = Field(..., description="Unique identifier for this rule instance") + name: Optional[str] = Field(None, description="Optional human-readable name") + type: str = Field(..., description="Rule type with version (e.g., 'window_setback@1.0')") + targets: dict[str, Any] = Field( + default_factory=dict, + description="Rule-specific target specification (rooms, devices, etc.)" + ) + params: dict[str, Any] = Field( + default_factory=dict, + description="Rule-specific parameters" + ) + + +class RedisState: + """ + Async Redis-backed state persistence for rules with automatic reconnection. + + Provides a simple key-value and hash storage interface for rules to persist + state across restarts. All operations are asynchronous and include retry logic + for robustness against temporary Redis outages. + + Key Convention: + - Callers should use keys like: f"rules:{rule_id}:contact:{device_id}" + - This class does NOT enforce key prefixes - caller controls the full key + """ + + def __init__(self, url: str, max_retries: int = 3, retry_delay: float = 0.5): + """ + Initialize RedisState with connection URL. + + Args: + url: Redis connection URL (e.g., 'redis://172.23.1.116:6379/8') + max_retries: Maximum number of retry attempts for operations (default: 3) + retry_delay: Initial delay between retries in seconds, uses exponential backoff (default: 0.5) + + Note: + Connection is lazy - actual connection happens on first operation. + Uses connection pooling with automatic reconnection on failure. + """ + self._url = url + self._max_retries = max_retries + self._retry_delay = retry_delay + self._redis: Optional[Any] = None # redis.asyncio.Redis instance + + async def _get_client(self): + """ + Get or create Redis client with connection pool. + + Lazy initialization ensures we don't connect until first use. + Uses decode_responses=True for automatic UTF-8 decoding. + """ + if self._redis is None: + import redis.asyncio as aioredis + self._redis = await aioredis.from_url( + self._url, + decode_responses=True, # Automatic UTF-8 decode + encoding='utf-8', + max_connections=10, # Connection pool size + socket_connect_timeout=5, + socket_keepalive=True, + health_check_interval=30 # Auto-check connection health + ) + return self._redis + + async def _execute_with_retry(self, operation, *args, **kwargs): + """ + Execute Redis operation with exponential backoff retry. + + Handles temporary connection failures gracefully by retrying + with exponential backoff. On permanent failure, raises the + original exception. + + Args: + operation: Async callable (Redis method) + *args, **kwargs: Arguments to pass to operation + + Returns: + Result of the operation + + Raises: + Exception: If all retries are exhausted + """ + import asyncio + + last_exception = None + for attempt in range(self._max_retries): + try: + client = await self._get_client() + return await operation(client, *args, **kwargs) + except Exception as e: + last_exception = e + if attempt < self._max_retries - 1: + # Exponential backoff: 0.5s, 1s, 2s, ... + delay = self._retry_delay * (2 ** attempt) + await asyncio.sleep(delay) + # Reset client to force reconnection + if self._redis: + try: + await self._redis.close() + except: + pass + self._redis = None + + # All retries exhausted + raise last_exception + + # JSON helpers for complex data structures + def _dumps(self, obj: Any) -> str: + """Serialize Python object to JSON string.""" + import json + return json.dumps(obj, ensure_ascii=False) + + def _loads(self, s: str) -> Any: + """Deserialize JSON string to Python object.""" + import json + return json.loads(s) + + async def get(self, key: str) -> Optional[str]: + """ + Get a string value by key. + + Args: + key: Redis key (e.g., "rules:my_rule:contact:sensor_1") + + Returns: + String value or None if key doesn't exist + + Example: + >>> state = RedisState("redis://localhost:6379/0") + >>> await state.set("rules:r1:temp", "22.5") + >>> temp = await state.get("rules:r1:temp") + >>> print(temp) # "22.5" + """ + async def _get(client, k): + return await client.get(k) + + return await self._execute_with_retry(_get, key) + + async def set(self, key: str, value: str, ttl_secs: Optional[int] = None) -> None: + """ + Set a string value with optional TTL. + + Args: + key: Redis key + value: String value to store + ttl_secs: Optional time-to-live in seconds. If None, key persists indefinitely. + + Example: + >>> state = RedisState("redis://localhost:6379/0") + >>> # Store with 1 hour TTL + >>> await state.set("rules:r1:previous_temp", "20.0", ttl_secs=3600) + """ + async def _set(client, k, v, ttl): + if ttl is not None: + await client.setex(k, ttl, v) + else: + await client.set(k, v) + + await self._execute_with_retry(_set, key, value, ttl_secs) + + async def hget(self, key: str, field: str) -> Optional[str]: + """ + Get a hash field value. + + Args: + key: Redis hash key + field: Field name within the hash + + Returns: + String value or None if field doesn't exist + + Example: + >>> state = RedisState("redis://localhost:6379/0") + >>> await state.hset("rules:r1:device_states", "sensor_1", "open") + >>> value = await state.hget("rules:r1:device_states", "sensor_1") + >>> print(value) # "open" + """ + async def _hget(client, k, f): + return await client.hget(k, f) + + return await self._execute_with_retry(_hget, key, field) + + async def hset(self, key: str, field: str, value: str) -> None: + """ + Set a hash field value. + + Args: + key: Redis hash key + field: Field name within the hash + value: String value to store + + Example: + >>> state = RedisState("redis://localhost:6379/0") + >>> await state.hset("rules:r1:sensors", "bedroom", "open") + >>> await state.hset("rules:r1:sensors", "kitchen", "closed") + """ + async def _hset(client, k, f, v): + await client.hset(k, f, v) + + await self._execute_with_retry(_hset, key, field, value) + + async def expire(self, key: str, ttl_secs: int) -> None: + """ + Set or update TTL on an existing key. + + Args: + key: Redis key + ttl_secs: Time-to-live in seconds + + Example: + >>> state = RedisState("redis://localhost:6379/0") + >>> await state.set("rules:r1:temp", "22.5") + >>> await state.expire("rules:r1:temp", 3600) # Expire in 1 hour + """ + async def _expire(client, k, ttl): + await client.expire(k, ttl) + + await self._execute_with_retry(_expire, key, ttl_secs) + + async def close(self) -> None: + """ + Close Redis connection and cleanup resources. + + Should be called when shutting down the application. + """ + if self._redis: + await self._redis.close() + self._redis = None + + +class MQTTClient: + """ + Async MQTT client for rule engine with event normalization and publishing. + + Subscribes to device state topics, normalizes events to a consistent format, + and provides high-level publishing methods for device commands. + + Event Normalization: + All incoming MQTT messages are parsed into a normalized event structure: + { + "topic": "home/contact/sensor_1/state", + "type": "state", + "cap": "contact", # Capability type (contact, thermostat, light, etc.) + "device_id": "sensor_1", + "payload": {"contact": "open"}, + "ts": "2025-11-11T10:30:45.123456" + } + """ + + def __init__( + self, + broker: str, + port: int = 1883, + client_id: str = "rule_engine", + reconnect_interval: int = 5, + max_reconnect_delay: int = 300 + ): + """ + Initialize MQTT client. + + Args: + broker: MQTT broker hostname or IP + port: MQTT broker port (default: 1883) + client_id: Unique client ID for this connection + reconnect_interval: Initial reconnect delay in seconds (default: 5) + max_reconnect_delay: Maximum reconnect delay in seconds (default: 300) + """ + self._broker = broker + self._port = port + self._client_id = client_id + self._reconnect_interval = reconnect_interval + self._max_reconnect_delay = max_reconnect_delay + self._client = None + self._logger = None # Set externally + + def set_logger(self, logger): + """Set logger instance for connection status messages.""" + self._logger = logger + + def _log(self, level: str, msg: str): + """Internal logging helper.""" + if self._logger: + getattr(self._logger, level)(msg) + else: + print(f"[{level.upper()}] {msg}") + + async def connect(self): + """ + Connect to MQTT broker with automatic reconnection. + + This method manages the connection and automatically reconnects + with exponential backoff if the connection is lost. + """ + import aiomqtt + from aiomqtt import Client + + reconnect_delay = self._reconnect_interval + + while True: + try: + self._log("info", f"Connecting to MQTT broker {self._broker}:{self._port} (client_id={self._client_id})") + + async with Client( + hostname=self._broker, + port=self._port, + identifier=self._client_id, + ) as client: + self._client = client + self._log("info", f"Connected to MQTT broker {self._broker}:{self._port}") + + # Subscribe to device state topics + await client.subscribe("home/contact/+/state") + await client.subscribe("home/thermostat/+/state") + self._log("info", "Subscribed to home/contact/+/state, home/thermostat/+/state") + + # Reset reconnect delay on successful connection + reconnect_delay = self._reconnect_interval + + # Process messages - this is a generator that yields messages + async for message in client.messages: + yield self._normalize_event(message) + + except aiomqtt.MqttError as e: + self._log("error", f"MQTT connection error: {e}") + self._log("info", f"Reconnecting in {reconnect_delay} seconds...") + + import asyncio + await asyncio.sleep(reconnect_delay) + + # Exponential backoff + reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay) + + def _normalize_event(self, message) -> dict[str, Any]: + """ + Normalize MQTT message to standard event format. + + Parses topic to extract capability type and device_id, + adds timestamp, and structures payload. + + Args: + message: aiomqtt.Message instance + + Returns: + Normalized event dictionary + + Example: + Topic: home/contact/sensor_bedroom/state + Payload: {"contact": "open"} + + Returns: + { + "topic": "home/contact/sensor_bedroom/state", + "type": "state", + "cap": "contact", + "device_id": "sensor_bedroom", + "payload": {"contact": "open"}, + "ts": "2025-11-11T10:30:45.123456" + } + """ + from datetime import datetime + import json + + topic = str(message.topic) + topic_parts = topic.split('/') + + # Parse topic: home/{capability}/{device_id}/state + if len(topic_parts) >= 4 and topic_parts[0] == 'home' and topic_parts[3] == 'state': + cap = topic_parts[1] # contact, thermostat, light, etc. + device_id = topic_parts[2] + else: + # Fallback for unexpected topic format + cap = "unknown" + device_id = topic_parts[-2] if len(topic_parts) >= 2 else "unknown" + + # Parse payload + try: + payload = json.loads(message.payload.decode('utf-8')) + except (json.JSONDecodeError, UnicodeDecodeError): + payload = {"raw": message.payload.decode('utf-8', errors='replace')} + + # Generate timestamp + ts = datetime.now().isoformat() + + return { + "topic": topic, + "type": "state", + "cap": cap, + "device_id": device_id, + "payload": payload, + "ts": ts + } + + async def publish_set_thermostat(self, device_id: str, target: float) -> None: + """ + Publish thermostat target temperature command. + + Publishes to: home/thermostat/{device_id}/set + QoS: 1 (at least once delivery) + + Args: + device_id: Thermostat device identifier + target: Target temperature in degrees Celsius + + Example: + >>> mqtt = MQTTClient("172.16.2.16", 1883) + >>> await mqtt.publish_set_thermostat("thermostat_wohnzimmer", 22.5) + + Published to: home/thermostat/thermostat_wohnzimmer/set + Payload: {"type":"thermostat","payload":{"target":22.5}} + """ + import json + + if self._client is None: + raise RuntimeError("MQTT client not connected. Call connect() first.") + + topic = f"home/thermostat/{device_id}/set" + payload = { + "type": "thermostat", + "payload": { + "target": target + } + } + + payload_str = json.dumps(payload) + + await self._client.publish( + topic, + payload=payload_str.encode('utf-8'), + qos=1 # At least once delivery + ) + + self._log("debug", f"Published SET to {topic}: {payload_str}") + + +# Legacy alias for backward compatibility +class MQTTPublisher: + """ + Legacy MQTT publishing interface - DEPRECATED. + + Use MQTTClient instead for new code. + This class is kept for backward compatibility with existing documentation. + """ + + def __init__(self, mqtt_client): + """ + Initialize MQTT publisher. + + Args: + mqtt_client: MQTTClient instance + """ + self._mqtt = mqtt_client + + async def publish_set_thermostat(self, device_id: str, target: float) -> None: + """ + Publish a thermostat target temperature command. + + Args: + device_id: Thermostat device identifier + target: Target temperature in degrees Celsius + """ + await self._mqtt.publish_set_thermostat(device_id, target) + + +class RuleContext: + """ + Runtime context provided to rules during event processing. + + Contains all external dependencies and utilities a rule needs: + - Logger for diagnostics + - MQTT client for publishing commands + - Redis client for state persistence + - Current timestamp function + """ + + def __init__( + self, + logger, + mqtt_publisher: MQTTPublisher, + redis_state: RedisState, + now_fn=None + ): + """ + Initialize rule context. + + Args: + logger: Logger instance (e.g., logging.Logger) + mqtt_publisher: MQTTPublisher instance for device commands + redis_state: RedisState instance for persistence + now_fn: Optional callable returning current datetime (defaults to datetime.now) + """ + self.logger = logger + self.mqtt = mqtt_publisher + self.redis = redis_state + self._now_fn = now_fn or datetime.now + + def now(self) -> datetime: + """ + Get current timestamp. + + Returns: + Current datetime (timezone-aware if now_fn provides it) + """ + return self._now_fn() + + +class Rule(ABC): + """ + Abstract base class for all automation rule implementations. + + Rules implement event-driven automation logic. The engine calls on_event() + for each relevant device state change, passing the event data, rule configuration, + and runtime context. + + Implementations must be idempotent - processing the same event multiple times + should produce the same result. + + Example implementation: + + class WindowSetbackRule(Rule): + async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None: + device_id = evt['device_id'] + cap = evt['cap'] + + if cap == 'contact': + contact_state = evt['payload'].get('contact') + if contact_state == 'open': + # Window opened - set thermostats to eco + for thermo_id in desc.targets.get('thermostats', []): + eco_temp = desc.params.get('eco_target', 16.0) + await ctx.mqtt.publish_set_thermostat(thermo_id, eco_temp) + """ + + @abstractmethod + async def on_event( + self, + evt: dict[str, Any], + desc: RuleDescriptor, + ctx: RuleContext + ) -> None: + """ + Process a device state change event. + + This method is called by the rule engine whenever a device state changes + that is relevant to this rule. The implementation should examine the event + and take appropriate actions (e.g., publish MQTT commands, update state). + + MUST be idempotent: Processing the same event multiple times should be safe. + + Args: + evt: Event dictionary with the following structure: + { + "topic": "home/contact/device_id/state", # MQTT topic + "type": "state", # Message type + "cap": "contact", # Capability type + "device_id": "kontakt_wohnzimmer", # Device identifier + "payload": {"contact": "open"}, # Capability-specific payload + "ts": "2025-11-11T10:30:45.123456" # ISO timestamp + } + + desc: Rule configuration from rules.yaml + + ctx: Runtime context with logger, MQTT, Redis, and timestamp utilities + + Returns: + None + + Raises: + Exception: Implementation may raise exceptions for errors. + The engine will log them but continue processing. + """ + pass + + +# ============================================================================ +# Dynamic Rule Loading +# ============================================================================ + +import importlib +import re +from typing import Type + +# Cache for loaded rule classes (per process) +_RULE_CLASS_CACHE: dict[str, Type[Rule]] = {} + + +def load_rule(desc: RuleDescriptor) -> Rule: + """ + Dynamically load and instantiate a rule based on its type descriptor. + + Convention: + - Rule type format: 'name@version' (e.g., 'window_setback@1.0') + - Module path: apps.rules.impl.{name} + - Class name: PascalCase version of name + 'Rule' + Example: 'window_setback' → 'WindowSetbackRule' + + Args: + desc: Rule descriptor from rules.yaml + + Returns: + Instantiated Rule object + + Raises: + ValueError: If type format is invalid + ImportError: If rule module cannot be found + AttributeError: If rule class cannot be found in module + + Examples: + >>> desc = RuleDescriptor( + ... id="test_rule", + ... type="window_setback@1.0", + ... targets={}, + ... params={} + ... ) + >>> rule = load_rule(desc) + >>> isinstance(rule, Rule) + True + """ + rule_type = desc.type + + # Check cache first + if rule_type in _RULE_CLASS_CACHE: + rule_class = _RULE_CLASS_CACHE[rule_type] + return rule_class() + + # Parse type: 'name@version' + if '@' not in rule_type: + raise ValueError( + f"Invalid rule type '{rule_type}': must be in format 'name@version' " + f"(e.g., 'window_setback@1.0')" + ) + + name, version = rule_type.split('@', 1) + + # Validate name (alphanumeric and underscores only) + if not re.match(r'^[a-z][a-z0-9_]*$', name): + raise ValueError( + f"Invalid rule name '{name}': must start with lowercase letter " + f"and contain only lowercase letters, numbers, and underscores" + ) + + # Convert snake_case to PascalCase for class name + # Example: 'window_setback' → 'WindowSetbackRule' + class_name = ''.join(word.capitalize() for word in name.split('_')) + 'Rule' + + # Construct module path + module_path = f'apps.rules.impl.{name}' + + # Try to import the module + try: + module = importlib.import_module(module_path) + except ImportError as e: + raise ImportError( + f"Cannot load rule type '{rule_type}': module '{module_path}' not found.\n" + f"Hint: Create file 'apps/rules/impl/{name}.py' with class '{class_name}'.\n" + f"Original error: {e}" + ) from e + + # Try to get the class from the module + try: + rule_class = getattr(module, class_name) + except AttributeError as e: + raise AttributeError( + f"Cannot load rule type '{rule_type}': class '{class_name}' not found in module '{module_path}'.\n" + f"Hint: Define 'class {class_name}(Rule):' in 'apps/rules/impl/{name}.py'.\n" + f"Available classes in module: {[name for name in dir(module) if not name.startswith('_')]}" + ) from e + + # Validate that it's a Rule subclass + if not issubclass(rule_class, Rule): + raise TypeError( + f"Class '{class_name}' in '{module_path}' is not a subclass of Rule. " + f"Ensure it inherits from apps.rules.rule_interface.Rule" + ) + + # Cache the class + _RULE_CLASS_CACHE[rule_type] = rule_class + + # Instantiate and return + return rule_class() diff --git a/apps/rules/rules_config.py b/apps/rules/rules_config.py new file mode 100644 index 0000000..dcad590 --- /dev/null +++ b/apps/rules/rules_config.py @@ -0,0 +1,128 @@ +""" +Rules Configuration Schema and Loader + +Provides Pydantic models for validating rules.yaml configuration. +""" + +from pathlib import Path +from typing import Any, Optional + +import yaml +from pydantic import BaseModel, Field, field_validator + + +class RuleTargets(BaseModel): + """Targets for a rule (rooms, devices, etc.)""" + rooms: list[str] = Field(default_factory=list) + contacts: list[str] = Field(default_factory=list) + thermostats: list[str] = Field(default_factory=list) + lights: list[str] = Field(default_factory=list) + relays: list[str] = Field(default_factory=list) + + +class Rule(BaseModel): + """Single rule configuration""" + id: str = Field(..., description="Unique rule identifier") + name: Optional[str] = Field(None, description="Optional human-readable name") + type: str = Field(..., description="Rule type (e.g., 'window_setback@1.0')") + targets: RuleTargets = Field(..., description="Target rooms and devices") + params: dict[str, Any] = Field(default_factory=dict, description="Rule-specific parameters") + + @field_validator('id') + @classmethod + def validate_id(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Rule ID cannot be empty") + return v.strip() + + @field_validator('type') + @classmethod + def validate_type(cls, v: str) -> str: + if not v or not v.strip(): + raise ValueError("Rule type cannot be empty") + if '@' not in v: + raise ValueError(f"Rule type must include version (e.g., 'window_setback@1.0'), got: {v}") + return v.strip() + + +class RulesConfig(BaseModel): + """Root configuration object""" + rules: list[Rule] = Field(..., description="List of all rules") + + @field_validator('rules') + @classmethod + def validate_unique_ids(cls, rules: list[Rule]) -> list[Rule]: + """Ensure all rule IDs are unique""" + ids = [rule.id for rule in rules] + duplicates = [id for id in ids if ids.count(id) > 1] + if duplicates: + raise ValueError(f"Duplicate rule IDs found: {set(duplicates)}") + return rules + + +def load_rules_config(config_path: str | Path = "config/rules.yaml") -> RulesConfig: + """ + Load and validate rules configuration from YAML file. + + Args: + config_path: Path to rules.yaml file + + Returns: + Validated RulesConfig object + + Raises: + FileNotFoundError: If config file doesn't exist + ValueError: If YAML is invalid or validation fails + """ + config_path = Path(config_path) + + if not config_path.exists(): + raise FileNotFoundError(f"Rules configuration not found: {config_path}") + + with open(config_path, 'r', encoding='utf-8') as f: + try: + data = yaml.safe_load(f) + except yaml.YAMLError as e: + raise ValueError(f"Invalid YAML in {config_path}: {e}") from e + + if not data: + raise ValueError(f"Empty configuration file: {config_path}") + + if 'rules' not in data: + raise ValueError( + f"Missing 'rules:' key in {config_path}. " + "Configuration must start with 'rules:' followed by a list of rule definitions." + ) + + try: + return RulesConfig(**data) + except Exception as e: + raise ValueError(f"Configuration validation failed: {e}") from e + + +def get_rule_by_id(config: RulesConfig, rule_id: str) -> Rule | None: + """Get a specific rule by ID""" + for rule in config.rules: + if rule.id == rule_id: + return rule + return None + + +def get_rules_by_type(config: RulesConfig, rule_type: str) -> list[Rule]: + """Get all rules of a specific type""" + return [rule for rule in config.rules if rule.type == rule_type] + + +if __name__ == "__main__": + # Test configuration loading + try: + config = load_rules_config() + print(f"✅ Loaded {len(config.rules)} rules:") + for rule in config.rules: + name = f" ({rule.name})" if rule.name else "" + print(f" - {rule.id}{name}: {rule.type}") + print(f" Targets: {len(rule.targets.rooms)} rooms, " + f"{len(rule.targets.contacts)} contacts, " + f"{len(rule.targets.thermostats)} thermostats") + except Exception as e: + print(f"❌ Configuration error: {e}") diff --git a/apps/simulator/README.md b/apps/simulator/README.md index a6b05af..a1ee92c 100644 --- a/apps/simulator/README.md +++ b/apps/simulator/README.md @@ -59,13 +59,9 @@ docker run --rm -p 8010:8010 \ simulator:dev ``` -**Mit Docker Network (optional):** -```bash -docker run --rm -p 8010:8010 \ - --name simulator \ - -e MQTT_BROKER=172.23.1.102 \ - simulator:dev -``` +**Note for finch/nerdctl users:** +- finch binds ports to `127.0.0.1` by default +- The web interface will be accessible at `http://127.0.0.1:8010` #### Environment Variables diff --git a/apps/ui/README.md b/apps/ui/README.md index 353352f..4be05f2 100644 --- a/apps/ui/README.md +++ b/apps/ui/README.md @@ -37,17 +37,6 @@ docker build -t ui:dev -f apps/ui/Dockerfile . #### Run Container -**Linux Server (empfohlen):** -```bash -# Mit Docker Network für Container-to-Container Kommunikation -docker run --rm -p 8002:8002 \ - -e UI_PORT=8002 \ - -e API_BASE=http://172.19.1.11:8001 \ - -e BASE_PATH=/ \ - ui:dev -``` - -**macOS mit finch/nerdctl:** ```bash docker run --rm -p 8002:8002 \ --add-host=host.docker.internal:host-gateway \ @@ -57,11 +46,10 @@ docker run --rm -p 8002:8002 \ ui:dev ``` -**Hinweise:** -- **Linux**: Verwende Docker Network und Service-Namen (`http://api:8001`) -- **macOS/finch**: Verwende `host.docker.internal` mit `--add-host` flag -- Die UI macht Server-Side API-Aufrufe beim Rendern der Seite -- Browser-seitige Realtime-Updates (SSE) gehen direkt vom Browser zur API +**Note for finch/nerdctl users:** +- finch binds ports to `127.0.0.1` by default (not `0.0.0.0`) +- Use `--add-host=host.docker.internal:host-gateway` to allow container-to-host communication +- Set `API_BASE=http://host.docker.internal:8001` to reach the API container #### Environment Variables diff --git a/config/rules.yaml b/config/rules.yaml new file mode 100644 index 0000000..a514ce3 --- /dev/null +++ b/config/rules.yaml @@ -0,0 +1,101 @@ +# Rules Configuration +# Auto-generated from devices.yaml + +rules: +- id: window_setback_esszimmer + name: Fensterabsenkung Esszimmer + type: window_setback@1.0 + targets: + rooms: + - Esszimmer + contacts: + - kontakt_esszimmer_strasse_links + - kontakt_esszimmer_strasse_rechts + thermostats: + - thermostat_esszimmer + params: + eco_target: 16.0 + open_min_secs: 20 + close_min_secs: 20 + previous_target_ttl_secs: 86400 +- id: window_setback_kueche + name: Fensterabsenkung Küche + type: window_setback@1.0 + targets: + rooms: + - Küche + contacts: + - kontakt_kueche_garten_fenster + - kontakt_kueche_garten_tuer + - kontakt_kueche_strasse_links + - kontakt_kueche_strasse_rechts + thermostats: + - thermostat_kueche + params: + eco_target: 16.0 + open_min_secs: 20 + close_min_secs: 20 + previous_target_ttl_secs: 86400 +- id: window_setback_patty + name: Fensterabsenkung Arbeitszimmer Patty + type: window_setback@1.0 + targets: + rooms: + - Arbeitszimmer Patty + contacts: + - kontakt_patty_garten_links + - kontakt_patty_garten_rechts + - kontakt_patty_strasse + thermostats: + - thermostat_patty + params: + eco_target: 16.0 + open_min_secs: 20 + close_min_secs: 20 + previous_target_ttl_secs: 86400 +- id: window_setback_schlafzimmer + name: Fensterabsenkung Schlafzimmer + type: window_setback@1.0 + targets: + rooms: + - Schlafzimmer + contacts: + - kontakt_schlafzimmer_strasse + thermostats: + - thermostat_schlafzimmer + params: + eco_target: 16.0 + open_min_secs: 20 + close_min_secs: 20 + previous_target_ttl_secs: 86400 +- id: window_setback_wohnzimmer + name: Fensterabsenkung Wohnzimmer + type: window_setback@1.0 + targets: + rooms: + - Wohnzimmer + contacts: + - kontakt_wohnzimmer_garten_links + - kontakt_wohnzimmer_garten_rechts + thermostats: + - thermostat_wohnzimmer + params: + eco_target: 16.0 + open_min_secs: 20 + close_min_secs: 20 + previous_target_ttl_secs: 86400 +- id: window_setback_wolfgang + name: Fensterabsenkung Arbeitszimmer Wolfgang + type: window_setback@1.0 + targets: + rooms: + - Arbeitszimmer Wolfgang + contacts: + - kontakt_wolfgang_garten + thermostats: + - thermostat_wolfgang + params: + eco_target: 16.0 + open_min_secs: 20 + close_min_secs: 20 + previous_target_ttl_secs: 86400 diff --git a/docker-compose.yaml b/docker-compose.yaml index e31b21e..1ea874c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -47,3 +47,16 @@ services: <<: *default-env volumes: - ./config:/app/config:ro + + rules: + build: + context: . + dockerfile: apps/rules/Dockerfile + container_name: rules + environment: + <<: *default-env + RULES_CONFIG: "/app/config/rules.yaml" + volumes: + - ./config:/app/config:ro + depends_on: + - abstraction diff --git a/infra/docker-compose.yml b/infra/docker-compose.yml deleted file mode 100644 index cb99156..0000000 --- a/infra/docker-compose.yml +++ /dev/null @@ -1,15 +0,0 @@ -version: '3.8' - -services: - # Placeholder for future services - # Example: - # api: - # build: - # context: .. - # dockerfile: apps/api/Dockerfile - # ports: - # - "8000:8000" - - placeholder: - image: alpine:latest - command: echo "Docker Compose placeholder - add your services here"