# Rule Interface Documentation ## Overview The rule interface provides a clean abstraction for implementing automation rules. Rules respond to device state changes and can publish commands, persist state, and log diagnostics. ## Core Components ### 1. RuleDescriptor Configuration data for a rule instance (loaded from `rules.yaml`): ```python RuleDescriptor( id="window_setback_wohnzimmer", # Unique rule ID name="Fensterabsenkung Wohnzimmer", # Optional display name type="window_setback@1.0", # Rule type + version targets={ # Rule-specific targets "rooms": ["Wohnzimmer"], "contacts": ["kontakt_wohnzimmer_..."], "thermostats": ["thermostat_wohnzimmer"] }, params={ # Rule-specific parameters "eco_target": 16.0, "open_min_secs": 20 } ) ``` ### 2. RedisState Async state persistence with automatic reconnection and retry logic: ```python # Initialize (done by rule engine) redis_state = RedisState("redis://172.23.1.116:6379/8") # Simple key-value with TTL await ctx.redis.set("rules:my_rule:temp", "22.5", ttl_secs=3600) value = await ctx.redis.get("rules:my_rule:temp") # Returns "22.5" or None # Hash storage (for multiple related values) await ctx.redis.hset("rules:my_rule:sensors", "bedroom", "open") await ctx.redis.hset("rules:my_rule:sensors", "kitchen", "closed") value = await ctx.redis.hget("rules:my_rule:sensors", "bedroom") # "open" # TTL management await ctx.redis.expire("rules:my_rule:temp", 7200) # Extend to 2 hours # JSON helpers (for complex data) import json data = {"temp": 22.5, "humidity": 45} await ctx.redis.set("rules:my_rule:data", ctx.redis._dumps(data)) stored = await ctx.redis.get("rules:my_rule:data") parsed = ctx.redis._loads(stored) if stored else None ``` **Key Conventions:** - Use prefix `rules:{rule_id}:` for all keys - Example: `rules:window_setback_wohnzimmer:thermo:device_123:previous` - TTL recommended for temporary state (previous temperatures, timers) **Robustness Features:** - Automatic retry with exponential backoff (default: 3 retries) - Connection pooling (max 10 connections) - Automatic reconnection on Redis restart - Health checks every 30 seconds - All operations wait and retry, no exceptions on temporary outages ### 3. MQTTClient Async MQTT client with event normalization and command publishing: ```python # Initialize (done by rule engine) mqtt_client = MQTTClient( broker="172.16.2.16", port=1883, client_id="rule_engine" ) # Subscribe and receive normalized events async for event in mqtt_client.connect(): # Event structure: # { # "topic": "home/contact/sensor_1/state", # "type": "state", # "cap": "contact", # Capability (contact, thermostat, etc.) # "device_id": "sensor_1", # "payload": {"contact": "open"}, # "ts": "2025-11-11T10:30:45.123456" # } if event['cap'] == 'contact': handle_contact(event) elif event['cap'] == 'thermostat': handle_thermostat(event) # Publish commands (within async context) await mqtt_client.publish_set_thermostat("thermostat_id", 22.5) ``` **Subscriptions:** - `home/contact/+/state` - All contact sensor state changes - `home/thermostat/+/state` - All thermostat state changes **Publishing:** - Topic: `home/thermostat/{device_id}/set` - Payload: `{"type":"thermostat","payload":{"target":22.5}}` - QoS: 1 (at least once delivery) **Robustness:** - Automatic reconnection with exponential backoff - Connection logging (connect/disconnect events) - Clean session handling ### 4. MQTTPublisher (Legacy) Simplified wrapper around MQTTClient for backward compatibility: ```python # Set thermostat temperature await ctx.mqtt.publish_set_thermostat("thermostat_wohnzimmer", 21.5) ``` ### 5. RuleContext Runtime context provided to rules: ```python class RuleContext: logger # Logger instance mqtt # MQTTPublisher redis # RedisState now() -> datetime # Current timestamp ``` ### 5. Rule Abstract Base Class All rules extend this: ```python class MyRule(Rule): async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None: # Event structure: # { # "topic": "home/contact/device_id/state", # "type": "state", # "cap": "contact", # "device_id": "kontakt_wohnzimmer", # "payload": {"contact": "open"}, # "ts": "2025-11-11T10:30:45.123456" # } device_id = evt['device_id'] cap = evt['cap'] if cap == 'contact': contact_state = evt['payload'].get('contact') # ... implement logic ``` ## Implementing a New Rule ### Step 1: Create Rule Class ```python from packages.rule_interface import Rule, RuleDescriptor, RuleContext from typing import Any class MyCustomRule(Rule): """My custom automation rule.""" async def on_event( self, evt: dict[str, Any], desc: RuleDescriptor, ctx: RuleContext ) -> None: """Process device state changes.""" # 1. Extract event data device_id = evt['device_id'] cap = evt['cap'] payload = evt['payload'] # 2. Filter to relevant devices if device_id not in desc.targets.get('my_devices', []): return # 3. Implement logic if cap == 'contact': if payload.get('contact') == 'open': # Do something await ctx.mqtt.publish_set_thermostat( 'some_thermostat', desc.params.get('temp', 20.0) ) # 4. Persist state if needed state_key = f"rule:{desc.id}:device:{device_id}:state" await ctx.redis.set(state_key, payload.get('contact')) ``` ### Step 2: Register in RULE_IMPLEMENTATIONS ```python # In your rule module (e.g., my_custom_rule.py) RULE_IMPLEMENTATIONS = { 'my_custom@1.0': MyCustomRule, } ``` ### Step 3: Configure in rules.yaml ```yaml rules: - id: my_custom_living_room name: My Custom Rule for Living Room type: my_custom@1.0 targets: my_devices: - device_1 - device_2 params: temp: 22.0 duration_secs: 300 ``` ## Best Practices ### Idempotency Rules MUST be idempotent - processing the same event multiple times should be safe: ```python # Good: Idempotent async def on_event(self, evt, desc, ctx): if evt['payload'].get('contact') == 'open': await ctx.mqtt.publish_set_thermostat('thermo', 16.0) # Bad: Not idempotent (increments counter) async def on_event(self, evt, desc, ctx): counter = await ctx.redis.get('counter') or '0' await ctx.redis.set('counter', str(int(counter) + 1)) ``` ### Error Handling Handle errors gracefully - the engine will catch and log exceptions: ```python async def on_event(self, evt, desc, ctx): try: await ctx.mqtt.publish_set_thermostat('thermo', 16.0) except Exception as e: ctx.logger.error(f"Failed to set thermostat: {e}") # Don't raise - let event processing continue ``` ### State Keys Use consistent naming for Redis keys: ```python # Pattern: rule:{rule_id}:{category}:{device_id}:{field} state_key = f"rule:{desc.id}:contact:{device_id}:state" ts_key = f"rule:{desc.id}:contact:{device_id}:ts" prev_key = f"rule:{desc.id}:thermo:{device_id}:previous" ``` ### Logging Use appropriate log levels: ```python ctx.logger.debug("Detailed diagnostic info") ctx.logger.info("Normal operation milestones") ctx.logger.warning("Unexpected but handled situations") ctx.logger.error("Errors that prevent operation") ``` ## Event Structure Reference ### Contact Sensor Event ```python { "topic": "home/contact/kontakt_wohnzimmer/state", "type": "state", "cap": "contact", "device_id": "kontakt_wohnzimmer", "payload": { "contact": "open" # or "closed" }, "ts": "2025-11-11T10:30:45.123456" } ``` ### Thermostat Event ```python { "topic": "home/thermostat/thermostat_wohnzimmer/state", "type": "state", "cap": "thermostat", "device_id": "thermostat_wohnzimmer", "payload": { "target": 21.0, "current": 20.5, "mode": "heat" }, "ts": "2025-11-11T10:30:45.123456" } ``` ## Testing Rules Rules can be tested independently of the engine: ```python import pytest from unittest.mock import AsyncMock, MagicMock from packages.my_custom_rule import MyCustomRule from packages.rule_interface import RuleDescriptor, RuleContext @pytest.mark.asyncio async def test_my_rule(): # Setup rule = MyCustomRule() desc = RuleDescriptor( id="test_rule", type="my_custom@1.0", targets={"my_devices": ["device_1"]}, params={"temp": 22.0} ) # Mock context ctx = RuleContext( logger=MagicMock(), mqtt_publisher=AsyncMock(), redis_state=AsyncMock(), now_fn=lambda: datetime.now() ) # Test event evt = { "device_id": "device_1", "cap": "contact", "payload": {"contact": "open"}, "ts": "2025-11-11T10:30:45.123456" } # Execute await rule.on_event(evt, desc, ctx) # Assert ctx.mqtt.publish_set_thermostat.assert_called_once_with('some_thermostat', 22.0) ``` ## Extension Points The interface is designed to be extended without modifying the engine: 1. **New rule types**: Just implement `Rule` and register in `RULE_IMPLEMENTATIONS` 2. **New MQTT commands**: Extend `MQTTPublisher` with new methods 3. **New state backends**: Implement `RedisState` interface with different storage 4. **Custom context**: Extend `RuleContext` with additional utilities The engine only depends on the abstract interfaces, not specific implementations.