All checks were successful
ci/woodpecker/tag/build/6 Pipeline was successful
ci/woodpecker/tag/build/5 Pipeline was successful
ci/woodpecker/tag/build/4 Pipeline was successful
ci/woodpecker/tag/build/1 Pipeline was successful
ci/woodpecker/tag/namespace Pipeline was successful
ci/woodpecker/tag/build/2 Pipeline was successful
ci/woodpecker/tag/build/3 Pipeline was successful
ci/woodpecker/tag/config Pipeline was successful
ci/woodpecker/tag/build/7 Pipeline was successful
ci/woodpecker/tag/deploy/2 Pipeline was successful
ci/woodpecker/tag/deploy/3 Pipeline was successful
ci/woodpecker/tag/deploy/4 Pipeline was successful
ci/woodpecker/tag/deploy/5 Pipeline was successful
ci/woodpecker/tag/deploy/6 Pipeline was successful
ci/woodpecker/tag/deploy/1 Pipeline was successful
ci/woodpecker/tag/ingress Pipeline was successful
276 lines
11 KiB
Python
276 lines
11 KiB
Python
"""
|
|
Example Rule Implementation: Window Setback
|
|
|
|
Demonstrates how to implement a Rule using the rule_interface.
|
|
This rule lowers thermostat temperature when a window is opened.
|
|
"""
|
|
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
|
|
from apps.rules.rule_interface import Rule, RuleDescriptor, RuleContext
|
|
|
|
|
|
class WindowSetbackObjects(BaseModel):
|
|
"""Object structure for window setback rule"""
|
|
contacts: list[str] = Field(..., min_length=1, description="Contact sensors to monitor")
|
|
thermostats: list[str] = Field(..., min_length=1, description="Thermostats to control")
|
|
|
|
|
|
|
|
class WindowSetbackRule(Rule):
|
|
"""
|
|
Window setback automation rule.
|
|
|
|
When a window/door contact opens, set thermostats to eco temperature.
|
|
When closed for a minimum duration, restore previous target temperature.
|
|
|
|
Configuration:
|
|
objects:
|
|
contacts: List of contact sensor device IDs to monitor (required, min 1)
|
|
thermostats: List of thermostat device IDs to control (required, min 1)
|
|
params:
|
|
eco_target: Temperature to set when window opens (default: 16.0)
|
|
previous_target_ttl_secs: How long to remember previous temperature (default: 86400)
|
|
|
|
State storage (Redis keys):
|
|
rule:{rule_id}:thermo:{device_id}:current_target -> Current target temp (updated on every STATE)
|
|
rule:{rule_id}:thermo:{device_id}:previous -> Previous target temp (saved on window open, deleted on restore)
|
|
rule:{rule_id}:contact:{device_id}:is_open -> "1" if open, "0" if closed
|
|
rule:{rule_id}:state -> Overall rule state -> "1" if thermostats set to eco, "0" otherwise
|
|
|
|
Logic:
|
|
1. Thermostat STATE events → update current_target in Redis
|
|
2. Window opens → copy current_target to previous, then set to eco_target
|
|
3. Window closes → restore from previous, then delete previous key
|
|
"""
|
|
|
|
@staticmethod
|
|
def __get_redis_key_current_target(rule_id: str, thermo_id: str) -> str:
|
|
"""Get Redis key for current target temperature of a thermostat"""
|
|
return f"rule:{rule_id}:thermo:{thermo_id}:current_target"
|
|
@staticmethod
|
|
def __get_redis_key_previous_target(rule_id: str, thermo_id: str) -> str:
|
|
"""Get Redis key for previous target temperature of a thermostat"""
|
|
return f"rule:{rule_id}:thermo:{thermo_id}:previous"
|
|
@staticmethod
|
|
def __get_redis_key_contact_state(rule_id: str, contact_id: str) -> str:
|
|
"""Get Redis key for contact sensor state"""
|
|
return f"rule:{rule_id}:contact:{contact_id}:is_open"
|
|
@staticmethod
|
|
def __get_redis_key_rule_state(rule_id: str) -> str:
|
|
"""Get Redis key for overall rule state"""
|
|
return f"rule:{rule_id}:state"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._validated_objects: dict[str, WindowSetbackObjects] = {}
|
|
|
|
async def setup(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
|
|
"""Validate objects structure during setup"""
|
|
try:
|
|
validated = WindowSetbackObjects(**desc.objects)
|
|
self._validated_objects[desc.id] = validated
|
|
ctx.logger.info(
|
|
f"Rule {desc.id} validated: {len(validated.contacts)} contacts, "
|
|
f"{len(validated.thermostats)} thermostats"
|
|
)
|
|
except ValidationError as e:
|
|
raise ValueError(
|
|
f"Invalid objects configuration for rule {desc.id}: {e}"
|
|
) from e
|
|
|
|
def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
|
|
"""
|
|
Return MQTT topics to subscribe to.
|
|
|
|
Subscribe to:
|
|
- Contact sensor state changes (to detect window open/close)
|
|
- Thermostat state changes (to track current target temperature)
|
|
"""
|
|
topics = []
|
|
|
|
# Subscribe to contact sensors
|
|
contacts = desc.objects.get('contacts', [])
|
|
for contact_id in contacts:
|
|
topics.append(f"home/contact/{contact_id}/state")
|
|
|
|
# Subscribe to thermostats to track their current target temperature
|
|
thermostats = desc.objects.get('thermostats', [])
|
|
for thermo_id in thermostats:
|
|
topics.append(f"home/thermostat/{thermo_id}/state")
|
|
|
|
return topics
|
|
|
|
async def on_event(
|
|
self,
|
|
evt: dict[str, Any],
|
|
desc: RuleDescriptor,
|
|
ctx: RuleContext
|
|
) -> None:
|
|
"""
|
|
Process contact sensor or thermostat state changes.
|
|
|
|
Logic:
|
|
1. If contact opened → remember current thermostat targets, set to eco
|
|
2. If contact closed for min_secs → restore previous targets
|
|
3. If thermostat target changed → update stored previous value
|
|
"""
|
|
device_id = evt['device_id']
|
|
cap = evt['cap']
|
|
payload = evt['payload']
|
|
|
|
# Only process events for devices in our objects
|
|
target_contacts = desc.objects.get('contacts', [])
|
|
target_thermostats = desc.objects.get('thermostats', [])
|
|
|
|
if cap == 'contact' and device_id in target_contacts:
|
|
await self._handle_contact_event(evt, desc, ctx)
|
|
|
|
elif cap == 'thermostat' and device_id in target_thermostats:
|
|
await self._handle_thermostat_event(evt, desc, ctx)
|
|
|
|
async def _handle_contact_event(
|
|
self,
|
|
evt: dict[str, Any],
|
|
desc: RuleDescriptor,
|
|
ctx: RuleContext
|
|
) -> None:
|
|
"""Handle contact sensor state change."""
|
|
device_id = evt['device_id']
|
|
contact_state = evt['payload'].get('contact') # "open" or "closed"
|
|
event_ts = evt.get('ts', ctx.now().isoformat())
|
|
|
|
if not contact_state:
|
|
ctx.logger.warning(f"Contact event missing 'contact' field: {evt}")
|
|
return
|
|
|
|
contact_state_key = WindowSetbackRule.__get_redis_key_contact_state(desc.id, device_id)
|
|
await ctx.redis.set(contact_state_key, '1' if contact_state == 'open' else '0')
|
|
|
|
# Check if any contact is open
|
|
is_open = False
|
|
for contact_id in desc.objects.get('contacts', []):
|
|
state_key = WindowSetbackRule.__get_redis_key_contact_state(desc.id, contact_id)
|
|
state_val = await ctx.redis.get(state_key)
|
|
if state_val == '1':
|
|
is_open = True
|
|
break
|
|
|
|
rule_state_key = WindowSetbackRule.__get_redis_key_rule_state(desc.id)
|
|
current_rule_state = await ctx.redis.get(rule_state_key)
|
|
if is_open and current_rule_state != '1':
|
|
# At least one contact is open, and we are not already in eco mode
|
|
await self._set_eco_mode(desc, ctx)
|
|
await ctx.redis.set(rule_state_key, '1')
|
|
elif not is_open and current_rule_state != '0':
|
|
# All contacts are closed, and we are currently in eco mode
|
|
await self._unset_eco_mode(desc, ctx)
|
|
await ctx.redis.set(rule_state_key, '0')
|
|
|
|
|
|
async def _set_eco_mode(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
|
|
"""Set thermostats to eco temperature when window opens."""
|
|
eco_target = desc.params.get('eco_target', 7.0)
|
|
target_thermostats = desc.objects.get('thermostats', [])
|
|
ttl_secs = desc.params.get('previous_target_ttl_secs', 86400)
|
|
|
|
ctx.logger.info(
|
|
f"Rule {desc.id}: At least one window is opened, setting {len(target_thermostats)} "
|
|
f"thermostats to eco temperature {eco_target}°C"
|
|
)
|
|
|
|
# FIRST: Save current target temperatures as "previous" (before we change them!)
|
|
for thermo_id in target_thermostats:
|
|
current_key = WindowSetbackRule.__get_redis_key_current_target(desc.id, thermo_id)
|
|
current_temp_str = await ctx.redis.get(current_key)
|
|
|
|
if current_temp_str:
|
|
# Save current as previous (with TTL)
|
|
prev_key = WindowSetbackRule.__get_redis_key_previous_target(desc.id, thermo_id)
|
|
await ctx.redis.set(prev_key, current_temp_str, ttl_secs=ttl_secs)
|
|
ctx.logger.debug(
|
|
f"Saved previous target for {thermo_id}: {current_temp_str}°C"
|
|
)
|
|
else:
|
|
ctx.logger.warning(
|
|
f"No current target found for {thermo_id}, cannot save previous"
|
|
)
|
|
|
|
# THEN: Set all thermostats to eco temperature
|
|
for thermo_id in target_thermostats:
|
|
try:
|
|
await ctx.mqtt.publish_set_thermostat(thermo_id, eco_target)
|
|
ctx.logger.debug(f"Set {thermo_id} to {eco_target}°C")
|
|
except Exception as e:
|
|
ctx.logger.error(f"Failed to set {thermo_id}: {e}")
|
|
|
|
|
|
async def _unset_eco_mode(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
|
|
"""Restore thermostats to previous temperature when window closes."""
|
|
target_thermostats = desc.objects.get('thermostats', [])
|
|
|
|
ctx.logger.info(
|
|
f"Rule {desc.id}: All windows closed, restoring {len(target_thermostats)} "
|
|
f"thermostats to previous temperatures"
|
|
)
|
|
|
|
# Restore previous temperatures
|
|
for thermo_id in target_thermostats:
|
|
prev_key = WindowSetbackRule.__get_redis_key_previous_target(desc.id, thermo_id)
|
|
prev_temp_str = await ctx.redis.get(prev_key)
|
|
|
|
if prev_temp_str:
|
|
try:
|
|
prev_temp = float(prev_temp_str)
|
|
await ctx.mqtt.publish_set_thermostat(thermo_id, prev_temp)
|
|
ctx.logger.debug(f"Restored {thermo_id} to {prev_temp}°C")
|
|
|
|
# Delete the previous key after restoring
|
|
await ctx.redis.delete(prev_key)
|
|
except Exception as e:
|
|
ctx.logger.error(f"Failed to restore {thermo_id}: {e}")
|
|
else:
|
|
ctx.logger.warning(
|
|
f"No previous target found for {thermo_id}, cannot restore"
|
|
)
|
|
|
|
async def _handle_thermostat_event(
|
|
self,
|
|
evt: dict[str, Any],
|
|
desc: RuleDescriptor,
|
|
ctx: RuleContext
|
|
) -> None:
|
|
"""
|
|
Handle thermostat state change - track current target temperature.
|
|
|
|
This keeps a record of the thermostat's current target, so we can
|
|
save it as "previous" when a window opens.
|
|
|
|
Important: We store in "current_target", NOT "previous". The "previous"
|
|
key is only written when a window opens, to avoid race conditions.
|
|
"""
|
|
device_id = evt['device_id']
|
|
payload = evt['payload']
|
|
current_target = payload.get('target')
|
|
|
|
if current_target is None:
|
|
return # No target in this state update
|
|
|
|
# Store current target (always update, even if it's the eco temperature)
|
|
current_key = WindowSetbackRule.__get_redis_key_current_target(desc.id, device_id)
|
|
ttl_secs = desc.params.get('previous_target_ttl_secs', 86400)
|
|
|
|
await ctx.redis.set(current_key, str(current_target), ttl_secs=ttl_secs)
|
|
|
|
ctx.logger.debug(
|
|
f"Rule {desc.id}: Updated current target for {device_id}: {current_target}°C"
|
|
)
|
|
|
|
|
|
# Rule registry - maps rule type to implementation class
|
|
RULE_IMPLEMENTATIONS = {
|
|
'window_setback@1.0': WindowSetbackRule,
|
|
}
|