""" Example Rule Implementation: Window Setback Demonstrates how to implement a Rule using the rule_interface. This rule lowers thermostat temperature when a window is opened. """ from typing import Any from pydantic import BaseModel, Field, ValidationError from apps.rules.rule_interface import Rule, RuleDescriptor, RuleContext class WindowSetbackObjects(BaseModel): """Object structure for window setback rule""" contacts: list[str] = Field(..., min_length=1, description="Contact sensors to monitor") thermostats: list[str] = Field(..., min_length=1, description="Thermostats to control") class WindowSetbackRule(Rule): """ Window setback automation rule. When a window/door contact opens, set thermostats to eco temperature. When closed for a minimum duration, restore previous target temperature. Configuration: objects: contacts: List of contact sensor device IDs to monitor (required, min 1) thermostats: List of thermostat device IDs to control (required, min 1) params: eco_target: Temperature to set when window opens (default: 16.0) previous_target_ttl_secs: How long to remember previous temperature (default: 86400) State storage (Redis keys): rule:{rule_id}:thermo:{device_id}:current_target -> Current target temp (updated on every STATE) rule:{rule_id}:thermo:{device_id}:previous -> Previous target temp (saved on window open, deleted on restore) rule:{rule_id}:contact:{device_id}:is_open -> "1" if open, "0" if closed rule:{rule_id}:state -> Overall rule state -> "1" if thermostats set to eco, "0" otherwise Logic: 1. Thermostat STATE events → update current_target in Redis 2. Window opens → copy current_target to previous, then set to eco_target 3. Window closes → restore from previous, then delete previous key """ @staticmethod def __get_redis_key_current_target(rule_id: str, thermo_id: str) -> str: """Get Redis key for current target temperature of a thermostat""" return f"rule:{rule_id}:thermo:{thermo_id}:current_target" @staticmethod def __get_redis_key_previous_target(rule_id: str, thermo_id: str) -> str: """Get Redis key for previous target temperature of a thermostat""" return f"rule:{rule_id}:thermo:{thermo_id}:previous" @staticmethod def __get_redis_key_contact_state(rule_id: str, contact_id: str) -> str: """Get Redis key for contact sensor state""" return f"rule:{rule_id}:contact:{contact_id}:is_open" @staticmethod def __get_redis_key_rule_state(rule_id: str) -> str: """Get Redis key for overall rule state""" return f"rule:{rule_id}:state" @staticmethod async def __redis_get(ctx: RuleContext, key: str) -> Any: """Helper to get value from Redis""" v = await ctx.redis.get(key) ctx.logger.debug(f"Redis GET {key} -> {v}") return v @staticmethod async def __redis_set(ctx: RuleContext, key: str, value: Any) -> None: """Helper to set value in Redis""" ctx.logger.debug(f"Redis SET {key} = {value}") await ctx.redis.set(key, value) @staticmethod async def __redis_delete(ctx: RuleContext, key: str) -> None: """Helper to delete key from Redis""" ctx.logger.debug(f"Redis DEL {key}") await ctx.redis.delete(key) def __init__(self): super().__init__() self._validated_objects: dict[str, WindowSetbackObjects] = {} async def setup(self, desc: RuleDescriptor, ctx: RuleContext) -> None: """Validate objects structure during setup""" try: validated = WindowSetbackObjects(**desc.objects) self._validated_objects[desc.id] = validated ctx.logger.info( f"Rule {desc.id} validated: {len(validated.contacts)} contacts, " f"{len(validated.thermostats)} thermostats" ) except ValidationError as e: raise ValueError( f"Invalid objects configuration for rule {desc.id}: {e}" ) from e def get_subscriptions(self, desc: RuleDescriptor) -> list[str]: """ Return MQTT topics to subscribe to. Subscribe to: - Contact sensor state changes (to detect window open/close) - Thermostat state changes (to track current target temperature) """ topics = [] # Subscribe to contact sensors contacts = desc.objects.get('contacts', []) for contact_id in contacts: topics.append(f"home/contact/{contact_id}/state") # Subscribe to thermostats to track their current target temperature thermostats = desc.objects.get('thermostats', []) for thermo_id in thermostats: topics.append(f"home/thermostat/{thermo_id}/state") return topics async def on_event( self, evt: dict[str, Any], desc: RuleDescriptor, ctx: RuleContext ) -> None: """ Process contact sensor or thermostat state changes. Logic: 1. If contact opened → remember current thermostat targets, set to eco 2. If contact closed for min_secs → restore previous targets 3. If thermostat target changed → update stored previous value """ device_id = evt['device_id'] cap = evt['cap'] payload = evt['payload'] # Only process events for devices in our objects target_contacts = desc.objects.get('contacts', []) target_thermostats = desc.objects.get('thermostats', []) if cap == 'contact' and device_id in target_contacts: await self._handle_contact_event(evt, desc, ctx) elif cap == 'thermostat' and device_id in target_thermostats: await self._handle_thermostat_event(evt, desc, ctx) async def _handle_contact_event( self, evt: dict[str, Any], desc: RuleDescriptor, ctx: RuleContext ) -> None: """Handle contact sensor state change.""" device_id = evt['device_id'] contact_state = evt['payload'].get('contact') # "open" or "closed" if not contact_state: ctx.logger.warning(f"Contact event missing 'contact' field: {evt}") return contact_state_key = WindowSetbackRule.__get_redis_key_contact_state(desc.id, device_id) await WindowSetbackRule.__redis_set(ctx, contact_state_key, '1' if contact_state == 'open' else '0') # Check if any contact is open is_open = False for contact_id in desc.objects.get('contacts', []): state_key = WindowSetbackRule.__get_redis_key_contact_state(desc.id, contact_id) state_val = await WindowSetbackRule.__redis_get(ctx, state_key) if state_val == '1': is_open = True break rule_state_key = WindowSetbackRule.__get_redis_key_rule_state(desc.id) current_rule_state = await WindowSetbackRule.__redis_get(ctx, rule_state_key) if is_open and current_rule_state != '1': # At least one contact is open, and we are not already in eco mode await self._set_eco_mode(desc, ctx) await WindowSetbackRule.__redis_set(ctx, rule_state_key, '1') elif not is_open and current_rule_state != '0': # All contacts are closed, and we are currently in eco mode await self._unset_eco_mode(desc, ctx) await WindowSetbackRule.__redis_set(ctx, rule_state_key, '0') async def _set_eco_mode(self, desc: RuleDescriptor, ctx: RuleContext) -> None: """Set thermostats to eco temperature when window opens.""" eco_target = desc.params.get('eco_target', 7.0) target_thermostats = desc.objects.get('thermostats', []) ctx.logger.info( f"Rule {desc.id}: At least one window is opened, setting {len(target_thermostats)} " f"thermostats to eco temperature {eco_target}°C" ) # FIRST: Save current target temperatures as "previous" (before we change them!) for thermo_id in target_thermostats: current_key = WindowSetbackRule.__get_redis_key_current_target(desc.id, thermo_id) current_temp_str = await WindowSetbackRule.__redis_get(ctx, current_key) if current_temp_str: # Save current as previous (with TTL) prev_key = WindowSetbackRule.__get_redis_key_previous_target(desc.id, thermo_id) await WindowSetbackRule.__redis_set(ctx, prev_key, current_temp_str) ctx.logger.debug( f"Saved previous target for {thermo_id}: {current_temp_str}°C" ) else: ctx.logger.warning( f"No current target found for {thermo_id}, cannot save previous" ) # THEN: Set all thermostats to eco temperature for thermo_id in target_thermostats: try: await ctx.mqtt.publish_set_thermostat(thermo_id, eco_target) ctx.logger.debug(f"Set {thermo_id} to {eco_target}°C") except Exception as e: ctx.logger.error(f"Failed to set {thermo_id}: {e}") async def _unset_eco_mode(self, desc: RuleDescriptor, ctx: RuleContext) -> None: """Restore thermostats to previous temperature when window closes.""" target_thermostats = desc.objects.get('thermostats', []) ctx.logger.info( f"Rule {desc.id}: All windows closed, restoring {len(target_thermostats)} " f"thermostats to previous temperatures" ) # Restore previous temperatures for thermo_id in target_thermostats: prev_key = WindowSetbackRule.__get_redis_key_previous_target(desc.id, thermo_id) prev_temp_str = await WindowSetbackRule.__redis_get(ctx, prev_key) if prev_temp_str: try: prev_temp = float(prev_temp_str) await ctx.mqtt.publish_set_thermostat(thermo_id, prev_temp) ctx.logger.debug(f"Restored {thermo_id} to {prev_temp}°C") # Delete the previous key after restoring await WindowSetbackRule.__redis_delete(ctx, prev_key) except Exception as e: ctx.logger.error(f"Failed to restore {thermo_id}: {e}") else: ctx.logger.warning( f"No previous target found for {thermo_id}, cannot restore" ) async def _handle_thermostat_event( self, evt: dict[str, Any], desc: RuleDescriptor, ctx: RuleContext ) -> None: """ Handle thermostat state change - track current target temperature. This keeps a record of the thermostat's current target, so we can save it as "previous" when a window opens. Important: We store in "current_target", NOT "previous". The "previous" key is only written when a window opens, to avoid race conditions. """ device_id = evt['device_id'] payload = evt['payload'] current_target = payload.get('target') if current_target is None: return # No target in this state update # Store current target (always update, even if it's the eco temperature) current_key = WindowSetbackRule.__get_redis_key_current_target(desc.id, device_id) await WindowSetbackRule.__redis_set(ctx, current_key, str(current_target)) ctx.logger.debug( f"Rule {desc.id}: Updated current target for {device_id}: {current_target}°C" ) # Rule registry - maps rule type to implementation class RULE_IMPLEMENTATIONS = { 'window_setback@1.0': WindowSetbackRule, }