From b6b441c0cae694f6b390ff91f6bc12b2185c82ab Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Tue, 11 Nov 2025 19:58:06 +0100 Subject: [PATCH] rules 2 --- apps/rules/impl/window_setback.py | 132 ++++++++++++++++++++++++------ apps/rules/main.py | 102 +++++++++++++++++------ apps/rules/rule_interface.py | 54 ++++++++++-- apps/rules/rules_config.py | 21 ++--- config/rules.yaml | 25 ++---- 5 files changed, 245 insertions(+), 89 deletions(-) diff --git a/apps/rules/impl/window_setback.py b/apps/rules/impl/window_setback.py index 2c6b298..0f29cf1 100644 --- a/apps/rules/impl/window_setback.py +++ b/apps/rules/impl/window_setback.py @@ -7,9 +7,17 @@ This rule lowers thermostat temperature when a window is opened. from typing import Any +from pydantic import BaseModel, Field, ValidationError + from apps.rules.rule_interface import Rule, RuleDescriptor, RuleContext +class WindowSetbackObjects(BaseModel): + """Object structure for window setback rule""" + contacts: list[str] = Field(..., min_length=1, description="Contact sensors to monitor") + thermostats: list[str] = Field(..., min_length=1, description="Thermostats to control") + + class WindowSetbackRule(Rule): """ Window setback automation rule. @@ -18,22 +26,67 @@ class WindowSetbackRule(Rule): When closed for a minimum duration, restore previous target temperature. Configuration: - targets: - contacts: List of contact sensor device IDs - thermostats: List of thermostat device IDs + objects: + contacts: List of contact sensor device IDs to monitor (required, min 1) + thermostats: List of thermostat device IDs to control (required, min 1) params: eco_target: Temperature to set when window opens (default: 16.0) open_min_secs: Minimum seconds window must be open before triggering (default: 20) close_min_secs: Minimum seconds window must be closed before restoring (default: 20) previous_target_ttl_secs: How long to remember previous temperature (default: 86400) - State storage: - Redis keys: - rule:{rule_id}:contact:{device_id}:state -> "open" | "closed" - rule:{rule_id}:contact:{device_id}:ts -> ISO timestamp of last change - rule:{rule_id}:thermo:{device_id}:previous -> Previous target temperature + State storage (Redis keys): + rule:{rule_id}:contact:{device_id}:state -> "open" | "closed" + rule:{rule_id}:contact:{device_id}:ts -> ISO timestamp of last change + rule:{rule_id}:thermo:{device_id}:current_target -> Current target temp (updated on every STATE) + rule:{rule_id}:thermo:{device_id}:previous -> Previous target temp (saved on window open, deleted on restore) + + Logic: + 1. Thermostat STATE events → update current_target in Redis + 2. Window opens → copy current_target to previous, then set to eco_target + 3. Window closes → restore from previous, then delete previous key """ + def __init__(self): + super().__init__() + self._validated_objects: dict[str, WindowSetbackObjects] = {} + + async def setup(self, desc: RuleDescriptor, ctx: RuleContext) -> None: + """Validate objects structure during setup""" + try: + validated = WindowSetbackObjects(**desc.objects) + self._validated_objects[desc.id] = validated + ctx.logger.info( + f"Rule {desc.id} validated: {len(validated.contacts)} contacts, " + f"{len(validated.thermostats)} thermostats" + ) + except ValidationError as e: + raise ValueError( + f"Invalid objects configuration for rule {desc.id}: {e}" + ) from e + + def get_subscriptions(self, desc: RuleDescriptor) -> list[str]: + """ + Return MQTT topics to subscribe to. + + Subscribe to: + - Contact sensor state changes (to detect window open/close) + - Thermostat state changes (to track current target temperature) + """ + topics = [] + + # Subscribe to contact sensors + contacts = desc.objects.get('contacts', []) + for contact_id in contacts: + topics.append(f"home/contact/{contact_id}/state") + + # Subscribe to thermostats to track their current target temperature + thermostats = desc.objects.get('thermostats', []) + for thermo_id in thermostats: + topics.append(f"home/thermostat/{thermo_id}/state") + + return topics + async def on_event( self, evt: dict[str, Any], @@ -52,9 +105,9 @@ class WindowSetbackRule(Rule): cap = evt['cap'] payload = evt['payload'] - # Only process events for devices in our targets - target_contacts = desc.targets.contacts or [] - target_thermostats = desc.targets.thermostats or [] + # Only process events for devices in our objects + target_contacts = desc.objects.get('contacts', []) + target_thermostats = desc.objects.get('thermostats', []) if cap == 'contact' and device_id in target_contacts: await self._handle_contact_event(evt, desc, ctx) @@ -90,16 +143,39 @@ class WindowSetbackRule(Rule): await self._on_window_closed(desc, ctx) async def _on_window_opened(self, desc: RuleDescriptor, ctx: RuleContext) -> None: - """Window opened - set thermostats to eco temperature.""" + """ + Window opened - save current temperatures, then set thermostats to eco. + + Important: We must save the current target BEFORE setting to eco, + otherwise we'll save the eco temperature instead of the original. + """ eco_target = desc.params.get('eco_target', 16.0) - target_thermostats = desc.targets.thermostats or [] + target_thermostats = desc.objects.get('thermostats', []) + ttl_secs = desc.params.get('previous_target_ttl_secs', 86400) ctx.logger.info( f"Rule {desc.id}: Window opened, setting {len(target_thermostats)} " f"thermostats to eco temperature {eco_target}°C" ) - # Set all thermostats to eco temperature + # FIRST: Save current target temperatures as "previous" (before we change them!) + for thermo_id in target_thermostats: + current_key = f"rule:{desc.id}:thermo:{thermo_id}:current_target" + current_temp_str = await ctx.redis.get(current_key) + + if current_temp_str: + # Save current as previous (with TTL) + prev_key = f"rule:{desc.id}:thermo:{thermo_id}:previous" + await ctx.redis.set(prev_key, current_temp_str, ttl_secs=ttl_secs) + ctx.logger.debug( + f"Saved previous target for {thermo_id}: {current_temp_str}°C" + ) + else: + ctx.logger.warning( + f"No current target found for {thermo_id}, cannot save previous" + ) + + # THEN: Set all thermostats to eco temperature for thermo_id in target_thermostats: try: await ctx.mqtt.publish_set_thermostat(thermo_id, eco_target) @@ -109,13 +185,12 @@ class WindowSetbackRule(Rule): async def _on_window_closed(self, desc: RuleDescriptor, ctx: RuleContext) -> None: """ - Window closed - restore previous temperatures if closed long enough. + Window closed - restore previous temperatures. Note: This is simplified. A production implementation would check close_min_secs and use a timer/scheduler. """ - target_thermostats = desc.targets.thermostats or [] - ttl_secs = desc.params.get('previous_target_ttl_secs', 86400) + target_thermostats = desc.objects.get('thermostats', []) ctx.logger.info( f"Rule {desc.id}: Window closed, restoring {len(target_thermostats)} " @@ -132,8 +207,15 @@ class WindowSetbackRule(Rule): prev_temp = float(prev_temp_str) await ctx.mqtt.publish_set_thermostat(thermo_id, prev_temp) ctx.logger.debug(f"Restored {thermo_id} to {prev_temp}°C") + + # Delete the previous key after restoring + await ctx.redis.delete(prev_key) except Exception as e: ctx.logger.error(f"Failed to restore {thermo_id}: {e}") + else: + ctx.logger.warning( + f"No previous target found for {thermo_id}, cannot restore" + ) async def _handle_thermostat_event( self, @@ -142,9 +224,13 @@ class WindowSetbackRule(Rule): ctx: RuleContext ) -> None: """ - Handle thermostat state change - remember current target. + Handle thermostat state change - track current target temperature. - This allows us to restore the temperature when window closes. + This keeps a record of the thermostat's current target, so we can + save it as "previous" when a window opens. + + Important: We store in "current_target", NOT "previous". The "previous" + key is only written when a window opens, to avoid race conditions. """ device_id = evt['device_id'] payload = evt['payload'] @@ -153,14 +239,14 @@ class WindowSetbackRule(Rule): if current_target is None: return # No target in this state update - # Store as previous target with TTL - prev_key = f"rule:{desc.id}:thermo:{device_id}:previous" + # Store current target (always update, even if it's the eco temperature) + current_key = f"rule:{desc.id}:thermo:{device_id}:current_target" ttl_secs = desc.params.get('previous_target_ttl_secs', 86400) - await ctx.redis.set(prev_key, str(current_target), ttl_secs=ttl_secs) + await ctx.redis.set(current_key, str(current_target), ttl_secs=ttl_secs) ctx.logger.debug( - f"Rule {desc.id}: Stored previous target for {device_id}: {current_target}°C" + f"Rule {desc.id}: Updated current target for {device_id}: {current_target}°C" ) diff --git a/apps/rules/main.py b/apps/rules/main.py index 9c01cbb..2b75297 100644 --- a/apps/rules/main.py +++ b/apps/rules/main.py @@ -24,7 +24,7 @@ from apps.rules.rule_interface import ( # Configure logging logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) @@ -63,11 +63,12 @@ class RuleEngine: self.mqtt_client: MQTTClient | None = None self.redis_state: RedisState | None = None self.context: RuleContext | None = None + self._mqtt_topics: list[str] = [] # Topics to subscribe to # For graceful shutdown self._shutdown_event = asyncio.Event() - def setup(self) -> None: + async def setup(self) -> None: """ Load configuration and instantiate rules. @@ -102,14 +103,55 @@ class RuleEngine: disabled_count = total_count - enabled_count logger.info(f"Successfully loaded {enabled_count} rule implementation(s) ({disabled_count} disabled)") + # Call setup on each rule for validation + for rule_id, rule_instance in self.rules.items(): + desc = next((d for d in self.rule_descriptors if d.id == rule_id), None) + if desc: + try: + ctx = RuleContext( + logger=logger, + mqtt_publisher=self.mqtt_client, + redis_state=self.redis_state + ) + await rule_instance.setup(desc, ctx) + except Exception as e: + logger.error(f"Failed to setup rule {rule_id}: {e}") + raise + + # Collect MQTT subscriptions from all enabled rules + all_topics = set() + for rule_id, rule_instance in self.rules.items(): + desc = next((d for d in self.rule_descriptors if d.id == rule_id), None) + if desc: + try: + topics = rule_instance.get_subscriptions(desc) + all_topics.update(topics) + logger.debug(f"Rule {rule_id} subscribes to {len(topics)} topic(s)") + except Exception as e: + logger.error(f"Failed to get subscriptions for rule {rule_id}: {e}") + raise + + logger.info(f"Total MQTT subscriptions needed: {len(all_topics)}") + + # Create unique client ID to avoid conflicts + import uuid + import os + + client_id_base = "rule_engine" + client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6] + unique_client_id = f"{client_id_base}-{client_suffix}" + # Initialize MQTT client self.mqtt_client = MQTTClient( broker=self.mqtt_broker, port=self.mqtt_port, - client_id="rule_engine" + client_id=unique_client_id ) self.mqtt_client.set_logger(logger) + # Store topics for connection + self._mqtt_topics = list(all_topics) + # Initialize Redis state self.redis_state = RedisState(self.redis_url) @@ -129,10 +171,7 @@ class RuleEngine: """ Filter rules that should receive this event. - Rules match if: - - For contact events: device_id in targets.contacts - - For thermostat events: device_id in targets.thermostats - - (Room-based filtering could be added here) + Rules match if the event's device_id is in the rule's objects. Args: event: Normalized MQTT event @@ -149,27 +188,36 @@ class RuleEngine: logger.debug(f"Filtering for cap={cap}, device_id={device_id}") - for rule_id, desc in [(r.id, r) for r in self.rule_descriptors]: - targets = desc.targets + # Only check enabled rules (rules in self.rules dict) + for rule_id, rule_instance in self.rules.items(): + desc = next((d for d in self.rule_descriptors if d.id == rule_id), None) + if not desc: + continue + + objects = desc.objects - # Check if this device is in the rule's targets + # Check if this device is in the rule's objects matched = False - if cap == 'contact' and targets.contacts: - logger.debug(f"Rule {rule_id}: checking contacts {targets.contacts}") - if device_id in targets.contacts: + if cap == 'contact' and objects.get('contacts'): + logger.debug(f"Rule {rule_id}: checking contacts {objects.get('contacts')}") + if device_id in objects.get('contacts', []): matched = True - elif cap == 'thermostat' and targets.thermostats: - logger.debug(f"Rule {rule_id}: checking thermostats {targets.thermostats}") - if device_id in targets.thermostats: + elif cap == 'thermostat' and objects.get('thermostats'): + logger.debug(f"Rule {rule_id}: checking thermostats {objects.get('thermostats')}") + if device_id in objects.get('thermostats', []): matched = True - # Could add room-based filtering here: - # elif 'rooms' in targets: - # device_room = get_device_room(device_id) - # if device_room in targets['rooms']: - # matched = True + elif cap == 'light' and objects.get('lights'): + logger.debug(f"Rule {rule_id}: checking lights {objects.get('lights')}") + if device_id in objects.get('lights', []): + matched = True + + elif cap == 'relay' and objects.get('relays'): + logger.debug(f"Rule {rule_id}: checking relays {objects.get('relays')}") + if device_id in objects.get('relays', []): + matched = True if matched: matching_rules.append((rule_id, desc)) @@ -227,7 +275,7 @@ class RuleEngine: logger.info("Starting event processing loop") try: - async for event in self.mqtt_client.connect(): + async for event in self.mqtt_client.connect(topics=self._mqtt_topics): # Check for shutdown if self._shutdown_event.is_set(): logger.info("Shutdown signal received, stopping event loop") @@ -284,24 +332,28 @@ async def main_async() -> None: # Load rules try: - engine.setup() + await engine.setup() except Exception as e: logger.error(f"Failed to setup engine: {e}", exc_info=True) sys.exit(1) # Setup signal handlers for graceful shutdown loop = asyncio.get_running_loop() + main_task = None def signal_handler(): logger.info("Received shutdown signal") - asyncio.create_task(engine.shutdown()) + engine._shutdown_event.set() + if main_task and not main_task.done(): + main_task.cancel() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, signal_handler) # Run engine try: - await engine.run() + main_task = asyncio.create_task(engine.run()) + await main_task except asyncio.CancelledError: logger.info("Main task cancelled") finally: diff --git a/apps/rules/rule_interface.py b/apps/rules/rule_interface.py index 9b7b70a..47c4482 100644 --- a/apps/rules/rule_interface.py +++ b/apps/rules/rule_interface.py @@ -21,14 +21,18 @@ class RuleDescriptor(BaseModel): This is the validated representation of a rule from rules.yaml. The engine loads these and passes them to rule implementations. + + The 'objects' field is intentionally flexible (dict) to allow different + rule types to define their own object structures. """ id: str = Field(..., description="Unique identifier for this rule instance") name: Optional[str] = Field(None, description="Optional human-readable name") type: str = Field(..., description="Rule type with version (e.g., 'window_setback@1.0')") - targets: dict[str, Any] = Field( + enabled: bool = Field(default=True, description="Whether this rule is enabled") + objects: dict[str, Any] = Field( default_factory=dict, - description="Rule-specific target specification (rooms, devices, etc.)" + description="Objects this rule monitors or controls (structure varies by rule type)" ) params: dict[str, Any] = Field( default_factory=dict, @@ -309,16 +313,22 @@ class MQTTClient: else: print(f"[{level.upper()}] {msg}") - async def connect(self): + async def connect(self, topics: list[str] = None): """ Connect to MQTT broker with automatic reconnection. This method manages the connection and automatically reconnects with exponential backoff if the connection is lost. + + Args: + topics: List of MQTT topics to subscribe to. If None, subscribes to nothing. """ import aiomqtt from aiomqtt import Client + if topics is None: + topics = [] + reconnect_delay = self._reconnect_interval while True: @@ -333,10 +343,11 @@ class MQTTClient: self._client = client self._log("info", f"Connected to MQTT broker {self._broker}:{self._port}") - # Subscribe to device state topics - await client.subscribe("home/contact/+/state") - await client.subscribe("home/thermostat/+/state") - self._log("info", "Subscribed to home/contact/+/state, home/thermostat/+/state") + # Subscribe to provided topics + if topics: + for topic in topics: + await client.subscribe(topic) + self._log("info", f"Subscribed to {len(topics)} topic(s): {', '.join(topics[:5])}{'...' if len(topics) > 5 else ''}") # Reset reconnect delay on successful connection reconnect_delay = self._reconnect_interval @@ -542,6 +553,13 @@ class Rule(ABC): Example implementation: class WindowSetbackRule(Rule): + def get_subscriptions(self, desc: RuleDescriptor) -> list[str]: + # Subscribe to contact sensor state topics + topics = [] + for contact_id in desc.objects.contacts or []: + topics.append(f"home/contact/{contact_id}/state") + return topics + async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None: device_id = evt['device_id'] cap = evt['cap'] @@ -550,11 +568,31 @@ class Rule(ABC): contact_state = evt['payload'].get('contact') if contact_state == 'open': # Window opened - set thermostats to eco - for thermo_id in desc.targets.get('thermostats', []): + for thermo_id in desc.objects.thermostats or []: eco_temp = desc.params.get('eco_target', 16.0) await ctx.mqtt.publish_set_thermostat(thermo_id, eco_temp) """ + @abstractmethod + def get_subscriptions(self, desc: RuleDescriptor) -> list[str]: + """ + Return list of MQTT topics this rule needs to subscribe to. + + Called once during rule engine setup. The rule examines its configuration + (desc.objects) and returns the specific state topics it needs to monitor. + + Args: + desc: Rule configuration from rules.yaml + + Returns: + List of MQTT topic patterns/strings to subscribe to + + Example: + For a window setback rule monitoring 2 contacts: + ['home/contact/sensor_bedroom/state', 'home/contact/sensor_kitchen/state'] + """ + pass + @abstractmethod async def on_event( self, diff --git a/apps/rules/rules_config.py b/apps/rules/rules_config.py index afcf86d..4508a0d 100644 --- a/apps/rules/rules_config.py +++ b/apps/rules/rules_config.py @@ -11,22 +11,13 @@ import yaml from pydantic import BaseModel, Field, field_validator -class RuleTargets(BaseModel): - """Targets for a rule (rooms, devices, etc.)""" - rooms: list[str] = Field(default_factory=list) - contacts: list[str] = Field(default_factory=list) - thermostats: list[str] = Field(default_factory=list) - lights: list[str] = Field(default_factory=list) - relays: list[str] = Field(default_factory=list) - - class Rule(BaseModel): """Single rule configuration""" id: str = Field(..., description="Unique rule identifier") name: Optional[str] = Field(None, description="Optional human-readable name") type: str = Field(..., description="Rule type (e.g., 'window_setback@1.0')") enabled: bool = Field(default=True, description="Whether this rule is enabled") - targets: RuleTargets = Field(..., description="Target rooms and devices") + objects: dict[str, Any] = Field(default_factory=dict, description="Objects this rule monitors or controls") params: dict[str, Any] = Field(default_factory=dict, description="Rule-specific parameters") @field_validator('id') @@ -121,9 +112,11 @@ if __name__ == "__main__": print(f"✅ Loaded {len(config.rules)} rules:") for rule in config.rules: name = f" ({rule.name})" if rule.name else "" - print(f" - {rule.id}{name}: {rule.type}") - print(f" Targets: {len(rule.targets.rooms)} rooms, " - f"{len(rule.targets.contacts)} contacts, " - f"{len(rule.targets.thermostats)} thermostats") + enabled = "✓" if rule.enabled else "✗" + print(f" [{enabled}] {rule.id}{name}: {rule.type}") + if rule.objects: + obj_summary = ", ".join(f"{k}: {len(v) if isinstance(v, list) else v}" + for k, v in rule.objects.items()) + print(f" Objects: {obj_summary}") except Exception as e: print(f"❌ Configuration error: {e}") diff --git a/config/rules.yaml b/config/rules.yaml index a36b242..d972dc1 100644 --- a/config/rules.yaml +++ b/config/rules.yaml @@ -6,9 +6,7 @@ rules: enabled: false name: Fensterabsenkung Esszimmer type: window_setback@1.0 - targets: - rooms: - - Esszimmer + objects: contacts: - kontakt_esszimmer_strasse_links - kontakt_esszimmer_strasse_rechts @@ -23,9 +21,7 @@ rules: enabled: false name: Fensterabsenkung Küche type: window_setback@1.0 - targets: - rooms: - - Küche + objects: contacts: - kontakt_kueche_garten_fenster - kontakt_kueche_garten_tuer @@ -42,9 +38,7 @@ rules: enabled: false name: Fensterabsenkung Arbeitszimmer Patty type: window_setback@1.0 - targets: - rooms: - - Arbeitszimmer Patty + objects: contacts: - kontakt_patty_garten_links - kontakt_patty_garten_rechts @@ -60,9 +54,7 @@ rules: enabled: false name: Fensterabsenkung Schlafzimmer type: window_setback@1.0 - targets: - rooms: - - Schlafzimmer + objects: contacts: - kontakt_schlafzimmer_strasse thermostats: @@ -76,9 +68,7 @@ rules: enabled: false name: Fensterabsenkung Wohnzimmer type: window_setback@1.0 - targets: - rooms: - - Wohnzimmer + objects: contacts: - kontakt_wohnzimmer_garten_links - kontakt_wohnzimmer_garten_rechts @@ -93,9 +83,7 @@ rules: enabled: true name: Fensterabsenkung Arbeitszimmer Wolfgang type: window_setback@1.0 - targets: - rooms: - - Arbeitszimmer Wolfgang + objects: contacts: - kontakt_wolfgang_garten thermostats: @@ -104,4 +92,3 @@ rules: eco_target: 16.0 open_min_secs: 20 close_min_secs: 20 - previous_target_ttl_secs: 86400