home-automation/apps/rules/RULE_INTERFACE.md
2025-11-11 16:38:41 +01:00

Rule Interface Documentation

Overview

The rule interface provides a clean abstraction for implementing automation rules. Rules respond to device state changes and can publish commands, persist state, and log diagnostics.

Core Components

1. RuleDescriptor

Configuration data for a rule instance (loaded from rules.yaml):

RuleDescriptor(
    id="window_setback_wohnzimmer",           # Unique rule ID
    name="Fensterabsenkung Wohnzimmer",       # Optional display name
    type="window_setback@1.0",                # Rule type + version
    targets={                                  # Rule-specific targets
        "rooms": ["Wohnzimmer"],
        "contacts": ["kontakt_wohnzimmer_..."],
        "thermostats": ["thermostat_wohnzimmer"]
    },
    params={                                   # Rule-specific parameters
        "eco_target": 16.0,
        "open_min_secs": 20
    }
)
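
The `type` field combines a rule name and a version separated by `@`. As a minimal sketch of how such a string could be split, assuming the format is always `<name>@<version>` (the helper `parse_rule_type` and the `RuleType` tuple are hypothetical and not part of the documented interface):

```python
from typing import NamedTuple


class RuleType(NamedTuple):
    """Hypothetical helper type, not part of the rule interface."""
    name: str
    version: str


def parse_rule_type(type_str: str) -> RuleType:
    """Split a string such as 'window_setback@1.0' into name and version."""
    name, sep, version = type_str.partition("@")
    # Reject strings without '@' or with an empty name or version.
    if not sep or not name or not version:
        raise ValueError(f"expected '<name>@<version>', got {type_str!r}")
    return RuleType(name, version)
```

For example, `parse_rule_type("window_setback@1.0")` yields the name `window_setback` and the version `1.0`.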

2. RedisState

Async state persistence with automatic reconnection and retry logic:

# Initialize (done by rule engine)
redis_state = RedisState("redis://172.23.1.116:6379/8")

# Simple key-value with TTL
await ctx.redis.set("rules:my_rule:temp", "22.5", ttl_secs=3600)
value = await ctx.redis.get("rules:my_rule:temp")  # Returns "22.5" or None

# Hash storage (for multiple related values)
await ctx.redis.hset("rules:my_rule:sensors", "bedroom", "open")
await ctx.redis.hset("rules:my_rule:sensors", "kitchen", "closed")
value = await ctx.redis.hget("rules:my_rule:sensors", "bedroom")  # "open"

# TTL management
await ctx.redis.expire("rules:my_rule:temp", 7200)  # Extend to 2 hours

# JSON helpers (for complex data)
import json
data = {"temp": 22.5, "humidity": 45}
await ctx.redis.set("rules:my_rule:data", ctx.redis._dumps(data))
stored = await ctx.redis.get("rules:my_rule:data")
parsed = ctx.redis._loads(stored) if stored else None

Key Conventions:

  • Use prefix rules:{rule_id}: for all keys
  • Example: rules:window_setback_wohnzimmer:thermo:device_123:previous
  • TTL recommended for temporary state (previous temperatures, timers)
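
The key convention above can be captured in a small helper so that every rule builds keys the same way. This is only a sketch: `state_key` is a hypothetical function, and rejecting `:` inside a segment is an assumption made here to keep keys unambiguous.

```python
def state_key(rule_id: str, *parts: str) -> str:
    """Build a key of the form 'rules:{rule_id}:{part}:{part}...'."""
    for segment in (rule_id, *parts):
        # Assumption: segments must be non-empty and must not contain ':'.
        if not segment or ":" in segment:
            raise ValueError(f"invalid key segment: {segment!r}")
    return ":".join(("rules", rule_id, *parts))
```

With this helper, `state_key("window_setback_wohnzimmer", "thermo", "device_123", "previous")` produces the example key shown in the list.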

Robustness Features:

  • Automatic retry with exponential backoff (default: 3 retries)
  • Connection pooling (max 10 connections)
  • Automatic reconnection on Redis restart
  • Health checks every 30 seconds
  • Operations wait and retry during temporary outages instead of raising exceptions
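
To illustrate the retry behavior, the following is a generic sketch of exponential backoff around an async operation. It is not the actual `RedisState` code: the function name, the `base_delay` parameter, and the choice to retry only on `ConnectionError` and to re-raise once the retries are used up are all assumptions.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    retries: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Run op, waiting base_delay, 2*base_delay, 4*base_delay... between tries."""
    for attempt in range(retries + 1):
        try:
            return await op()
        except ConnectionError:
            # Assumption: give up and re-raise after the last retry.
            if attempt == retries:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
    # Only reached when retries is negative and the loop never runs.
    raise ValueError("retries must be >= 0")
```

With the default of 3 retries, an operation is attempted at most four times, and the waits between attempts double each time.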

3. MQTTClient

Async MQTT client with event normalization and command publishing:

# Initialize (done by rule engine)
mqtt_client = MQTTClient(
    broker="172.16.2.16",
    port=1883,
    client_id="rule_engine"
)

# Subscribe and receive normalized events
async for event in mqtt_client.connect():
    # Event structure:
    # {
    #     "topic": "home/contact/sensor_1/state",
    #     "type": "state",
    #     "cap": "contact",           # Capability (contact, thermostat, etc.)
    #     "device_id": "sensor_1",
    #     "payload": {"contact": "open"},
    #     "ts": "2025-11-11T10:30:45.123456"
    # }
    
    if event['cap'] == 'contact':
        handle_contact(event)
    elif event['cap'] == 'thermostat':
        handle_thermostat(event)

# Publish commands (within async context)
await mqtt_client.publish_set_thermostat("thermostat_id", 22.5)

Subscriptions:

  • home/contact/+/state - All contact sensor state changes
  • home/thermostat/+/state - All thermostat state changes
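
The normalized event can be derived from the topic layout `home/{cap}/{device_id}/state`. The sketch below shows one way this might work. `normalize_event` is a hypothetical name, and it assumes the raw MQTT payload is the JSON object that ends up under `payload`; the real client may unwrap or validate messages differently.

```python
import json
from datetime import datetime
from typing import Any, Optional


def normalize_event(topic: str, raw_payload: bytes) -> Optional[dict[str, Any]]:
    """Turn a state message into the event dict shown above, or None."""
    parts = topic.split("/")
    # Only topics of the form home/<cap>/<device_id>/state are accepted.
    if len(parts) != 4 or parts[0] != "home" or parts[3] != "state":
        return None
    _, cap, device_id, msg_type = parts
    try:
        payload = json.loads(raw_payload)
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        return None
    return {
        "topic": topic,
        "type": msg_type,
        "cap": cap,
        "device_id": device_id,
        "payload": payload,
        "ts": datetime.now().isoformat(),
    }
```

Returning `None` for unexpected topics or malformed payloads keeps the event loop simple: callers can skip such messages instead of handling exceptions.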

Publishing:

  • Topic: home/thermostat/{device_id}/set
  • Payload: {"type":"thermostat","payload":{"target":22.5}}
  • QoS: 1 (at least once delivery)
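
As a sketch of what `publish_set_thermostat` has to assemble before handing the message to the broker, the hypothetical helper below builds the topic, payload, and QoS listed above. The compact JSON separators are chosen only to match the payload format shown in the list.

```python
import json


def build_set_thermostat(device_id: str, target: float) -> tuple[str, str, int]:
    """Return (topic, payload, qos) for a thermostat set command."""
    topic = f"home/thermostat/{device_id}/set"
    payload = json.dumps(
        {"type": "thermostat", "payload": {"target": target}},
        separators=(",", ":"),  # compact form without spaces
    )
    qos = 1  # at least once delivery
    return topic, payload, qos
```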

Robustness:

  • Automatic reconnection with exponential backoff
  • Connection logging (connect/disconnect events)
  • Clean session handling

4. MQTTPublisher (Legacy)

Simplified wrapper around MQTTClient for backward compatibility:

# Set thermostat temperature
await ctx.mqtt.publish_set_thermostat("thermostat_wohnzimmer", 21.5)

5. RuleContext

Runtime context provided to rules:

class RuleContext:
    logger           # Logger instance
    mqtt             # MQTTPublisher
    redis            # RedisState
    now() -> datetime  # Current timestamp

6. Rule Abstract Base Class

All rules extend this:

class MyRule(Rule):
    async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None:
        # Event structure:
        # {
        #     "topic": "home/contact/device_id/state",
        #     "type": "state",
        #     "cap": "contact",
        #     "device_id": "kontakt_wohnzimmer",
        #     "payload": {"contact": "open"},
        #     "ts": "2025-11-11T10:30:45.123456"
        # }
        
        device_id = evt['device_id']
        cap = evt['cap']
        
        if cap == 'contact':
            contact_state = evt['payload'].get('contact')
            # ... implement logic

Implementing a New Rule

Step 1: Create Rule Class

from packages.rule_interface import Rule, RuleDescriptor, RuleContext
from typing import Any

class MyCustomRule(Rule):
    """My custom automation rule."""
    
    async def on_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """Process device state changes."""
        
        # 1. Extract event data
        device_id = evt['device_id']
        cap = evt['cap']
        payload = evt['payload']
        
        # 2. Filter to relevant devices
        if device_id not in desc.targets.get('my_devices', []):
            return
        
        # 3. Implement logic
        if cap == 'contact':
            if payload.get('contact') == 'open':
                # Do something
                await ctx.mqtt.publish_set_thermostat(
                    'some_thermostat',
                    desc.params.get('temp', 20.0)
                )
        
        # 4. Persist state if needed
        state_key = f"rules:{desc.id}:device:{device_id}:state"
        await ctx.redis.set(state_key, payload.get('contact'))

Step 2: Register in RULE_IMPLEMENTATIONS

# In your rule module (e.g., my_custom_rule.py)
RULE_IMPLEMENTATIONS = {
    'my_custom@1.0': MyCustomRule,
}
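
How the engine resolves a `type` string to a class is not shown here. A lookup along the following lines is one plausible sketch. The `Rule` and `MyCustomRule` classes below are stand-ins so the example runs on its own, and `create_rule` is a hypothetical function, not the engine's API.

```python
class Rule:
    """Stand-in for packages.rule_interface.Rule."""

    async def on_event(self, evt, desc, ctx) -> None:
        raise NotImplementedError


class MyCustomRule(Rule):
    """Stand-in for the rule class from Step 1."""

    async def on_event(self, evt, desc, ctx) -> None:
        pass


RULE_IMPLEMENTATIONS = {"my_custom@1.0": MyCustomRule}


def create_rule(rule_type: str, registry: dict[str, type[Rule]]) -> Rule:
    """Instantiate the rule class registered for rule_type."""
    try:
        rule_cls = registry[rule_type]
    except KeyError:
        known = ", ".join(sorted(registry)) or "<none>"
        raise ValueError(
            f"unknown rule type {rule_type!r}; known types: {known}"
        ) from None
    return rule_cls()
```

Listing the known types in the error message makes a typo in rules.yaml easier to spot.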

Step 3: Configure in rules.yaml

rules:
  - id: my_custom_living_room
    name: My Custom Rule for Living Room
    type: my_custom@1.0
    targets:
      my_devices:
        - device_1
        - device_2
    params:
      temp: 22.0
      duration_secs: 300

Best Practices

Idempotency

Rules MUST be idempotent - processing the same event multiple times should be safe:

# Good: Idempotent
async def on_event(self, evt, desc, ctx):
    if evt['payload'].get('contact') == 'open':
        await ctx.mqtt.publish_set_thermostat('thermo', 16.0)

# Bad: Not idempotent (increments counter)
async def on_event(self, evt, desc, ctx):
    counter = await ctx.redis.get('counter') or '0'
    await ctx.redis.set('counter', str(int(counter) + 1))

Error Handling

Handle expected errors inside the rule. The engine catches and logs any exception that escapes, but handling it locally lets the rule log context and continue:

async def on_event(self, evt, desc, ctx):
    try:
        await ctx.mqtt.publish_set_thermostat('thermo', 16.0)
    except Exception as e:
        ctx.logger.error(f"Failed to set thermostat: {e}")
        # Don't raise - let event processing continue

State Keys

Use consistent naming for Redis keys:

# Pattern: rules:{rule_id}:{category}:{device_id}:{field}
state_key = f"rules:{desc.id}:contact:{device_id}:state"
ts_key = f"rules:{desc.id}:contact:{device_id}:ts"
prev_key = f"rules:{desc.id}:thermo:{device_id}:previous"

Logging

Use appropriate log levels:

ctx.logger.debug("Detailed diagnostic info")
ctx.logger.info("Normal operation milestones")
ctx.logger.warning("Unexpected but handled situations")
ctx.logger.error("Errors that prevent operation")

Event Structure Reference

Contact Sensor Event

{
    "topic": "home/contact/kontakt_wohnzimmer/state",
    "type": "state",
    "cap": "contact",
    "device_id": "kontakt_wohnzimmer",
    "payload": {
        "contact": "open"  # or "closed"
    },
    "ts": "2025-11-11T10:30:45.123456"
}

Thermostat Event

{
    "topic": "home/thermostat/thermostat_wohnzimmer/state",
    "type": "state",
    "cap": "thermostat",
    "device_id": "thermostat_wohnzimmer",
    "payload": {
        "target": 21.0,
        "current": 20.5,
        "mode": "heat"
    },
    "ts": "2025-11-11T10:30:45.123456"
}

Testing Rules

Rules can be tested independently of the engine:

from datetime import datetime
from unittest.mock import AsyncMock, MagicMock

import pytest
from packages.my_custom_rule import MyCustomRule
from packages.rule_interface import RuleDescriptor, RuleContext

@pytest.mark.asyncio
async def test_my_rule():
    # Setup
    rule = MyCustomRule()
    
    desc = RuleDescriptor(
        id="test_rule",
        type="my_custom@1.0",
        targets={"my_devices": ["device_1"]},
        params={"temp": 22.0}
    )
    
    # Mock context
    ctx = RuleContext(
        logger=MagicMock(),
        mqtt_publisher=AsyncMock(),
        redis_state=AsyncMock(),
        now_fn=lambda: datetime.now()
    )
    
    # Test event
    evt = {
        "device_id": "device_1",
        "cap": "contact",
        "payload": {"contact": "open"},
        "ts": "2025-11-11T10:30:45.123456"
    }
    
    # Execute
    await rule.on_event(evt, desc, ctx)
    
    # Assert
    ctx.mqtt.publish_set_thermostat.assert_called_once_with('some_thermostat', 22.0)

Extension Points

The interface is designed to be extended without modifying the engine:

  1. New rule types: Just implement Rule and register in RULE_IMPLEMENTATIONS
  2. New MQTT commands: Extend MQTTPublisher with new methods
  3. New state backends: Implement RedisState interface with different storage
  4. Custom context: Extend RuleContext with additional utilities

The engine only depends on the abstract interfaces, not specific implementations.
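
As an illustration of the state backend extension point, the sketch below is an in-memory class exposing the same async methods used in the RedisState examples (`set`, `get`, `hset`, `hget`, `expire`). The class name `InMemoryState` and the injectable `clock` are assumptions. It makes no attempt at retries or persistence and is meant for tests or local experiments.

```python
import time
from typing import Callable, Optional


class InMemoryState:
    """Dictionary-backed stand-in with per-key expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._values: dict[str, str] = {}
        self._hashes: dict[str, dict[str, str]] = {}
        self._expires_at: dict[str, float] = {}

    def _purge_if_expired(self, key: str) -> None:
        deadline = self._expires_at.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._values.pop(key, None)
            self._hashes.pop(key, None)
            del self._expires_at[key]

    async def set(self, key: str, value: str, ttl_secs: Optional[int] = None) -> None:
        self._values[key] = value
        self._expires_at.pop(key, None)  # a plain set clears any old TTL
        if ttl_secs is not None:
            self._expires_at[key] = self._clock() + ttl_secs

    async def get(self, key: str) -> Optional[str]:
        self._purge_if_expired(key)
        return self._values.get(key)

    async def hset(self, key: str, field: str, value: str) -> None:
        self._purge_if_expired(key)
        self._hashes.setdefault(key, {})[field] = value

    async def hget(self, key: str, field: str) -> Optional[str]:
        self._purge_if_expired(key)
        return self._hashes.get(key, {}).get(field)

    async def expire(self, key: str, ttl_secs: int) -> None:
        self._purge_if_expired(key)
        if key in self._values or key in self._hashes:
            self._expires_at[key] = self._clock() + ttl_secs
```

Passing a fake `clock` lets a test advance time without sleeping, which is useful for rules that rely on TTLs.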