26 Commits

Author SHA1 Message Date
6152385339 fix 8 2025-11-14 15:14:48 +01:00
c2b7328219 fix 7 2025-11-14 15:13:37 +01:00
99362b346f fix 6 2025-11-14 15:01:49 +01:00
77d29c3a42 fix 5 2025-11-14 14:31:03 +01:00
ef3b1177d2 fix 4 2025-11-14 14:18:59 +01:00
8bbe9c164f fix 3 2025-11-14 14:14:49 +01:00
65f8a0c7cb fix 2 2025-11-14 11:34:32 +01:00
cbe7e11cf2 fix 2025-11-14 11:30:10 +01:00
9bf336fa11 groups and scenes 3 2025-11-13 21:56:13 +01:00
b82217a666 groups and scenes 2 2025-11-13 21:54:09 +01:00
5851414ba5 groups and scenes initial 2025-11-13 21:29:04 +01:00
4c5475e930 favicon 2025-11-13 11:14:43 +01:00
b6b441c0ca rules 2 2025-11-11 19:58:06 +01:00
d3d96ed3e9 enabled for rules 2025-11-11 17:08:18 +01:00
2e2963488b rules initial 2025-11-11 16:38:41 +01:00
7928bc596f compose file 2025-11-11 12:40:53 +01:00
3874eaed83 compose file added 2025-11-11 12:34:49 +01:00
0f43f37823 shellies 2025-11-11 11:39:10 +01:00
93e70da97d add spuele 3 2025-11-11 11:11:14 +01:00
62d302bf41 add spuele 2 2025-11-11 11:10:31 +01:00
3d6130f2c2 add spuele 2025-11-11 11:09:08 +01:00
2a8d569bb5 shelly 2025-11-11 11:01:52 +01:00
6a5f814cb4 fix in layout, drop test entry 2025-11-11 10:28:27 +01:00
cc3c15078c change relays to type relay 2025-11-11 10:24:09 +01:00
7772dac000 medusa lampe to relay 2025-11-11 10:12:25 +01:00
97ea853483 add type relay 2025-11-11 10:10:22 +01:00
28 changed files with 4099 additions and 137 deletions

View File

@@ -15,7 +15,7 @@ import uuid
from aiomqtt import Client
from pydantic import ValidationError
from packages.home_capabilities import LightState, ThermostatState, ContactState, TempHumidityState
from packages.home_capabilities import LightState, ThermostatState, ContactState, TempHumidityState, RelayState
from apps.abstraction.transformation import (
transform_abstract_to_vendor,
transform_vendor_to_abstract
@@ -154,6 +154,9 @@ async def handle_abstract_set(
if device_type == "light":
# Validate light SET payload (power and/or brightness)
LightState.model_validate(abstract_payload)
elif device_type == "relay":
# Validate relay SET payload (power only)
RelayState.model_validate(abstract_payload)
elif device_type == "thermostat":
# For thermostat SET: only allow mode and target fields
allowed_set_fields = {"mode", "target"}
@@ -178,9 +181,10 @@ async def handle_abstract_set(
# Transform abstract payload to vendor-specific format
vendor_payload = transform_abstract_to_vendor(device_type, device_technology, abstract_payload)
# For MAX! thermostats, vendor_payload is a plain string (integer temperature)
# For MAX! thermostats and Shelly relays, vendor_payload is a plain string
# For other devices, it's a dict that needs JSON encoding
if device_technology == "max" and device_type == "thermostat":
if (device_technology == "max" and device_type == "thermostat") or \
(device_technology == "shelly" and device_type == "relay"):
vendor_message = vendor_payload # Already a string
else:
vendor_message = json.dumps(vendor_payload)
@@ -216,6 +220,8 @@ async def handle_vendor_state(
try:
if device_type == "light":
LightState.model_validate(abstract_payload)
elif device_type == "relay":
RelayState.model_validate(abstract_payload)
elif device_type == "thermostat":
# Validate thermostat state: mode, target, current (required), battery, window_open
ThermostatState.model_validate(abstract_payload)
@@ -334,9 +340,21 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
max_device_type = device["type"]
break
# Check for Shelly relay (also sends plain text)
is_shelly_relay = False
shelly_device_id = None
shelly_device_type = None
for device_id, device in devices.items():
if device.get("technology") == "shelly" and device.get("type") == "relay":
if topic == device["topics"]["state"]:
is_shelly_relay = True
shelly_device_id = device_id
shelly_device_type = device["type"]
break
# Parse payload based on device technology
if is_max_device:
# MAX! sends plain integer/string, not JSON
if is_max_device or is_shelly_relay:
# MAX! and Shelly send plain text, not JSON
payload = payload_str.strip()
else:
# All other technologies use JSON
@@ -372,6 +390,14 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
client, redis_client, max_device_id, max_device_type,
device_technology, payload, redis_channel
)
# For Shelly relay devices, we already identified them above
elif is_shelly_relay:
device = devices[shelly_device_id]
device_technology = device.get("technology", "unknown")
await handle_vendor_state(
client, redis_client, shelly_device_id, shelly_device_type,
device_technology, payload, redis_channel
)
else:
# Find device by vendor state topic for other technologies
for device_id, device in devices.items():

View File

@@ -307,6 +307,76 @@ def _transform_temp_humidity_sensor_max_to_abstract(payload: dict[str, Any]) ->
return payload
# ============================================================================
# HANDLER FUNCTIONS: relay - zigbee2mqtt technology
# ============================================================================
def _transform_relay_zigbee2mqtt_to_vendor(payload: dict[str, Any]) -> dict[str, Any]:
"""Transform abstract relay payload to zigbee2mqtt format.
Relay only has power on/off, same transformation as light.
- power: 'on'/'off' -> state: 'ON'/'OFF'
"""
vendor_payload = payload.copy()
if "power" in vendor_payload:
power_value = vendor_payload.pop("power")
vendor_payload["state"] = power_value.upper() if isinstance(power_value, str) else power_value
return vendor_payload
def _transform_relay_zigbee2mqtt_to_abstract(payload: dict[str, Any]) -> dict[str, Any]:
"""Transform zigbee2mqtt relay payload to abstract format.
Relay only has power on/off, same transformation as light.
- state: 'ON'/'OFF' -> power: 'on'/'off'
"""
abstract_payload = payload.copy()
if "state" in abstract_payload:
state_value = abstract_payload.pop("state")
abstract_payload["power"] = state_value.lower() if isinstance(state_value, str) else state_value
return abstract_payload
# ============================================================================
# HANDLER FUNCTIONS: relay - shelly technology
# ============================================================================
def _transform_relay_shelly_to_vendor(payload: dict[str, Any]) -> str:
"""Transform abstract relay payload to Shelly format.
Shelly expects plain text 'on' or 'off' (not JSON).
- power: 'on'/'off' -> 'on'/'off' (plain string)
Example:
- Abstract: {'power': 'on'}
- Shelly: 'on'
"""
power = payload.get("power", "off")
return power
def _transform_relay_shelly_to_abstract(payload: str) -> dict[str, Any]:
"""Transform Shelly relay payload to abstract format.
Shelly sends plain text 'on' or 'off' (not JSON).
- 'on'/'off' -> power: 'on'/'off'
Example:
- Shelly: 'on'
- Abstract: {'power': 'on'}
"""
# Shelly payload is a plain string, not a dict
if isinstance(payload, str):
return {"power": payload.strip()}
# Fallback if it's already a dict (shouldn't happen)
return payload
# ============================================================================
# HANDLER FUNCTIONS: max technology (Homegear MAX!)
# ============================================================================
@@ -420,6 +490,12 @@ TRANSFORM_HANDLERS: dict[tuple[str, str, str], TransformHandler] = {
("temp_humidity", "zigbee2mqtt", "to_abstract"): _transform_temp_humidity_sensor_zigbee2mqtt_to_abstract,
("temp_humidity", "max", "to_vendor"): _transform_temp_humidity_sensor_max_to_vendor,
("temp_humidity", "max", "to_abstract"): _transform_temp_humidity_sensor_max_to_abstract,
# Relay transformations
("relay", "zigbee2mqtt", "to_vendor"): _transform_relay_zigbee2mqtt_to_vendor,
("relay", "zigbee2mqtt", "to_abstract"): _transform_relay_zigbee2mqtt_to_abstract,
("relay", "shelly", "to_vendor"): _transform_relay_shelly_to_vendor,
("relay", "shelly", "to_abstract"): _transform_relay_shelly_to_abstract,
}

View File

@@ -20,14 +20,32 @@ from packages.home_capabilities import (
THERMOSTAT_VERSION,
CONTACT_SENSOR_VERSION,
TEMP_HUMIDITY_SENSOR_VERSION,
RELAY_VERSION,
LightState,
ThermostatState,
ContactState,
TempHumidityState
TempHumidityState,
RelayState,
load_layout,
)
# Import resolvers (must be before router imports to avoid circular dependency)
from apps.api.resolvers import (
DeviceDTO,
resolve_group_devices,
resolve_scene_step_devices,
load_device_rooms,
get_room,
clear_room_cache,
)
logger = logging.getLogger(__name__)
# ============================================================================
# STATE CACHES
# ============================================================================
# In-memory cache for last known device states
# Will be populated from Redis pub/sub messages
device_states: dict[str, dict[str, Any]] = {}
@@ -55,6 +73,13 @@ app.add_middleware(
)
@app.on_event("startup")
async def startup_event():
    """Include routers after app is initialized to avoid circular imports.

    The groups/scenes router imports back from this module, so importing it
    at module top level would create a circular import; deferring the import
    to startup breaks the cycle.

    NOTE(review): ``@app.on_event("startup")`` is deprecated in recent
    FastAPI versions in favor of lifespan handlers — confirm the installed
    version before migrating.
    """
    from apps.api.routes.groups_scenes import router as groups_scenes_router
    app.include_router(groups_scenes_router, prefix="")
@app.get("/health")
async def health() -> dict[str, str]:
"""Health check endpoint.
@@ -148,7 +173,8 @@ async def spec() -> dict[str, dict[str, str]]:
"light": LIGHT_VERSION,
"thermostat": THERMOSTAT_VERSION,
"contact": CONTACT_SENSOR_VERSION,
"temp_humidity": TEMP_HUMIDITY_SENSOR_VERSION
"temp_humidity": TEMP_HUMIDITY_SENSOR_VERSION,
"relay": RELAY_VERSION
}
}
@@ -204,6 +230,50 @@ def get_mqtt_settings() -> tuple[str, int]:
return host, port
# ============================================================================
# MQTT PUBLISH
# ============================================================================
async def publish_abstract_set(device_type: str, device_id: str, payload: dict[str, Any]) -> None:
    """Publish an abstract set command via MQTT.

    Encapsulates MQTT publishing so group/scene execution does not need to
    know topic details.

    Topic format:   home/{device_type}/{device_id}/set
    Message format: {"type": device_type, "payload": payload}

    Args:
        device_type: Device type (light, thermostat, relay, etc.)
        device_id: Device identifier
        payload: Command payload (e.g. {"power": "on", "brightness": 50})

    Raises:
        Exception: Re-raised after logging when the publish fails.

    Example:
        >>> await publish_abstract_set("light", "kueche_deckenlampe", {"power": "on", "brightness": 35})
        # Publishes to: home/light/kueche_deckenlampe/set
        # Message: {"type": "light", "payload": {"power": "on", "brightness": 35}}
    """
    host, port = get_mqtt_settings()
    topic = f"home/{device_type}/{device_id}/set"
    message = {"type": device_type, "payload": payload}
    try:
        async with Client(hostname=host, port=port) as client:
            await client.publish(topic=topic, payload=json.dumps(message), qos=1)
            logger.info(f"Published to {topic}: {message}")
    except Exception as e:
        # Log and propagate so callers can record the failure per device.
        logger.error(f"Failed to publish to {topic}: {e}")
        raise
def get_redis_settings() -> tuple[str, str]:
"""Get Redis settings from configuration.
@@ -288,8 +358,6 @@ async def get_layout() -> dict[str, Any]:
Returns:
dict: Layout configuration with rooms and device tiles
"""
from packages.home_capabilities import load_layout
try:
layout = load_layout()
@@ -318,6 +386,23 @@ async def get_layout() -> dict[str, Any]:
return {"rooms": []}
@app.get("/devices/{device_id}/room")
async def get_device_room(device_id: str) -> dict[str, str | None]:
    """Return the room a device is assigned to in the layout.

    Args:
        device_id: Device identifier

    Returns:
        dict: {"device_id": str, "room": str | null} — room is null when the
        device is not placed in any room of layout.yaml.
    """
    return {"device_id": device_id, "room": get_room(device_id)}
@app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED)
async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]:
"""Set device state.
@@ -358,6 +443,14 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid payload for light: {e}"
)
elif request.type == "relay":
try:
RelayState(**request.payload)
except ValidationError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Invalid payload for relay: {e}"
)
elif request.type == "thermostat":
try:
# For thermostat SET: only allow mode and target

286
apps/api/resolvers.py Normal file
View File

@@ -0,0 +1,286 @@
"""Group and scene resolution logic."""
import logging
from pathlib import Path
from typing import Any, TypedDict
from packages.home_capabilities import (
GroupConfig,
GroupsConfigRoot,
SceneStep,
get_group_by_id,
load_layout,
)
logger = logging.getLogger(__name__)
# ============================================================================
# TYPE DEFINITIONS
# ============================================================================
class DeviceDTO(TypedDict, total=False):
    """Device Data Transfer Object.

    Represents a device as returned by /devices endpoint or load_devices().
    ``total=False`` means the type checker enforces no key; by convention
    ``device_id`` and ``type`` are always present.

    Required fields (by convention):
        device_id: Unique device identifier
        type: Device type (light, thermostat, relay, etc.)

    Optional fields:
        name: Human-readable device name
        features: Device capabilities (power, brightness, etc.)
        technology: MQTT, zigbee2mqtt, simulator, etc.
        topics: MQTT topic configuration
        metadata: Additional device information
    """
    device_id: str              # unique device identifier
    type: str                   # device type (light, relay, thermostat, ...)
    name: str                   # human-readable display name
    features: dict[str, Any]    # capability flags/values
    technology: str             # vendor technology (zigbee2mqtt, shelly, max, ...)
    topics: dict[str, str]      # MQTT topic configuration
    metadata: dict[str, Any]    # free-form additional information
# ============================================================================
# DEVICE-ROOM MAPPING
# ============================================================================
# Global cache for device -> room mapping
_device_room_cache: dict[str, str] = {}
def load_device_rooms(path: str | Path | None = None) -> dict[str, str]:
    """Load the device-to-room mapping from the layout configuration.

    Extracts device_id -> room_name pairs from layout.yaml, which is used
    to resolve selectors like {room: "Küche"}. Also refreshes the module
    level cache consulted by get_room().

    Args:
        path: Optional path to layout.yaml. If None, uses the default path
            (config/layout.yaml relative to workspace root).

    Returns:
        Dictionary mapping device_id to room_name. Returns an empty dict
        (and clears the cache) if layout.yaml is missing, malformed or empty.

    Example:
        >>> mapping = load_device_rooms()
        >>> mapping['kueche_lampe1']
        'Küche'
    """
    global _device_room_cache
    try:
        # Load the layout using the existing shared loader
        layout = load_layout(path)
        # Build device -> room mapping
        device_rooms: dict[str, str] = {}
        for room in layout.rooms:
            for device in room.devices:
                device_rooms[device.device_id] = room.name
        # Update global cache
        _device_room_cache = device_rooms.copy()
        logger.info(f"Loaded device-room mapping: {len(device_rooms)} devices")
        return device_rooms
    except Exception as e:
        # Fix: the previous tuple (FileNotFoundError, ValueError, Exception)
        # was redundant — Exception already subsumes the other two. This is
        # a deliberate best-effort catch-all: a broken layout must not crash
        # the API, it just degrades to an empty mapping.
        logger.warning(f"Failed to load device-room mapping: {e}")
        logger.warning("Returning empty device-room mapping")
        _device_room_cache = {}
        return {}
def get_room(device_id: str) -> str | None:
    """Return the room name for *device_id*, or None if unknown.

    Uses the cached device-room mapping maintained by load_device_rooms();
    when the cache is empty it is lazily (re)loaded from the layout first.

    Args:
        device_id: The device identifier to look up.

    Example:
        >>> get_room('kueche_lampe1')
        'Küche'
        >>> get_room('nonexistent_device')
        None
    """
    if not _device_room_cache:
        logger.debug("Device-room cache empty, loading from layout...")
        # Refills the module-level _device_room_cache as a side effect.
        load_device_rooms()
    return _device_room_cache.get(device_id)
def clear_room_cache() -> None:
    """Drop the cached device-room mapping.

    The next get_room() call will reload the mapping from the layout.
    Intended for tests and for picking up layout changes at runtime.
    """
    _device_room_cache.clear()
    logger.debug("Cleared device-room cache")
# ============================================================================
# GROUP & SCENE RESOLUTION
# ============================================================================
def resolve_group_devices(
    group: GroupConfig,
    devices: list[DeviceDTO],
    device_rooms: dict[str, str],
) -> list[DeviceDTO]:
    """Resolve the member devices of a group.

    Resolution order:
      1. Explicit ``group.device_ids`` — return the matching devices.
      2. ``group.selector`` — filter by required type and optional room.
      3. Neither — empty list.

    Args:
        group: Group configuration with device_ids or selector.
        devices: List of all available devices.
        device_rooms: Mapping of device_id -> room_name.

    Returns:
        Devices matching the group criteria (no duplicates).

    Example:
        >>> group = GroupConfig(id="test", name="Test", device_ids=["lamp1", "lamp2"])
        >>> resolve_group_devices(group, all_devices, {})
        [{"device_id": "lamp1", ...}, {"device_id": "lamp2", ...}]
    """
    # Case 1: explicit membership list
    if group.device_ids:
        wanted = set(group.device_ids)
        return [dev for dev in devices if dev["device_id"] in wanted]

    # Case 2: selector-based filtering
    selector = group.selector
    if not selector:
        # No device_ids and no selector -> empty membership.
        return []

    matches: list[DeviceDTO] = []
    for dev in devices:
        # Type is mandatory in a group selector.
        if dev["type"] != selector.type:
            continue
        # Room filter is optional.
        if selector.room and device_rooms.get(dev["device_id"]) != selector.room:
            continue
        # Tag filtering (selector.tags) is a planned future feature.
        matches.append(dev)
    return matches
def resolve_scene_step_devices(
    step: SceneStep,
    groups_config: GroupsConfigRoot,
    devices: list[DeviceDTO],
    device_rooms: dict[str, str],
) -> list[DeviceDTO]:
    """Resolve the target devices of a scene step.

    A step targets either a named group (``step.group_id``) or an inline
    selector (``step.selector``).

    Args:
        step: Scene step with group_id or selector.
        groups_config: Groups configuration for group lookup.
        devices: List of all available devices.
        device_rooms: Mapping of device_id -> room_name.

    Returns:
        Devices matching the step criteria.

    Raises:
        ValueError: If group_id is specified but the group is not found.
    """
    # Case 1: reference to a named group
    if step.group_id:
        group = get_group_by_id(groups_config, step.group_id)
        if not group:
            raise ValueError(
                f"Scene step references unknown group_id: '{step.group_id}'. "
                f"Available groups: {[g.id for g in groups_config.groups]}"
            )
        # Delegate to the group resolver.
        return resolve_group_devices(group, devices, device_rooms)

    # Case 2: inline selector
    selector = step.selector
    if not selector:
        # SceneStep validation requires group_id or selector, so this path
        # should be unreachable in practice.
        return []

    matches: list[DeviceDTO] = []
    for dev in devices:
        # Type is optional in a scene selector (unlike a group selector).
        if selector.type and dev["type"] != selector.type:
            continue
        # Room filter is optional.
        if selector.room and device_rooms.get(dev["device_id"]) != selector.room:
            continue
        # Tag filtering (selector.tags) is a planned future feature.
        matches.append(dev)
    return matches

View File

@@ -0,0 +1 @@
"""API routes package."""

View File

@@ -0,0 +1,454 @@
"""Groups and Scenes API routes."""
import asyncio
import logging
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from packages.home_capabilities import (
GroupConfig,
GroupsConfigRoot,
SceneConfig,
ScenesConfigRoot,
get_group_by_id,
get_scene_by_id,
load_groups,
load_scenes,
)
# Import from parent modules
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from resolvers import (
DeviceDTO,
resolve_group_devices,
resolve_scene_step_devices,
load_device_rooms,
)
from main import load_devices, publish_abstract_set
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================
class GroupResponse(BaseModel):
    """Response model for a group (GET /groups and GET /groups/{group_id})."""
    id: str                                  # unique group identifier
    name: str                                # human-readable group name
    device_count: int                        # number of resolved member devices
    devices: list[str]                       # resolved member device_ids
    selector: dict[str, Any] | None = None   # selector dict when selector-based, else None
    capabilities: dict[str, bool]            # capability flags for the group
class GroupSetRequest(BaseModel):
    """Request to set state for all devices in a group."""
    # e.g. {"type": "light", "payload": {"power": "on", "brightness": 50}};
    # only the "payload" key is read when executing the group action.
    action: dict[str, Any]
class SceneResponse(BaseModel):
    """Response model for a scene (GET /scenes and GET /scenes/{scene_id})."""
    id: str      # unique scene identifier
    name: str    # human-readable scene name
    steps: int   # number of steps in the scene
class SceneRunRequest(BaseModel):
    """Request to execute a scene (currently empty, future: override params)."""
    # Intentionally empty: reserved for per-run parameter overrides.
    pass
class SceneExecutionResponse(BaseModel):
    """Response after scene execution (POST /scenes/{scene_id}/run)."""
    scene_id: str                          # identifier of the executed scene
    scene_name: str                        # human-readable scene name
    steps_executed: int                    # number of steps processed
    devices_affected: int                  # total devices across all steps
    execution_plan: list[dict[str, Any]]   # per-step details incl. publish results
# ============================================================================
# GROUPS ENDPOINTS
# ============================================================================
@router.get("/groups", response_model=list[GroupResponse], tags=["groups"])
async def list_groups() -> list[GroupResponse]:
    """List all available groups with their resolved member devices.

    Returns:
        list[GroupResponse]: One entry per configured group.

    Raises:
        HTTPException: 500 when configuration loading or resolution fails.
    """
    try:
        config_path = Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml"
        groups_config = load_groups(config_path)
        devices = load_devices()
        device_rooms = load_device_rooms()

        result: list[GroupResponse] = []
        for group in groups_config.groups:
            # Resolve current membership for this group.
            members = resolve_group_devices(group, devices, device_rooms)
            member_ids = [m["device_id"] for m in members]
            # Serialize the selector when the group is selector-based.
            selector_dict = None
            if group.selector:
                selector_dict = {
                    "type": group.selector.type,
                    "room": group.selector.room,
                    "tags": group.selector.tags,
                }
            result.append(GroupResponse(
                id=group.id,
                name=group.name,
                device_count=len(member_ids),
                devices=member_ids,
                selector=selector_dict,
                capabilities=group.capabilities,
            ))
        return result
    except Exception as e:
        logger.error(f"Error loading groups: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to load groups: {str(e)}"
        )
@router.get("/groups/{group_id}", response_model=GroupResponse, tags=["groups"])
async def get_group(group_id: str) -> GroupResponse:
    """Return details for a single group, with its resolved devices.

    Args:
        group_id: Group identifier.

    Raises:
        HTTPException: 404 when the group does not exist, 500 on other errors.
    """
    try:
        config_path = Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml"
        groups_config = load_groups(config_path)
        devices = load_devices()
        device_rooms = load_device_rooms()

        group = get_group_by_id(groups_config, group_id)
        if not group:
            available_groups = [g.id for g in groups_config.groups]
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Group '{group_id}' not found. Available groups: {available_groups}"
            )

        members = resolve_group_devices(group, devices, device_rooms)
        member_ids = [m["device_id"] for m in members]

        # Serialize the selector when the group is selector-based.
        selector_dict = None
        if group.selector:
            selector_dict = {
                "type": group.selector.type,
                "room": group.selector.room,
                "tags": group.selector.tags,
            }
        return GroupResponse(
            id=group.id,
            name=group.name,
            device_count=len(member_ids),
            devices=member_ids,
            selector=selector_dict,
            capabilities=group.capabilities,
        )
    except HTTPException:
        # Re-raise 404s untouched; only wrap unexpected failures below.
        raise
    except Exception as e:
        logger.error(f"Error getting group {group_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get group: {str(e)}"
        )
@router.post("/groups/{group_id}/set", status_code=status.HTTP_202_ACCEPTED, tags=["groups"])
async def set_group(group_id: str, request: GroupSetRequest) -> dict[str, Any]:
    """Apply an action to every device in a group.

    Resolves the group to its member devices and publishes the action's
    payload to each device via MQTT. Individual publish failures are
    recorded in the execution plan instead of aborting the request.

    Args:
        group_id: Group identifier.
        request: Action to apply to all devices in the group.

    Returns:
        dict: Execution summary with a per-device plan.

    Raises:
        HTTPException: 404 when the group does not exist, 500 on other errors.
    """
    try:
        config_path = Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml"
        groups_config = load_groups(config_path)
        devices = load_devices()
        device_rooms = load_device_rooms()

        group = get_group_by_id(groups_config, group_id)
        if not group:
            available_groups = [g.id for g in groups_config.groups]
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Group '{group_id}' not found. Available groups: {available_groups}"
            )

        members = resolve_group_devices(group, devices, device_rooms)
        if not members:
            # Empty groups are allowed; warn so misconfigured selectors are visible.
            logger.warning(f"Group '{group_id}' resolved to 0 devices")

        # Only the "payload" key of the action is forwarded to devices.
        payload = request.action.get("payload", {})
        execution_plan: list[dict[str, Any]] = []
        for member in members:
            device_id = member["device_id"]
            device_type = member["type"]
            entry: dict[str, Any] = {
                "device_id": device_id,
                "device_type": device_type,
                "action": request.action,
            }
            try:
                await publish_abstract_set(device_type, device_id, payload)
                entry["status"] = "published"
            except Exception as e:
                logger.error(f"Failed to publish to {device_id}: {e}")
                entry["status"] = "failed"
                entry["error"] = str(e)
            execution_plan.append(entry)

        return {
            "group_id": group_id,
            "group_name": group.name,
            "devices_affected": len(members),
            "execution_plan": execution_plan,
            "status": "executed",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error setting group {group_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to set group: {str(e)}"
        )
# ============================================================================
# SCENES ENDPOINTS
# ============================================================================
@router.get("/scenes", response_model=list[SceneResponse], tags=["scenes"])
async def list_scenes() -> list[SceneResponse]:
    """List all available scenes with their step counts.

    Raises:
        HTTPException: 500 when the scenes configuration cannot be loaded.
    """
    try:
        config_path = Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml"
        scenes_config = load_scenes(config_path)
        return [
            SceneResponse(id=scene.id, name=scene.name, steps=len(scene.steps))
            for scene in scenes_config.scenes
        ]
    except Exception as e:
        logger.error(f"Error loading scenes: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to load scenes: {str(e)}"
        )
@router.get("/scenes/{scene_id}", response_model=SceneResponse, tags=["scenes"])
async def get_scene(scene_id: str) -> SceneResponse:
    """Return details for a single scene.

    Args:
        scene_id: Scene identifier.

    Raises:
        HTTPException: 404 when the scene does not exist, 500 on other errors.
    """
    try:
        config_path = Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml"
        scenes_config = load_scenes(config_path)

        scene = get_scene_by_id(scenes_config, scene_id)
        if not scene:
            available_scenes = [s.id for s in scenes_config.scenes]
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scene '{scene_id}' not found. Available scenes: {available_scenes}"
            )
        return SceneResponse(id=scene.id, name=scene.name, steps=len(scene.steps))
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting scene {scene_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get scene: {str(e)}"
        )
@router.post("/scenes/{scene_id}/run", response_model=SceneExecutionResponse, tags=["scenes"])
async def run_scene(scene_id: str, request: SceneRunRequest | None = None) -> SceneExecutionResponse:
    """Execute a scene step by step.

    Each step is resolved to its target devices and the step's action
    payload is published to every device via MQTT. Per-device publish
    failures are recorded in the execution plan instead of aborting.

    Args:
        scene_id: Scene identifier.
        request: Optional execution parameters (reserved for future use).

    Returns:
        SceneExecutionResponse: Execution plan and summary.

    Raises:
        HTTPException: 404 unknown scene, 400 when a step references an
        unknown group_id, 500 on other errors.
    """
    try:
        config_dir = Path(__file__).parent.parent.parent.parent / "config"
        scenes_config = load_scenes(config_dir / "scenes.yaml")
        groups_config = load_groups(config_dir / "groups.yaml")
        devices = load_devices()
        device_rooms = load_device_rooms()

        scene = get_scene_by_id(scenes_config, scene_id)
        if not scene:
            available_scenes = [s.id for s in scenes_config.scenes]
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scene '{scene_id}' not found. Available scenes: {available_scenes}"
            )

        execution_plan: list[dict[str, Any]] = []
        total_devices = 0
        for i, step in enumerate(scene.steps, 1):
            # Resolve devices for this step (may raise ValueError for an
            # unknown group_id — mapped to 400 below).
            resolved_devices = resolve_scene_step_devices(step, groups_config, devices, device_rooms)
            total_devices += len(resolved_devices)
            action_payload = step.action.get("payload", {})

            # Publish the action to each resolved device.
            step_executions = []
            for device in resolved_devices:
                device_type = device["type"]
                device_id = device["device_id"]
                try:
                    await publish_abstract_set(device_type, device_id, action_payload)
                    step_executions.append({
                        "device_id": device_id,
                        "status": "published"
                    })
                except Exception as e:
                    logger.error(f"Failed to publish to {device_id} in step {i}: {e}")
                    step_executions.append({
                        "device_id": device_id,
                        "status": "failed",
                        "error": str(e)
                    })

            step_info: dict[str, Any] = {
                "step": i,
                "devices_affected": len(resolved_devices),
                "device_ids": [d["device_id"] for d in resolved_devices],
                "action": step.action,
                "executions": step_executions,
            }
            # Record how the step targeted its devices.
            if step.group_id:
                step_info["target"] = {"type": "group_id", "value": step.group_id}
            elif step.selector:
                step_info["target"] = {
                    "type": "selector",
                    "selector_type": step.selector.type,
                    "room": step.selector.room,
                }
            if step.delay_ms:
                step_info["delay_ms"] = step.delay_ms
                # Sleep only when a delay is configured: an unconditional
                # `asyncio.sleep(step.delay_ms / 1000.0)` would raise
                # TypeError when delay_ms is None/unset.
                await asyncio.sleep(step.delay_ms / 1000.0)
            execution_plan.append(step_info)

        return SceneExecutionResponse(
            scene_id=scene.id,
            scene_name=scene.name,
            steps_executed=len(scene.steps),
            devices_affected=total_devices,
            execution_plan=execution_plan,
        )
    except HTTPException:
        raise
    except ValueError as e:
        # Unknown group_id referenced by a scene step.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Error running scene {scene_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to run scene: {str(e)}"
        )

53
apps/rules/Dockerfile Normal file
View File

@@ -0,0 +1,53 @@
# Rules Engine Dockerfile
# Event-driven automation rules processor with MQTT and Redis
FROM python:3.14-alpine

# Prevent Python from writing .pyc files and enable unbuffered output.
# NOTE(review): MQTT_BROKER/REDIS_* defaults below are environment-specific
# (hard-coded LAN IP / localhost) — they are expected to be overridden at
# deploy time via compose/env; confirm before reuse in other environments.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    RULES_CONFIG=config/rules.yaml \
    MQTT_BROKER=172.16.2.16 \
    MQTT_PORT=1883 \
    REDIS_HOST=localhost \
    REDIS_PORT=6379 \
    REDIS_DB=8 \
    LOG_LEVEL=INFO

# Create non-root user (fixed UID/GID for stable file ownership across hosts)
RUN addgroup -g 10001 -S app && \
    adduser -u 10001 -S app -G app

# Set working directory
WORKDIR /app

# Install system dependencies (toolchain for building C extensions on musl)
RUN apk add --no-cache \
    gcc \
    musl-dev \
    linux-headers

# Install Python dependencies first so this layer is cached across code changes
COPY apps/rules/requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY apps/__init__.py /app/apps/
COPY apps/rules/ /app/apps/rules/
COPY packages/ /app/packages/
COPY config/ /app/config/

# Change ownership to non-root user
RUN chown -R app:app /app

# Switch to non-root user
USER app

# Expose no ports (MQTT/Redis client only)

# Health check (check if the rules engine process is running)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD pgrep -f "apps.rules.main" || exit 1

# Run the rules engine
CMD ["python", "-m", "apps.rules.main"]

View File

@@ -0,0 +1,371 @@
# Rule Interface Documentation
## Overview
The rule interface provides a clean abstraction for implementing automation rules. Rules respond to device state changes and can publish commands, persist state, and log diagnostics.
## Core Components
### 1. RuleDescriptor
Configuration data for a rule instance (loaded from `rules.yaml`):
```python
RuleDescriptor(
id="window_setback_wohnzimmer", # Unique rule ID
name="Fensterabsenkung Wohnzimmer", # Optional display name
type="window_setback@1.0", # Rule type + version
targets={ # Rule-specific targets
"rooms": ["Wohnzimmer"],
"contacts": ["kontakt_wohnzimmer_..."],
"thermostats": ["thermostat_wohnzimmer"]
},
params={ # Rule-specific parameters
"eco_target": 16.0,
"open_min_secs": 20
}
)
```
### 2. RedisState
Async state persistence with automatic reconnection and retry logic:
```python
# Initialize (done by rule engine)
redis_state = RedisState("redis://172.23.1.116:6379/8")
# Simple key-value with TTL
await ctx.redis.set("rules:my_rule:temp", "22.5", ttl_secs=3600)
value = await ctx.redis.get("rules:my_rule:temp") # Returns "22.5" or None
# Hash storage (for multiple related values)
await ctx.redis.hset("rules:my_rule:sensors", "bedroom", "open")
await ctx.redis.hset("rules:my_rule:sensors", "kitchen", "closed")
value = await ctx.redis.hget("rules:my_rule:sensors", "bedroom") # "open"
# TTL management
await ctx.redis.expire("rules:my_rule:temp", 7200) # Extend to 2 hours
# JSON helpers (for complex data)
import json
data = {"temp": 22.5, "humidity": 45}
await ctx.redis.set("rules:my_rule:data", ctx.redis._dumps(data))
stored = await ctx.redis.get("rules:my_rule:data")
parsed = ctx.redis._loads(stored) if stored else None
```
**Key Conventions:**
- Use prefix `rules:{rule_id}:` for all keys
- Example: `rules:window_setback_wohnzimmer:thermo:device_123:previous`
- TTL recommended for temporary state (previous temperatures, timers)
**Robustness Features:**
- Automatic retry with exponential backoff (default: 3 retries)
- Connection pooling (max 10 connections)
- Automatic reconnection on Redis restart
- Health checks every 30 seconds
- All operations wait and retry, no exceptions on temporary outages
### 3. MQTTClient
Async MQTT client with event normalization and command publishing:
```python
# Initialize (done by rule engine)
mqtt_client = MQTTClient(
broker="172.16.2.16",
port=1883,
client_id="rule_engine"
)
# Subscribe and receive normalized events
async for event in mqtt_client.connect():
# Event structure:
# {
# "topic": "home/contact/sensor_1/state",
# "type": "state",
# "cap": "contact", # Capability (contact, thermostat, etc.)
# "device_id": "sensor_1",
# "payload": {"contact": "open"},
# "ts": "2025-11-11T10:30:45.123456"
# }
if event['cap'] == 'contact':
handle_contact(event)
elif event['cap'] == 'thermostat':
handle_thermostat(event)
# Publish commands (within async context)
await mqtt_client.publish_set_thermostat("thermostat_id", 22.5)
```
**Subscriptions:**
- `home/contact/+/state` - All contact sensor state changes
- `home/thermostat/+/state` - All thermostat state changes
**Publishing:**
- Topic: `home/thermostat/{device_id}/set`
- Payload: `{"type":"thermostat","payload":{"target":22.5}}`
- QoS: 1 (at least once delivery)
**Robustness:**
- Automatic reconnection with exponential backoff
- Connection logging (connect/disconnect events)
- Clean session handling
### 4. MQTTPublisher (Legacy)
Simplified wrapper around MQTTClient for backward compatibility:
```python
# Set thermostat temperature
await ctx.mqtt.publish_set_thermostat("thermostat_wohnzimmer", 21.5)
```
### 5. RuleContext
Runtime context provided to rules:
```python
class RuleContext:
logger # Logger instance
mqtt # MQTTPublisher
redis # RedisState
now() -> datetime # Current timestamp
```
### 6. Rule Abstract Base Class
All rules extend this:
```python
class MyRule(Rule):
async def on_event(self, evt: dict, desc: RuleDescriptor, ctx: RuleContext) -> None:
# Event structure:
# {
# "topic": "home/contact/device_id/state",
# "type": "state",
# "cap": "contact",
# "device_id": "kontakt_wohnzimmer",
# "payload": {"contact": "open"},
# "ts": "2025-11-11T10:30:45.123456"
# }
device_id = evt['device_id']
cap = evt['cap']
if cap == 'contact':
contact_state = evt['payload'].get('contact')
# ... implement logic
```
## Implementing a New Rule
### Step 1: Create Rule Class
```python
from packages.rule_interface import Rule, RuleDescriptor, RuleContext
from typing import Any
class MyCustomRule(Rule):
"""My custom automation rule."""
async def on_event(
self,
evt: dict[str, Any],
desc: RuleDescriptor,
ctx: RuleContext
) -> None:
"""Process device state changes."""
# 1. Extract event data
device_id = evt['device_id']
cap = evt['cap']
payload = evt['payload']
# 2. Filter to relevant devices
if device_id not in desc.targets.get('my_devices', []):
return
# 3. Implement logic
if cap == 'contact':
if payload.get('contact') == 'open':
# Do something
await ctx.mqtt.publish_set_thermostat(
'some_thermostat',
desc.params.get('temp', 20.0)
)
# 4. Persist state if needed
state_key = f"rule:{desc.id}:device:{device_id}:state"
await ctx.redis.set(state_key, payload.get('contact'))
```
### Step 2: Register in RULE_IMPLEMENTATIONS
```python
# In your rule module (e.g., my_custom_rule.py)
RULE_IMPLEMENTATIONS = {
'my_custom@1.0': MyCustomRule,
}
```
### Step 3: Configure in rules.yaml
```yaml
rules:
- id: my_custom_living_room
name: My Custom Rule for Living Room
type: my_custom@1.0
targets:
my_devices:
- device_1
- device_2
params:
temp: 22.0
duration_secs: 300
```
## Best Practices
### Idempotency
Rules MUST be idempotent - processing the same event multiple times should be safe:
```python
# Good: Idempotent
async def on_event(self, evt, desc, ctx):
if evt['payload'].get('contact') == 'open':
await ctx.mqtt.publish_set_thermostat('thermo', 16.0)
# Bad: Not idempotent (increments counter)
async def on_event(self, evt, desc, ctx):
counter = await ctx.redis.get('counter') or '0'
await ctx.redis.set('counter', str(int(counter) + 1))
```
### Error Handling
Handle errors gracefully - the engine will catch and log exceptions:
```python
async def on_event(self, evt, desc, ctx):
try:
await ctx.mqtt.publish_set_thermostat('thermo', 16.0)
except Exception as e:
ctx.logger.error(f"Failed to set thermostat: {e}")
# Don't raise - let event processing continue
```
### State Keys
Use consistent naming for Redis keys:
```python
# Pattern: rule:{rule_id}:{category}:{device_id}:{field}
state_key = f"rule:{desc.id}:contact:{device_id}:state"
ts_key = f"rule:{desc.id}:contact:{device_id}:ts"
prev_key = f"rule:{desc.id}:thermo:{device_id}:previous"
```
### Logging
Use appropriate log levels:
```python
ctx.logger.debug("Detailed diagnostic info")
ctx.logger.info("Normal operation milestones")
ctx.logger.warning("Unexpected but handled situations")
ctx.logger.error("Errors that prevent operation")
```
## Event Structure Reference
### Contact Sensor Event
```python
{
"topic": "home/contact/kontakt_wohnzimmer/state",
"type": "state",
"cap": "contact",
"device_id": "kontakt_wohnzimmer",
"payload": {
"contact": "open" # or "closed"
},
"ts": "2025-11-11T10:30:45.123456"
}
```
### Thermostat Event
```python
{
"topic": "home/thermostat/thermostat_wohnzimmer/state",
"type": "state",
"cap": "thermostat",
"device_id": "thermostat_wohnzimmer",
"payload": {
"target": 21.0,
"current": 20.5,
"mode": "heat"
},
"ts": "2025-11-11T10:30:45.123456"
}
```
## Testing Rules
Rules can be tested independently of the engine:
```python
import pytest
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
from packages.my_custom_rule import MyCustomRule
from packages.rule_interface import RuleDescriptor, RuleContext
@pytest.mark.asyncio
async def test_my_rule():
# Setup
rule = MyCustomRule()
desc = RuleDescriptor(
id="test_rule",
type="my_custom@1.0",
targets={"my_devices": ["device_1"]},
params={"temp": 22.0}
)
# Mock context
ctx = RuleContext(
logger=MagicMock(),
mqtt_publisher=AsyncMock(),
redis_state=AsyncMock(),
now_fn=lambda: datetime.now()
)
# Test event
evt = {
"device_id": "device_1",
"cap": "contact",
"payload": {"contact": "open"},
"ts": "2025-11-11T10:30:45.123456"
}
# Execute
await rule.on_event(evt, desc, ctx)
# Assert
ctx.mqtt.publish_set_thermostat.assert_called_once_with('some_thermostat', 22.0)
```
## Extension Points
The interface is designed to be extended without modifying the engine:
1. **New rule types**: Just implement `Rule` and register in `RULE_IMPLEMENTATIONS`
2. **New MQTT commands**: Extend `MQTTPublisher` with new methods
3. **New state backends**: Implement `RedisState` interface with different storage
4. **Custom context**: Extend `RuleContext` with additional utilities
The engine only depends on the abstract interfaces, not specific implementations.

View File

@@ -0,0 +1,15 @@
"""
Rule Implementations Package
This package contains all rule implementation modules.
Naming Convention:
- Module name: snake_case matching the rule type name
Example: window_setback.py for type 'window_setback@1.0'
- Class name: PascalCase + 'Rule' suffix
Example: WindowSetbackRule
The rule engine uses load_rule() from rule_interface to dynamically
import modules from this package based on the 'type' field in rules.yaml.
"""

View File

@@ -0,0 +1,256 @@
"""
Example Rule Implementation: Window Setback
Demonstrates how to implement a Rule using the rule_interface.
This rule lowers thermostat temperature when a window is opened.
"""
from typing import Any
from pydantic import BaseModel, Field, ValidationError
from apps.rules.rule_interface import Rule, RuleDescriptor, RuleContext
class WindowSetbackObjects(BaseModel):
    """Validated `objects` structure for the window setback rule."""
    # Both lists are required and must contain at least one device id.
    contacts: list[str] = Field(..., min_length=1, description="Contact sensors to monitor")
    thermostats: list[str] = Field(..., min_length=1, description="Thermostats to control")
class WindowSetbackRule(Rule):
    """
    Window setback automation rule.

    When a window/door contact opens, set thermostats to eco temperature.
    When closed for a minimum duration, restore previous target temperature.

    Configuration:
        objects:
            contacts: List of contact sensor device IDs to monitor (required, min 1)
            thermostats: List of thermostat device IDs to control (required, min 1)
        params:
            eco_target: Temperature to set when window opens (default: 16.0)
            open_min_secs: Minimum seconds window must be open before triggering (default: 20)
            close_min_secs: Minimum seconds window must be closed before restoring (default: 20)
            previous_target_ttl_secs: How long to remember previous temperature (default: 86400)

    State storage (Redis keys):
        rule:{rule_id}:contact:{device_id}:state -> "open" | "closed"
        rule:{rule_id}:contact:{device_id}:ts -> ISO timestamp of last change
        rule:{rule_id}:thermo:{device_id}:current_target -> Current target temp (updated on every STATE)
        rule:{rule_id}:thermo:{device_id}:previous -> Previous target temp (saved on window open, deleted on restore)

    Logic:
        1. Thermostat STATE events → update current_target in Redis
        2. Window opens → copy current_target to previous, then set to eco_target
        3. Window closes → restore from previous, then delete previous key
    """

    def __init__(self):
        super().__init__()
        # Per-rule-id cache of validated object structures, filled by setup().
        self._validated_objects: dict[str, WindowSetbackObjects] = {}

    async def setup(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
        """Validate objects structure during setup.

        Raises:
            ValueError: if `desc.objects` does not match WindowSetbackObjects
                        (missing or empty contacts/thermostats lists).
        """
        try:
            validated = WindowSetbackObjects(**desc.objects)
            self._validated_objects[desc.id] = validated
            ctx.logger.info(
                f"Rule {desc.id} validated: {len(validated.contacts)} contacts, "
                f"{len(validated.thermostats)} thermostats"
            )
        except ValidationError as e:
            raise ValueError(
                f"Invalid objects configuration for rule {desc.id}: {e}"
            ) from e

    def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
        """
        Return MQTT topics to subscribe to.

        Subscribe to:
        - Contact sensor state changes (to detect window open/close)
        - Thermostat state changes (to track current target temperature)
        """
        topics = []
        # Subscribe to contact sensors
        contacts = desc.objects.get('contacts', [])
        for contact_id in contacts:
            topics.append(f"home/contact/{contact_id}/state")
        # Subscribe to thermostats to track their current target temperature
        thermostats = desc.objects.get('thermostats', [])
        for thermo_id in thermostats:
            topics.append(f"home/thermostat/{thermo_id}/state")
        return topics

    async def on_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """
        Process contact sensor or thermostat state changes.

        Logic:
        1. If contact opened → remember current thermostat targets, set to eco
        2. If contact closed for min_secs → restore previous targets
        3. If thermostat target changed → update stored previous value
        """
        device_id = evt['device_id']
        cap = evt['cap']
        payload = evt['payload']
        # Only process events for devices in our objects
        target_contacts = desc.objects.get('contacts', [])
        target_thermostats = desc.objects.get('thermostats', [])
        if cap == 'contact' and device_id in target_contacts:
            await self._handle_contact_event(evt, desc, ctx)
        elif cap == 'thermostat' and device_id in target_thermostats:
            await self._handle_thermostat_event(evt, desc, ctx)

    async def _handle_contact_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """Handle contact sensor state change ("open"/"closed")."""
        device_id = evt['device_id']
        contact_state = evt['payload'].get('contact')  # "open" or "closed"
        # Fall back to engine clock if the event carries no timestamp.
        event_ts = evt.get('ts', ctx.now().isoformat())
        if not contact_state:
            ctx.logger.warning(f"Contact event missing 'contact' field: {evt}")
            return
        # Store current state and timestamp
        state_key = f"rule:{desc.id}:contact:{device_id}:state"
        ts_key = f"rule:{desc.id}:contact:{device_id}:ts"
        await ctx.redis.set(state_key, contact_state)
        await ctx.redis.set(ts_key, event_ts)
        # NOTE(review): open_min_secs / close_min_secs are documented in the
        # class docstring but not enforced here — reactions fire immediately.
        # Confirm whether debouncing is still planned.
        if contact_state == 'open':
            await self._on_window_opened(desc, ctx)
        elif contact_state == 'closed':
            await self._on_window_closed(desc, ctx)

    async def _on_window_opened(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
        """
        Window opened - save current temperatures, then set thermostats to eco.

        Important: We must save the current target BEFORE setting to eco,
        otherwise we'll save the eco temperature instead of the original.

        NOTE(review): if a second 'open' event arrives after the eco target
        has been applied (and reported back via a thermostat STATE event),
        this overwrites 'previous' with the eco value — confirm contacts only
        emit 'open' once per transition.
        """
        eco_target = desc.params.get('eco_target', 16.0)
        target_thermostats = desc.objects.get('thermostats', [])
        ttl_secs = desc.params.get('previous_target_ttl_secs', 86400)
        ctx.logger.info(
            f"Rule {desc.id}: Window opened, setting {len(target_thermostats)} "
            f"thermostats to eco temperature {eco_target}°C"
        )
        # FIRST: Save current target temperatures as "previous" (before we change them!)
        for thermo_id in target_thermostats:
            current_key = f"rule:{desc.id}:thermo:{thermo_id}:current_target"
            current_temp_str = await ctx.redis.get(current_key)
            if current_temp_str:
                # Save current as previous (with TTL)
                prev_key = f"rule:{desc.id}:thermo:{thermo_id}:previous"
                await ctx.redis.set(prev_key, current_temp_str, ttl_secs=ttl_secs)
                ctx.logger.debug(
                    f"Saved previous target for {thermo_id}: {current_temp_str}°C"
                )
            else:
                ctx.logger.warning(
                    f"No current target found for {thermo_id}, cannot save previous"
                )
        # THEN: Set all thermostats to eco temperature
        for thermo_id in target_thermostats:
            try:
                await ctx.mqtt.publish_set_thermostat(thermo_id, eco_target)
                ctx.logger.debug(f"Set {thermo_id} to {eco_target}°C")
            except Exception as e:
                # Best effort per thermostat; keep processing the rest.
                ctx.logger.error(f"Failed to set {thermo_id}: {e}")

    async def _on_window_closed(self, desc: RuleDescriptor, ctx: RuleContext) -> None:
        """
        Window closed - restore previous temperatures.

        Note: This is simplified. A production implementation would check
        close_min_secs and use a timer/scheduler.
        """
        target_thermostats = desc.objects.get('thermostats', [])
        ctx.logger.info(
            f"Rule {desc.id}: Window closed, restoring {len(target_thermostats)} "
            f"thermostats to previous temperatures"
        )
        # Restore previous temperatures
        for thermo_id in target_thermostats:
            prev_key = f"rule:{desc.id}:thermo:{thermo_id}:previous"
            prev_temp_str = await ctx.redis.get(prev_key)
            if prev_temp_str:
                try:
                    prev_temp = float(prev_temp_str)
                    await ctx.mqtt.publish_set_thermostat(thermo_id, prev_temp)
                    ctx.logger.debug(f"Restored {thermo_id} to {prev_temp}°C")
                    # Delete the previous key after restoring so a later close
                    # event cannot re-apply a stale value.
                    await ctx.redis.delete(prev_key)
                except Exception as e:
                    ctx.logger.error(f"Failed to restore {thermo_id}: {e}")
            else:
                ctx.logger.warning(
                    f"No previous target found for {thermo_id}, cannot restore"
                )

    async def _handle_thermostat_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """
        Handle thermostat state change - track current target temperature.

        This keeps a record of the thermostat's current target, so we can
        save it as "previous" when a window opens.

        Important: We store in "current_target", NOT "previous". The "previous"
        key is only written when a window opens, to avoid race conditions.
        """
        device_id = evt['device_id']
        payload = evt['payload']
        current_target = payload.get('target')
        if current_target is None:
            return  # No target in this state update
        # Store current target (always update, even if it's the eco temperature)
        current_key = f"rule:{desc.id}:thermo:{device_id}:current_target"
        ttl_secs = desc.params.get('previous_target_ttl_secs', 86400)
        await ctx.redis.set(current_key, str(current_target), ttl_secs=ttl_secs)
        ctx.logger.debug(
            f"Rule {desc.id}: Updated current target for {device_id}: {current_target}°C"
        )
# Rule registry - maps rule type (with version) to implementation class.
# The engine's load_rule() resolves the `type` field from rules.yaml
# (e.g. 'window_setback@1.0') against this mapping.
RULE_IMPLEMENTATIONS: dict[str, type] = {
    'window_setback@1.0': WindowSetbackRule,
}

View File

@@ -1,83 +1,374 @@
"""Rules main entry point."""
"""
Rules Engine
Loads rules configuration, subscribes to MQTT events, and dispatches events
to registered rule implementations.
"""
import asyncio
import logging
import os
import signal
import sys
import time
from typing import NoReturn
from datetime import datetime
from typing import Any
from apscheduler.schedulers.background import BackgroundScheduler
from apps.rules.rules_config import load_rules_config
from apps.rules.rule_interface import (
RuleDescriptor,
RuleContext,
MQTTClient,
RedisState,
load_rule
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Global scheduler instance
scheduler: BackgroundScheduler | None = None
def rule_tick() -> None:
"""Example job that runs every minute.
This is a placeholder for actual rule evaluation logic.
class RuleEngine:
"""
logger.info("Rule tick")
def shutdown_handler(signum: int, frame: object) -> NoReturn:
"""Handle shutdown signals gracefully.
Args:
signum: Signal number
frame: Current stack frame
Rule engine that loads rules, subscribes to MQTT events,
and dispatches them to registered rule implementations.
"""
logger.info(f"Received signal {signum}, shutting down...")
if scheduler:
scheduler.shutdown(wait=True)
logger.info("Scheduler stopped")
sys.exit(0)
    def __init__(
        self,
        rules_config_path: str,
        mqtt_broker: str,
        mqtt_port: int,
        redis_url: str
    ):
        """
        Initialize rule engine (no I/O here; connections happen in setup()/run()).

        Args:
            rules_config_path: Path to rules.yaml
            mqtt_broker: MQTT broker hostname/IP
            mqtt_port: MQTT broker port
            redis_url: Redis connection URL
        """
        self.rules_config_path = rules_config_path
        self.mqtt_broker = mqtt_broker
        self.mqtt_port = mqtt_port
        self.redis_url = redis_url
        # Will be initialized in setup()
        self.rule_descriptors: list[RuleDescriptor] = []
        self.rules: dict[str, Any] = {}  # rule_id -> Rule instance
        self.mqtt_client: MQTTClient | None = None
        self.redis_state: RedisState | None = None
        self.context: RuleContext | None = None
        self._mqtt_topics: list[str] = []  # Topics to subscribe to
        # For graceful shutdown (set by signal handler, checked in run())
        self._shutdown_event = asyncio.Event()
async def setup(self) -> None:
"""
Load configuration and instantiate rules.
Raises:
ImportError: If rule implementation not found
ValueError: If configuration is invalid
"""
logger.info(f"Loading rules configuration from {self.rules_config_path}")
# Load rules configuration
config = load_rules_config(self.rules_config_path)
self.rule_descriptors = config.rules
logger.info(f"Loaded {len(self.rule_descriptors)} rule(s) from configuration")
# Instantiate each rule
for desc in self.rule_descriptors:
if not desc.enabled:
logger.info(f" - {desc.id} (type: {desc.type}) [DISABLED]")
continue
try:
rule_instance = load_rule(desc)
self.rules[desc.id] = rule_instance
logger.info(f" - {desc.id} (type: {desc.type})")
except Exception as e:
logger.error(f"Failed to load rule {desc.id} (type: {desc.type}): {e}")
raise
enabled_count = len(self.rules)
total_count = len(self.rule_descriptors)
disabled_count = total_count - enabled_count
logger.info(f"Successfully loaded {enabled_count} rule implementation(s) ({disabled_count} disabled)")
# Call setup on each rule for validation
for rule_id, rule_instance in self.rules.items():
desc = next((d for d in self.rule_descriptors if d.id == rule_id), None)
if desc:
try:
ctx = RuleContext(
logger=logger,
mqtt_publisher=self.mqtt_client,
redis_state=self.redis_state
)
await rule_instance.setup(desc, ctx)
except Exception as e:
logger.error(f"Failed to setup rule {rule_id}: {e}")
raise
# Collect MQTT subscriptions from all enabled rules
all_topics = set()
for rule_id, rule_instance in self.rules.items():
desc = next((d for d in self.rule_descriptors if d.id == rule_id), None)
if desc:
try:
topics = rule_instance.get_subscriptions(desc)
all_topics.update(topics)
logger.debug(f"Rule {rule_id} subscribes to {len(topics)} topic(s)")
except Exception as e:
logger.error(f"Failed to get subscriptions for rule {rule_id}: {e}")
raise
logger.info(f"Total MQTT subscriptions needed: {len(all_topics)}")
# Create unique client ID to avoid conflicts
import uuid
import os
client_id_base = "rule_engine"
client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6]
unique_client_id = f"{client_id_base}-{client_suffix}"
# Initialize MQTT client
self.mqtt_client = MQTTClient(
broker=self.mqtt_broker,
port=self.mqtt_port,
client_id=unique_client_id
)
self.mqtt_client.set_logger(logger)
# Store topics for connection
self._mqtt_topics = list(all_topics)
# Initialize Redis state
self.redis_state = RedisState(self.redis_url)
# Create MQTT publisher wrapper for RuleContext
from apps.rules.rule_interface import MQTTPublisher
mqtt_publisher = MQTTPublisher(mqtt_client=self.mqtt_client)
# Create rule context
self.context = RuleContext(
logger=logger,
mqtt_publisher=mqtt_publisher,
redis_state=self.redis_state,
now_fn=datetime.now
)
def _filter_rules_for_event(self, event: dict[str, Any]) -> list[tuple[str, RuleDescriptor]]:
"""
Filter rules that should receive this event.
Rules match if the event's device_id is in the rule's objects.
Args:
event: Normalized MQTT event
Returns:
List of (rule_id, descriptor) tuples that should process this event
"""
matching_rules = []
device_id = event.get('device_id')
cap = event.get('cap')
if not device_id or not cap:
return matching_rules
logger.debug(f"Filtering for cap={cap}, device_id={device_id}")
# Only check enabled rules (rules in self.rules dict)
for rule_id, rule_instance in self.rules.items():
desc = next((d for d in self.rule_descriptors if d.id == rule_id), None)
if not desc:
continue
objects = desc.objects
# Check if this device is in the rule's objects
matched = False
if cap == 'contact' and objects.get('contacts'):
logger.debug(f"Rule {rule_id}: checking contacts {objects.get('contacts')}")
if device_id in objects.get('contacts', []):
matched = True
elif cap == 'thermostat' and objects.get('thermostats'):
logger.debug(f"Rule {rule_id}: checking thermostats {objects.get('thermostats')}")
if device_id in objects.get('thermostats', []):
matched = True
elif cap == 'light' and objects.get('lights'):
logger.debug(f"Rule {rule_id}: checking lights {objects.get('lights')}")
if device_id in objects.get('lights', []):
matched = True
elif cap == 'relay' and objects.get('relays'):
logger.debug(f"Rule {rule_id}: checking relays {objects.get('relays')}")
if device_id in objects.get('relays', []):
matched = True
if matched:
matching_rules.append((rule_id, desc))
return matching_rules
    async def _dispatch_event(self, event: dict[str, Any]) -> None:
        """
        Dispatch event to matching rules.

        Calls rule.on_event() for each matching rule sequentially
        to preserve order and avoid race conditions. A failing rule is
        logged and does not stop the remaining rules.

        Args:
            event: Normalized MQTT event
        """
        # Debug logging
        logger.debug(f"Received event: {event}")
        matching_rules = self._filter_rules_for_event(event)
        if not matching_rules:
            # No rules interested in this event
            logger.debug(f"No matching rules for {event.get('cap')}/{event.get('device_id')}")
            return
        logger.info(
            f"Event {event['cap']}/{event['device_id']}: "
            f"{len(matching_rules)} matching rule(s)"
        )
        # Process rules sequentially to preserve order
        for rule_id, desc in matching_rules:
            rule = self.rules.get(rule_id)
            if not rule:
                logger.warning(f"Rule instance not found for {rule_id}")
                continue
            try:
                await rule.on_event(event, desc, self.context)
            except Exception as e:
                logger.error(
                    f"Error in rule {rule_id} processing event "
                    f"{event['cap']}/{event['device_id']}: {e}",
                    exc_info=True
                )
                # Continue with other rules
    async def run(self) -> None:
        """
        Main event loop - subscribe to MQTT and process events.

        Iterates the MQTT client's event stream (which handles its own
        reconnection) and runs until the shutdown event is set or the
        task is cancelled.
        """
        logger.info("Starting event processing loop")
        try:
            async for event in self.mqtt_client.connect(topics=self._mqtt_topics):
                # Check for shutdown (set by signal handler or shutdown())
                if self._shutdown_event.is_set():
                    logger.info("Shutdown signal received, stopping event loop")
                    break
                # Dispatch event to matching rules
                await self._dispatch_event(event)
        except asyncio.CancelledError:
            # Re-raise so callers observe the cancellation normally.
            logger.info("Event loop cancelled")
            raise
        except Exception as e:
            logger.error(f"Fatal error in event loop: {e}", exc_info=True)
            raise
    async def shutdown(self) -> None:
        """Graceful shutdown - signal the event loop and close connections."""
        logger.info("Shutting down rule engine...")
        self._shutdown_event.set()
        # Redis may never have been created if setup() failed early.
        if self.redis_state:
            await self.redis_state.close()
            logger.info("Redis connection closed")
        logger.info("Shutdown complete")
async def main_async() -> None:
    """Async main: read env config, build the engine, run until a signal."""
    # Read configuration from environment (defaults match the Dockerfile ENV)
    rules_config = os.getenv('RULES_CONFIG', 'config/rules.yaml')
    mqtt_broker = os.getenv('MQTT_BROKER', '172.16.2.16')
    mqtt_port = int(os.getenv('MQTT_PORT', '1883'))
    redis_host = os.getenv('REDIS_HOST', '172.23.1.116')
    redis_port = int(os.getenv('REDIS_PORT', '6379'))
    redis_db = int(os.getenv('REDIS_DB', '8'))
    redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
    logger.info("=" * 60)
    logger.info("Rules Engine Starting")
    logger.info("=" * 60)
    logger.info(f"Config: {rules_config}")
    logger.info(f"MQTT: {mqtt_broker}:{mqtt_port}")
    logger.info(f"Redis: {redis_url}")
    logger.info("=" * 60)
    # Initialize engine
    engine = RuleEngine(
        rules_config_path=rules_config,
        mqtt_broker=mqtt_broker,
        mqtt_port=mqtt_port,
        redis_url=redis_url
    )
    # Load rules; configuration errors are fatal at startup
    try:
        await engine.setup()
    except Exception as e:
        logger.error(f"Failed to setup engine: {e}", exc_info=True)
        sys.exit(1)
    # Setup signal handlers for graceful shutdown.
    # NOTE(review): loop.add_signal_handler is only available on Unix event
    # loops — confirm this service never runs on Windows.
    loop = asyncio.get_running_loop()
    main_task = None

    def signal_handler():
        # Set the shutdown flag and cancel the main task so the MQTT
        # iterator unblocks immediately.
        logger.info("Received shutdown signal")
        engine._shutdown_event.set()
        if main_task and not main_task.done():
            main_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)
    # Run engine
    try:
        main_task = asyncio.create_task(engine.run())
        await main_task
    except asyncio.CancelledError:
        logger.info("Main task cancelled")
    finally:
        # Always close connections, even after errors or cancellation
        await engine.shutdown()
def main() -> None:
"""Run the rules application."""
global scheduler
logger.info("Rules engine starting...")
# Register signal handlers
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
# Initialize scheduler
scheduler = BackgroundScheduler()
# Add example job - runs every minute
scheduler.add_job(
rule_tick,
'interval',
minutes=1,
id='rule_tick',
name='Rule Tick Job'
)
# Start scheduler
scheduler.start()
logger.info("Scheduler started with rule_tick job (every 1 minute)")
# Run initial tick immediately
rule_tick()
# Keep the application running
"""Entry point for rule engine."""
try:
while True:
time.sleep(1)
asyncio.run(main_async())
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received, shutting down...")
scheduler.shutdown(wait=True)
logger.info("Scheduler stopped")
logger.info("Keyboard interrupt received")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":

View File

@@ -0,0 +1,5 @@
# Rules Engine Dependencies
pydantic>=2.0
redis>=5.0.1
aiomqtt>=2.0.1
pyyaml>=6.0.1

View File

@@ -0,0 +1,742 @@
"""
Rule Interface and Context Objects
Provides the core abstractions for implementing automation rules:
- RuleDescriptor: Configuration data for a rule instance
- RedisState: State persistence interface
- RuleContext: Runtime context provided to rules
- Rule: Abstract base class for all rule implementations
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Awaitable, Optional
from pydantic import BaseModel, Field
class RuleDescriptor(BaseModel):
    """
    Configuration descriptor for a rule instance.

    This is the validated representation of a rule from rules.yaml.
    The engine loads these and passes them to rule implementations.

    The 'objects' field is intentionally flexible (dict) to allow different
    rule types to define their own object structures (each rule validates
    its own expected shape, e.g. with a nested pydantic model).
    """
    id: str = Field(..., description="Unique identifier for this rule instance")
    name: Optional[str] = Field(None, description="Optional human-readable name")
    type: str = Field(..., description="Rule type with version (e.g., 'window_setback@1.0')")
    enabled: bool = Field(default=True, description="Whether this rule is enabled")
    objects: dict[str, Any] = Field(
        default_factory=dict,
        description="Objects this rule monitors or controls (structure varies by rule type)"
    )
    params: dict[str, Any] = Field(
        default_factory=dict,
        description="Rule-specific parameters"
    )
class RedisState:
"""
Async Redis-backed state persistence for rules with automatic reconnection.
Provides a simple key-value and hash storage interface for rules to persist
state across restarts. All operations are asynchronous and include retry logic
for robustness against temporary Redis outages.
Key Convention:
- Callers should use keys like: f"rules:{rule_id}:contact:{device_id}"
- This class does NOT enforce key prefixes - caller controls the full key
"""
    def __init__(self, url: str, max_retries: int = 3, retry_delay: float = 0.5):
        """
        Initialize RedisState with connection URL.

        Args:
            url: Redis connection URL (e.g., 'redis://172.23.1.116:6379/8')
            max_retries: Maximum number of retry attempts for operations (default: 3)
            retry_delay: Initial delay between retries in seconds, uses exponential backoff (default: 0.5)

        Note:
            Connection is lazy - actual connection happens on first operation
            (see _get_client). Uses connection pooling with automatic
            reconnection on failure.
        """
        self._url = url
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._redis: Optional[Any] = None  # redis.asyncio.Redis instance (lazy)
    async def _get_client(self):
        """
        Get or create the Redis client with a connection pool.

        Lazy initialization ensures we don't connect until first use.
        Uses decode_responses=True for automatic UTF-8 decoding, so all
        values are returned as str.

        NOTE(review): `await aioredis.from_url(...)` relies on
        redis.asyncio.Redis instances being awaitable (awaiting runs
        initialize()) — confirm with the pinned redis-py version.
        """
        if self._redis is None:
            import redis.asyncio as aioredis
            self._redis = await aioredis.from_url(
                self._url,
                decode_responses=True,  # Automatic UTF-8 decode
                encoding='utf-8',
                max_connections=10,  # Connection pool size
                socket_connect_timeout=5,
                socket_keepalive=True,
                health_check_interval=30  # Auto-check connection health
            )
        return self._redis
async def _execute_with_retry(self, operation, *args, **kwargs):
    """
    Execute a Redis operation, retrying with exponential backoff.

    Temporary connection failures are retried (0.5s, 1s, 2s, ... with the
    default retry_delay); between attempts the cached client is dropped so
    the next attempt reconnects from scratch.

    Args:
        operation: Async callable invoked as operation(client, *args, **kwargs)
        *args, **kwargs: Arguments forwarded to the operation

    Returns:
        Result of the operation

    Raises:
        Exception: The last error encountered, once all attempts are exhausted
    """
    import asyncio
    # Guarantee at least one attempt even if max_retries is misconfigured
    # (otherwise we would fall through and `raise None` -> TypeError).
    attempts = max(1, self._max_retries)
    last_exception = None
    for attempt in range(attempts):
        try:
            client = await self._get_client()
            return await operation(client, *args, **kwargs)
        except Exception as e:
            last_exception = e
            if attempt < attempts - 1:
                # Exponential backoff: 0.5s, 1s, 2s, ...
                delay = self._retry_delay * (2 ** attempt)
                await asyncio.sleep(delay)
                # Reset client to force reconnection on the next attempt.
                if self._redis:
                    try:
                        await self._redis.close()
                    except Exception:
                        # Best effort - the connection may already be dead;
                        # a bare `except:` here would also swallow
                        # CancelledError, so catch Exception only.
                        pass
                    self._redis = None
    # All retries exhausted
    raise last_exception
# JSON helpers for complex data structures
def _dumps(self, obj: Any) -> str:
    """Encode a Python object as a JSON string, preserving non-ASCII characters."""
    import json
    encoded = json.dumps(obj, ensure_ascii=False)
    return encoded
def _loads(self, s: str) -> Any:
    """Decode a JSON string back into a Python object."""
    import json
    decoded = json.loads(s)
    return decoded
async def get(self, key: str) -> Optional[str]:
    """
    Fetch a string value by key.

    Args:
        key: Redis key (e.g., "rules:my_rule:contact:sensor_1")

    Returns:
        The stored string, or None when the key does not exist.

    Example:
        >>> state = RedisState("redis://localhost:6379/0")
        >>> await state.set("rules:r1:temp", "22.5")
        >>> await state.get("rules:r1:temp")
        '22.5'
    """
    async def _fetch(client, redis_key):
        return await client.get(redis_key)
    return await self._execute_with_retry(_fetch, key)
async def set(self, key: str, value: str, ttl_secs: Optional[int] = None) -> None:
    """
    Store a string value, optionally with an expiry.

    Args:
        key: Redis key
        value: String value to store
        ttl_secs: Time-to-live in seconds; None means the key never expires.

    Example:
        >>> state = RedisState("redis://localhost:6379/0")
        >>> # Store with 1 hour TTL
        >>> await state.set("rules:r1:previous_temp", "20.0", ttl_secs=3600)
    """
    async def _store(client, redis_key, redis_value, expiry):
        # SETEX writes value and TTL atomically; plain SET leaves no expiry.
        if expiry is None:
            await client.set(redis_key, redis_value)
        else:
            await client.setex(redis_key, expiry, redis_value)
    await self._execute_with_retry(_store, key, value, ttl_secs)
async def hget(self, key: str, field: str) -> Optional[str]:
    """
    Fetch a single field from a hash.

    Args:
        key: Redis hash key
        field: Field name within the hash

    Returns:
        The stored string, or None when the field does not exist.

    Example:
        >>> state = RedisState("redis://localhost:6379/0")
        >>> await state.hset("rules:r1:device_states", "sensor_1", "open")
        >>> await state.hget("rules:r1:device_states", "sensor_1")
        'open'
    """
    async def _fetch(client, hash_key, hash_field):
        return await client.hget(hash_key, hash_field)
    return await self._execute_with_retry(_fetch, key, field)
async def hset(self, key: str, field: str, value: str) -> None:
    """
    Write a single field into a hash.

    Args:
        key: Redis hash key
        field: Field name within the hash
        value: String value to store

    Example:
        >>> state = RedisState("redis://localhost:6379/0")
        >>> await state.hset("rules:r1:sensors", "bedroom", "open")
        >>> await state.hset("rules:r1:sensors", "kitchen", "closed")
    """
    async def _store(client, hash_key, hash_field, hash_value):
        await client.hset(hash_key, hash_field, hash_value)
    await self._execute_with_retry(_store, key, field, value)
async def expire(self, key: str, ttl_secs: int) -> None:
    """
    Set or refresh the TTL on an existing key.

    Args:
        key: Redis key
        ttl_secs: Time-to-live in seconds

    Example:
        >>> state = RedisState("redis://localhost:6379/0")
        >>> await state.set("rules:r1:temp", "22.5")
        >>> await state.expire("rules:r1:temp", 3600)  # Expire in 1 hour
    """
    async def _touch(client, redis_key, expiry):
        await client.expire(redis_key, expiry)
    await self._execute_with_retry(_touch, key, ttl_secs)
async def close(self) -> None:
    """
    Shut down the Redis connection and forget the cached client.

    Call once during application shutdown. A later operation would
    transparently create a fresh client again.
    """
    client = self._redis
    if not client:
        return
    await client.close()
    self._redis = None
class MQTTClient:
    """
    Async MQTT client for the rule engine.

    Subscribes to device state topics, normalizes every incoming message
    into a uniform event dict, and offers high-level publish helpers for
    device commands.

    Normalized event shape:
        {
            "topic": "home/contact/sensor_1/state",
            "type": "state",
            "cap": "contact",          # capability (contact, thermostat, light, ...)
            "device_id": "sensor_1",
            "payload": {"contact": "open"},
            "ts": "2025-11-11T10:30:45.123456"
        }
    """

    def __init__(
        self,
        broker: str,
        port: int = 1883,
        client_id: str = "rule_engine",
        reconnect_interval: int = 5,
        max_reconnect_delay: int = 300
    ):
        """
        Initialize the MQTT client. No connection is opened here.

        Args:
            broker: MQTT broker hostname or IP
            port: MQTT broker port (default: 1883)
            client_id: Unique client ID for this connection
            reconnect_interval: Initial reconnect delay in seconds (default: 5)
            max_reconnect_delay: Maximum reconnect delay in seconds (default: 300)
        """
        self._broker = broker
        self._port = port
        self._client_id = client_id
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_delay = max_reconnect_delay
        self._client = None   # Live aiomqtt.Client while connected, else None
        self._logger = None   # Injected via set_logger()

    def set_logger(self, logger):
        """Set logger instance for connection status messages."""
        self._logger = logger

    def _log(self, level: str, msg: str):
        """Log via the injected logger, or fall back to print()."""
        if self._logger:
            getattr(self._logger, level)(msg)
        else:
            print(f"[{level.upper()}] {msg}")

    async def connect(self, topics: Optional[list[str]] = None):
        """
        Async generator: connect, subscribe, and yield normalized events.

        Maintains the connection forever, reconnecting with exponential
        backoff whenever it drops. Iterate over it to receive events:

            async for evt in mqtt.connect(topics):
                ...

        Args:
            topics: MQTT topics to subscribe to. None/empty subscribes to nothing.
        """
        # Hoisted out of the reconnect loop - no need to re-import per attempt.
        import asyncio
        import aiomqtt
        from aiomqtt import Client

        if topics is None:
            topics = []
        reconnect_delay = self._reconnect_interval
        while True:
            try:
                self._log("info", f"Connecting to MQTT broker {self._broker}:{self._port} (client_id={self._client_id})")
                async with Client(
                    hostname=self._broker,
                    port=self._port,
                    identifier=self._client_id,
                ) as client:
                    self._client = client
                    self._log("info", f"Connected to MQTT broker {self._broker}:{self._port}")
                    # Subscribe to provided topics
                    if topics:
                        for topic in topics:
                            await client.subscribe(topic)
                        self._log("info", f"Subscribed to {len(topics)} topic(s): {', '.join(topics[:5])}{'...' if len(topics) > 5 else ''}")
                    # Reset reconnect delay on successful connection
                    reconnect_delay = self._reconnect_interval
                    # Yield each incoming message as a normalized event dict.
                    async for message in client.messages:
                        yield self._normalize_event(message)
            except aiomqtt.MqttError as e:
                self._log("error", f"MQTT connection error: {e}")
                self._log("info", f"Reconnecting in {reconnect_delay} seconds...")
                await asyncio.sleep(reconnect_delay)
                # Exponential backoff
                reconnect_delay = min(reconnect_delay * 2, self._max_reconnect_delay)
            finally:
                # The context manager above has closed by the time we get here
                # (error, reconnect, or generator shutdown), so drop the stale
                # handle: publish methods must fail fast with RuntimeError
                # instead of silently using a dead connection.
                self._client = None

    def _normalize_event(self, message) -> dict[str, Any]:
        """
        Normalize an aiomqtt message into the standard event dict.

        The topic convention 'home/{capability}/{device_id}/state' is parsed
        for cap/device_id; anything else falls back to cap="unknown". A
        non-JSON payload is wrapped as {"raw": "<decoded text>"}.

        Args:
            message: aiomqtt.Message instance

        Returns:
            Normalized event dictionary (topic, type, cap, device_id,
            payload, ts).
        """
        from datetime import datetime
        import json

        topic = str(message.topic)
        topic_parts = topic.split('/')
        # Parse topic: home/{capability}/{device_id}/state
        if len(topic_parts) >= 4 and topic_parts[0] == 'home' and topic_parts[3] == 'state':
            cap = topic_parts[1]  # contact, thermostat, light, etc.
            device_id = topic_parts[2]
        else:
            # Fallback for unexpected topic format
            cap = "unknown"
            device_id = topic_parts[-2] if len(topic_parts) >= 2 else "unknown"
        # Parse payload
        try:
            payload = json.loads(message.payload.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            payload = {"raw": message.payload.decode('utf-8', errors='replace')}
        return {
            "topic": topic,
            "type": "state",
            "cap": cap,
            "device_id": device_id,
            "payload": payload,
            "ts": datetime.now().isoformat(),
        }

    async def publish_set_thermostat(self, device_id: str, target: float) -> None:
        """
        Publish a thermostat target temperature command.

        Publishes to home/thermostat/{device_id}/set with QoS 1
        (at-least-once delivery) and payload
        {"type": "thermostat", "payload": {"target": <target>}}.

        Args:
            device_id: Thermostat device identifier
            target: Target temperature in degrees Celsius

        Raises:
            RuntimeError: If called while not connected.
        """
        import json
        if self._client is None:
            raise RuntimeError("MQTT client not connected. Call connect() first.")
        topic = f"home/thermostat/{device_id}/set"
        payload = {
            "type": "thermostat",
            "payload": {
                "target": target
            }
        }
        payload_str = json.dumps(payload)
        await self._client.publish(
            topic,
            payload=payload_str.encode('utf-8'),
            qos=1  # At least once delivery
        )
        self._log("debug", f"Published SET to {topic}: {payload_str}")
# Legacy alias for backward compatibility
class MQTTPublisher:
    """
    Deprecated thin wrapper around MQTTClient.

    Kept only so older code and documentation keep working; new code
    should talk to MQTTClient directly.
    """

    def __init__(self, mqtt_client):
        """
        Args:
            mqtt_client: MQTTClient instance all calls are delegated to.
        """
        self._mqtt = mqtt_client

    async def publish_set_thermostat(self, device_id: str, target: float) -> None:
        """
        Forward a thermostat target-temperature command to the wrapped client.

        Args:
            device_id: Thermostat device identifier
            target: Target temperature in degrees Celsius
        """
        await self._mqtt.publish_set_thermostat(device_id, target)
class RuleContext:
    """
    Runtime context handed to rules while they process events.

    Bundles every external dependency a rule implementation needs:
    a logger for diagnostics, an MQTT publisher for device commands,
    Redis-backed state for persistence, and an injectable clock.
    """

    def __init__(
        self,
        logger,
        mqtt_publisher: MQTTPublisher,
        redis_state: RedisState,
        now_fn=None
    ):
        """
        Args:
            logger: Logger instance (e.g., logging.Logger)
            mqtt_publisher: MQTTPublisher used for device commands
            redis_state: RedisState used for persistence
            now_fn: Optional callable returning the current datetime;
                defaults to datetime.now (override it in tests to freeze time).
        """
        self.logger = logger
        self.mqtt = mqtt_publisher
        self.redis = redis_state
        # Injectable clock so rules can be tested deterministically.
        self._now_fn = now_fn or datetime.now

    def now(self) -> datetime:
        """Return the current timestamp from the injected clock."""
        return self._now_fn()
class Rule(ABC):
    """
    Abstract base class for automation rule implementations.

    Rules are event-driven: the engine calls on_event() for every relevant
    device state change, handing over the event data, the rule's
    configuration, and the runtime context. Implementations MUST be
    idempotent - replaying the same event must not change the outcome.

    Minimal example:

        class WindowSetbackRule(Rule):
            def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
                # Subscribe to contact sensor state topics
                return [f"home/contact/{cid}/state"
                        for cid in desc.objects.contacts or []]

            async def on_event(self, evt, desc, ctx):
                if evt['cap'] != 'contact':
                    return
                if evt['payload'].get('contact') == 'open':
                    # Window opened - set thermostats to eco
                    for thermo_id in desc.objects.thermostats or []:
                        eco_temp = desc.params.get('eco_target', 16.0)
                        await ctx.mqtt.publish_set_thermostat(thermo_id, eco_temp)
    """

    @abstractmethod
    def get_subscriptions(self, desc: RuleDescriptor) -> list[str]:
        """
        Return the MQTT state topics this rule wants to monitor.

        Called once during engine setup. The rule derives its topics from
        the configured objects (desc.objects).

        Args:
            desc: Rule configuration from rules.yaml

        Returns:
            List of MQTT topic patterns/strings to subscribe to, e.g.
            ['home/contact/sensor_bedroom/state', 'home/contact/sensor_kitchen/state'].
        """
        ...

    @abstractmethod
    async def on_event(
        self,
        evt: dict[str, Any],
        desc: RuleDescriptor,
        ctx: RuleContext
    ) -> None:
        """
        React to a single device state change. MUST be idempotent.

        Called by the engine for every event matching this rule's
        subscriptions.

        Args:
            evt: Normalized event dict with keys:
                "topic" (MQTT topic), "type" ("state"), "cap" (capability),
                "device_id", "payload" (capability-specific dict), and
                "ts" (ISO timestamp string).
            desc: Rule configuration from rules.yaml
            ctx: Runtime context (logger, MQTT, Redis, clock)

        Raises:
            Exception: Implementations may raise; the engine logs the error
                and continues processing.
        """
        ...
# ============================================================================
# Dynamic Rule Loading
# ============================================================================
import importlib
import re
from typing import Type
# Cache for loaded rule classes (per process)
_RULE_CLASS_CACHE: dict[str, Type[Rule]] = {}


def load_rule(desc: RuleDescriptor) -> Rule:
    """
    Dynamically import and instantiate the rule implementation for *desc*.

    Convention:
        - Rule type format: 'name@version' (e.g., 'window_setback@1.0')
        - Module path: apps.rules.impl.{name}
        - Class name: PascalCase of name plus 'Rule'
          ('window_setback' → 'WindowSetbackRule')

    Args:
        desc: Rule descriptor from rules.yaml

    Returns:
        A fresh instance of the resolved Rule subclass.

    Raises:
        ValueError: If the type string is malformed.
        ImportError: If the implementation module cannot be found.
        AttributeError: If the class cannot be found in the module.
        TypeError: If the resolved class does not subclass Rule.

    Examples:
        >>> desc = RuleDescriptor(
        ...     id="test_rule",
        ...     type="window_setback@1.0",
        ...     targets={},
        ...     params={}
        ... )
        >>> rule = load_rule(desc)
        >>> isinstance(rule, Rule)
        True
    """
    rule_type = desc.type

    # Fast path: class already resolved earlier in this process.
    cached = _RULE_CLASS_CACHE.get(rule_type)
    if cached is not None:
        return cached()

    # Split 'name@version'.
    if '@' not in rule_type:
        raise ValueError(
            f"Invalid rule type '{rule_type}': must be in format 'name@version' "
            f"(e.g., 'window_setback@1.0')"
        )
    name, version = rule_type.split('@', 1)

    # Only lowercase snake_case rule names are accepted.
    if not re.match(r'^[a-z][a-z0-9_]*$', name):
        raise ValueError(
            f"Invalid rule name '{name}': must start with lowercase letter "
            f"and contain only lowercase letters, numbers, and underscores"
        )

    # snake_case → PascalCase: 'window_setback' → 'WindowSetbackRule'
    class_name = ''.join(part.capitalize() for part in name.split('_')) + 'Rule'
    module_path = f'apps.rules.impl.{name}'

    try:
        module = importlib.import_module(module_path)
    except ImportError as e:
        raise ImportError(
            f"Cannot load rule type '{rule_type}': module '{module_path}' not found.\n"
            f"Hint: Create file 'apps/rules/impl/{name}.py' with class '{class_name}'.\n"
            f"Original error: {e}"
        ) from e

    try:
        rule_class = getattr(module, class_name)
    except AttributeError as e:
        raise AttributeError(
            f"Cannot load rule type '{rule_type}': class '{class_name}' not found in module '{module_path}'.\n"
            f"Hint: Define 'class {class_name}(Rule):' in 'apps/rules/impl/{name}.py'.\n"
            f"Available classes in module: {[attr for attr in dir(module) if not attr.startswith('_')]}"
        ) from e

    # Validate that it's a Rule subclass before caching anything.
    if not issubclass(rule_class, Rule):
        raise TypeError(
            f"Class '{class_name}' in '{module_path}' is not a subclass of Rule. "
            f"Ensure it inherits from apps.rules.rule_interface.Rule"
        )

    # Memoize the class and hand back a fresh instance.
    _RULE_CLASS_CACHE[rule_type] = rule_class
    return rule_class()

122
apps/rules/rules_config.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Rules Configuration Schema and Loader
Provides Pydantic models for validating rules.yaml configuration.
"""
from pathlib import Path
from typing import Any, Optional
import yaml
from pydantic import BaseModel, Field, field_validator
class Rule(BaseModel):
    """Configuration of a single rule as declared in rules.yaml."""
    id: str = Field(..., description="Unique rule identifier")
    name: Optional[str] = Field(None, description="Optional human-readable name")
    type: str = Field(..., description="Rule type (e.g., 'window_setback@1.0')")
    enabled: bool = Field(default=True, description="Whether this rule is enabled")
    objects: dict[str, Any] = Field(default_factory=dict, description="Objects this rule monitors or controls")
    params: dict[str, Any] = Field(default_factory=dict, description="Rule-specific parameters")

    @field_validator('id')
    @classmethod
    def validate_id(cls, v: str) -> str:
        # Reject None/empty/whitespace-only IDs; store the trimmed form.
        if not v or not v.strip():
            raise ValueError("Rule ID cannot be empty")
        return v.strip()

    @field_validator('type')
    @classmethod
    def validate_type(cls, v: str) -> str:
        # The type must be non-empty and carry an explicit '@version' suffix.
        if not v or not v.strip():
            raise ValueError("Rule type cannot be empty")
        if '@' not in v:
            raise ValueError(f"Rule type must include version (e.g., 'window_setback@1.0'), got: {v}")
        return v.strip()
class RulesConfig(BaseModel):
    """Root configuration object for rules.yaml."""
    rules: list[Rule] = Field(..., description="List of all rules")

    @field_validator('rules')
    @classmethod
    def validate_unique_ids(cls, rules: list[Rule]) -> list[Rule]:
        """Ensure all rule IDs are unique."""
        from collections import Counter
        # Counter makes duplicate detection O(n) instead of the previous
        # quadratic ids.count(id) scan over the whole list per element.
        counts = Counter(rule.id for rule in rules)
        duplicates = {rule_id for rule_id, n in counts.items() if n > 1}
        if duplicates:
            raise ValueError(f"Duplicate rule IDs found: {duplicates}")
        return rules
def load_rules_config(config_path: str | Path = "config/rules.yaml") -> RulesConfig:
    """
    Load and validate the rules configuration from a YAML file.

    Args:
        config_path: Path to rules.yaml file

    Returns:
        Validated RulesConfig object

    Raises:
        FileNotFoundError: If the config file doesn't exist
        ValueError: On invalid YAML, an empty file, a missing 'rules:' key,
            or schema validation failure
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Rules configuration not found: {config_path}")

    with open(config_path, 'r', encoding='utf-8') as f:
        try:
            data = yaml.safe_load(f)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML in {config_path}: {e}") from e

    # Guard against an empty file and a file missing the top-level key.
    if not data:
        raise ValueError(f"Empty configuration file: {config_path}")
    if 'rules' not in data:
        raise ValueError(
            f"Missing 'rules:' key in {config_path}. "
            "Configuration must start with 'rules:' followed by a list of rule definitions."
        )

    try:
        return RulesConfig(**data)
    except Exception as e:
        # Surface pydantic (or any other) validation failure as ValueError.
        raise ValueError(f"Configuration validation failed: {e}") from e
def get_rule_by_id(config: RulesConfig, rule_id: str) -> Rule | None:
    """Return the rule with the given ID, or None when no rule matches."""
    return next((rule for rule in config.rules if rule.id == rule_id), None)
def get_rules_by_type(config: RulesConfig, rule_type: str) -> list[Rule]:
    """Return every rule whose type string equals *rule_type* (may be empty)."""
    return [candidate for candidate in config.rules if candidate.type == rule_type]
if __name__ == "__main__":
    # Smoke test: load the default rules.yaml and print a short summary
    # of every rule (enabled marker, id, optional name, type, objects).
    try:
        config = load_rules_config()
        print(f"✅ Loaded {len(config.rules)} rules:")
        for rule in config.rules:
            # Optional human-readable name, shown in parentheses when set.
            name = f" ({rule.name})" if rule.name else ""
            # NOTE(review): both branches are empty strings here - the
            # enabled/disabled marker characters look lost; confirm intent.
            enabled = "" if rule.enabled else ""
            print(f" [{enabled}] {rule.id}{name}: {rule.type}")
            if rule.objects:
                # One entry per object kind: list length for lists, raw value otherwise.
                obj_summary = ", ".join(f"{k}: {len(v) if isinstance(v, list) else v}"
                                        for k, v in rule.objects.items())
                print(f" Objects: {obj_summary}")
    except Exception as e:
        print(f"❌ Configuration error: {e}")

View File

@@ -59,13 +59,9 @@ docker run --rm -p 8010:8010 \
simulator:dev
```
**Mit Docker Network (optional):**
```bash
docker run --rm -p 8010:8010 \
--name simulator \
-e MQTT_BROKER=172.23.1.102 \
simulator:dev
```
**Note for finch/nerdctl users:**
- finch binds ports to `127.0.0.1` by default
- The web interface will be accessible at `http://127.0.0.1:8010`
#### Environment Variables

View File

@@ -37,17 +37,6 @@ docker build -t ui:dev -f apps/ui/Dockerfile .
#### Run Container
**Linux Server (empfohlen):**
```bash
# Mit Docker Network für Container-to-Container Kommunikation
docker run --rm -p 8002:8002 \
-e UI_PORT=8002 \
-e API_BASE=http://172.19.1.11:8001 \
-e BASE_PATH=/ \
ui:dev
```
**macOS mit finch/nerdctl:**
```bash
docker run --rm -p 8002:8002 \
--add-host=host.docker.internal:host-gateway \
@@ -57,11 +46,10 @@ docker run --rm -p 8002:8002 \
ui:dev
```
**Hinweise:**
- **Linux**: Verwende Docker Network und Service-Namen (`http://api:8001`)
- **macOS/finch**: Verwende `host.docker.internal` mit `--add-host` flag
- Die UI macht Server-Side API-Aufrufe beim Rendern der Seite
- Browser-seitige Realtime-Updates (SSE) gehen direkt vom Browser zur API
**Note for finch/nerdctl users:**
- finch binds ports to `127.0.0.1` by default (not `0.0.0.0`)
- Use `--add-host=host.docker.internal:host-gateway` to allow container-to-host communication
- Set `API_BASE=http://host.docker.internal:8001` to reach the API container
#### Environment Variables

View File

@@ -0,0 +1,13 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 100 100">
<!-- Roof -->
<path d="M50 10 L90 45 L85 45 L85 50 L15 50 L15 45 L10 45 Z" fill="#667eea" stroke="#4c51bf" stroke-width="2" stroke-linejoin="round"/>
<!-- House body -->
<rect x="15" y="45" width="70" height="45" fill="#764ba2" stroke="#4c51bf" stroke-width="2"/>
<!-- Door -->
<rect x="35" y="60" width="15" height="30" fill="#4c51bf" rx="2"/>
<!-- Window -->
<rect x="60" y="60" width="20" height="15" fill="#fbbf24" stroke="#f59e0b" stroke-width="1"/>
<!-- Window panes -->
<line x1="70" y1="60" x2="70" y2="75" stroke="#f59e0b" stroke-width="1"/>
<line x1="60" y1="67.5" x2="80" y2="67.5" stroke="#f59e0b" stroke-width="1"/>
</svg>

After

Width:  |  Height:  |  Size: 721 B

View File

@@ -4,6 +4,7 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Home Automation</title>
<link rel="icon" type="image/svg+xml" href="/static/favicon.svg">
<style>
* {
margin: 0;
@@ -165,6 +166,12 @@
margin: 0;
}
.room-title h2 {
font-size: 1.5rem;
font-weight: 700;
margin: 0;
}
.room-toggle {
font-size: 1.5rem;
color: #667eea;
@@ -478,6 +485,208 @@
text-align: center;
}
/* Groups Section Styles */
.groups-section .devices {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(280px, 1fr));
gap: 1rem;
}
.group-card {
background: white;
border-radius: 12px;
padding: 1.25rem;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.08);
transition: all 0.3s;
}
.group-card:hover {
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.12);
transform: translateY(-2px);
}
.group-card-header {
margin-bottom: 1rem;
}
.group-card-title {
font-size: 1.125rem;
font-weight: 600;
color: #333;
margin-bottom: 0.25rem;
}
.group-card-subtitle {
font-size: 0.875rem;
color: #666;
}
.group-card-actions {
display: flex;
gap: 0.5rem;
}
.group-button {
flex: 1;
padding: 0.75rem;
border: none;
border-radius: 8px;
font-size: 0.875rem;
font-weight: 600;
cursor: pointer;
transition: all 0.2s;
min-height: 44px;
position: relative;
}
.group-button.on {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
.group-button.on:hover {
background: linear-gradient(135deg, #5568d3 0%, #653a8e 100%);
}
.group-button.off {
background: #f1f3f5;
color: #495057;
}
.group-button.off:hover {
background: #e9ecef;
}
.group-button:active {
transform: scale(0.95);
}
.group-button:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.group-button .spinner {
display: inline-block;
width: 14px;
height: 14px;
border: 2px solid rgba(255, 255, 255, 0.3);
border-top-color: white;
border-radius: 50%;
animation: spin 0.6s linear infinite;
}
/* Scenes Section Styles */
.scenes-section .devices {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(200px, 1fr));
gap: 1rem;
}
.scene-button {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
color: white;
border: none;
border-radius: 12px;
padding: 1.25rem;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: all 0.3s;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
min-height: 80px;
display: flex;
align-items: center;
justify-content: center;
position: relative;
}
.scene-button:hover {
background: linear-gradient(135deg, #e082ea 0%, #e4465b 100%);
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
transform: translateY(-2px);
}
.scene-button:active {
transform: translateY(0) scale(0.95);
}
.scene-button:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.scene-button .spinner {
display: inline-block;
width: 16px;
height: 16px;
border: 2px solid rgba(255, 255, 255, 0.3);
border-top-color: white;
border-radius: 50%;
animation: spin 0.6s linear infinite;
margin-left: 0.5rem;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
/* Toast Notification */
.toast {
position: fixed;
bottom: 2rem;
right: 2rem;
background: white;
padding: 1rem 1.5rem;
border-radius: 8px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.2);
display: flex;
align-items: center;
gap: 0.75rem;
z-index: 1000;
animation: slideIn 0.3s ease;
max-width: 400px;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}
.toast.success {
border-left: 4px solid #51cf66;
}
.toast.error {
border-left: 4px solid #ff6b6b;
}
.toast-icon {
font-size: 1.5rem;
}
.toast-message {
flex: 1;
color: #333;
font-size: 0.875rem;
}
.toast-close {
background: none;
border: none;
color: #999;
cursor: pointer;
font-size: 1.25rem;
padding: 0;
line-height: 1;
}
.events {
margin-top: 2rem;
background: white;
@@ -534,6 +743,106 @@
font-size: 0.875rem;
color: #999;
}
/* Responsive Design */
@media (max-width: 768px) {
.container {
padding: 1rem;
}
header {
padding: 1rem;
}
.header-content h1 {
font-size: 1.5rem;
}
.header-content p {
font-size: 0.75rem;
}
.header-buttons {
gap: 0.5rem;
}
.refresh-btn, .collapse-all-btn {
width: 36px;
height: 36px;
font-size: 1.25rem;
}
.room {
padding: 1rem;
}
.room-header h2 {
font-size: 1.125rem;
}
.devices {
grid-template-columns: 1fr;
}
/* Groups responsive */
.groups-section .devices {
grid-template-columns: 1fr;
}
.group-card {
padding: 1rem;
}
.group-card-title {
font-size: 1rem;
}
/* Scenes responsive */
.scenes-section .devices {
grid-template-columns: 1fr;
}
.scene-button {
min-height: 60px;
font-size: 0.9375rem;
}
/* Toast responsive */
.toast {
bottom: 1rem;
right: 1rem;
left: 1rem;
max-width: none;
}
/* Thermostat responsive */
.thermostat-display {
grid-template-columns: 1fr;
gap: 0.75rem;
}
.temp-controls {
flex-direction: column;
}
.temp-button {
width: 100%;
}
}
@media (min-width: 769px) and (max-width: 1024px) {
.devices {
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
}
.groups-section .devices {
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
}
.scenes-section .devices {
grid-template-columns: repeat(auto-fill, minmax(180px, 1fr));
}
}
</style>
</head>
<body>
@@ -571,6 +880,8 @@
{% if device.type == "light" %}
Light
{% if device.features.brightness %}• Dimmbar{% endif %}
{% elif device.type == "relay" %}
Relay
{% elif device.type == "thermostat" %}
Thermostat
{% elif device.type == "contact" or device.type == "contact_sensor" %}
@@ -621,6 +932,21 @@
</div>
{% endif %}
{% elif device.type == "relay" %}
<div class="device-state">
<span class="state-label">Status:</span>
<span class="state-value off" id="state-{{ device.device_id }}">off</span>
</div>
<div class="controls">
<button
class="toggle-button off"
id="toggle-{{ device.device_id }}"
onclick="toggleDevice('{{ device.device_id }}')">
Einschalten
</button>
</div>
{% elif device.type == "thermostat" %}
<div class="thermostat-display">
<div class="temp-reading">
@@ -695,6 +1021,38 @@
</div>
{% endif %}
<!-- Groups Section -->
<section class="room groups-section">
<div class="room-header" onclick="toggleRoom('groups-content')">
<div class="room-title">
<h2>🎛️ Gruppen</h2>
<span class="device-count" id="groups-count">Lädt...</span>
</div>
<span class="room-toggle collapsed" id="toggle-groups-content"></span>
</div>
<div class="room-content collapsed" id="groups-content">
<div class="devices" id="groups-container">
<p style="color: #666;">Lade Gruppen...</p>
</div>
</div>
</section>
<!-- Scenes Section -->
<section class="room scenes-section">
<div class="room-header" onclick="toggleRoom('scenes-content')">
<div class="room-title">
<h2>🎬 Szenen</h2>
<span class="device-count" id="scenes-count">Lädt...</span>
</div>
<span class="room-toggle collapsed" id="toggle-scenes-content"></span>
</div>
<div class="room-content collapsed" id="scenes-content">
<div class="devices" id="scenes-container">
<p style="color: #666;">Lade Szenen...</p>
</div>
</div>
</section>
<div class="events">
<h2>📡 Realtime Events</h2>
<div class="event-list" id="event-list">
@@ -750,7 +1108,8 @@
// Set room icons based on room name
document.addEventListener('DOMContentLoaded', () => {
const roomTitles = document.querySelectorAll('.room-title');
// Only select room titles that are <h2> elements (exclude Groups/Scenes sections)
const roomTitles = document.querySelectorAll('.room-title:not(.groups-section .room-title):not(.scenes-section .room-title)');
roomTitles.forEach(title => {
const roomName = title.textContent.trim().toLowerCase();
let icon = '🏠'; // Default
@@ -807,11 +1166,13 @@
let eventSource = null;
let currentState = {};
let thermostatTargets = {};
let deviceTypes = {};
// Initialize device states
{% for room in rooms %}
{% for device in room.devices %}
{% if device.type == "light" %}
deviceTypes['{{ device.device_id }}'] = '{{ device.type }}';
{% if device.type == "light" or device.type == "relay" %}
currentState['{{ device.device_id }}'] = 'off';
{% elif device.type == "thermostat" %}
thermostatTargets['{{ device.device_id }}'] = 21.0;
@@ -822,6 +1183,7 @@
// Toggle device state
async function toggleDevice(deviceId) {
const newState = currentState[deviceId] === 'on' ? 'off' : 'on';
const deviceType = deviceTypes[deviceId] || 'light';
try {
const response = await fetch(api(`/devices/${deviceId}/set`), {
@@ -830,7 +1192,7 @@
'Content-Type': 'application/json'
},
body: JSON.stringify({
type: 'light',
type: deviceType,
payload: {
power: newState
}
@@ -1239,6 +1601,274 @@
loadDevices().then(() => {
console.log('Initial states loaded, now connecting SSE...');
});
// ===== GROUPS & SCENES FUNCTIONALITY =====
// Show a transient toast notification (type: 'success' or 'error').
function showToast(message, type = 'success') {
    // Only one toast at a time - clear any that are still on screen.
    document.querySelectorAll('.toast').forEach(t => t.remove());

    const toast = document.createElement('div');
    toast.className = `toast ${type}`;
    toast.innerHTML = `
        <span class="toast-icon">${type === 'success' ? '✓' : '✗'}</span>
        <span class="toast-message">${message}</span>
        <button class="toast-close" onclick="this.parentElement.remove()">×</button>
    `;
    document.body.appendChild(toast);

    // Slide out and remove automatically after 4 seconds.
    setTimeout(() => {
        toast.style.animation = 'slideIn 0.3s ease reverse';
        setTimeout(() => toast.remove(), 300);
    }, 4000);
}
// Load and render groups
// Fetches /groups from the API and renders one card per group. Groups with
// brightness capability additionally get a 0-100% slider; action buttons are
// wired via inline onclick to setGroup()/setGroupBrightness().
async function loadGroups() {
    try {
        const response = await fetch(api('/groups'));
        if (!response.ok) throw new Error('Failed to load groups');
        const groups = await response.json();
        const container = document.getElementById('groups-container');
        const countSpan = document.getElementById('groups-count');
        if (groups.length === 0) {
            container.innerHTML = '<p style="color: #666;">Keine Gruppen konfiguriert.</p>';
            countSpan.textContent = '';
            return;
        }
        countSpan.textContent = '';
        // Re-render the whole section in one assignment.
        container.innerHTML = groups.map(group => `
            <div class="group-card">
                <div class="group-card-header">
                    <div class="group-card-title">${group.name}</div>
                    <div class="group-card-subtitle">${group.device_count} ${group.device_count === 1 ? 'Gerät' : 'Geräte'}</div>
                </div>
                ${group.capabilities.brightness ? `
                <div class="brightness-control">
                    <label class="brightness-label">
                        <span>🔆 Helligkeit</span>
                        <span class="brightness-value" id="group-brightness-${group.id}">50%</span>
                    </label>
                    <input type="range"
                        min="0"
                        max="100"
                        value="50"
                        class="brightness-slider"
                        id="slider-group-${group.id}"
                        oninput="updateGroupBrightnessDisplay('${group.id}', this.value)"
                        onchange="setGroupBrightness('${group.id}', this.value)">
                </div>
                ` : ''}
                <div class="group-card-actions">
                    <button class="group-button on" onclick="setGroup('${group.id}', 'on', this)">
                        Alle An
                    </button>
                    <button class="group-button off" onclick="setGroup('${group.id}', 'off', this)">
                        Alle Aus
                    </button>
                </div>
            </div>
        `).join('');
        console.log(`Loaded ${groups.length} groups`);
    } catch (error) {
        // Network/API failure: show an inline message plus an error toast.
        console.error('Failed to load groups:', error);
        const container = document.getElementById('groups-container');
        container.innerHTML = '<p style="color: #999;">Fehler beim Laden der Gruppen</p>';
        showToast('Fehler beim Laden der Gruppen', 'error');
    }
}
// Mirror a slider's current value into the group's "NN%" label.
// Called from the slider's oninput handler; a no-op when the label
// element for this group is not present in the DOM.
function updateGroupBrightnessDisplay(groupId, value) {
    const label = document.getElementById(`group-brightness-${groupId}`);
    if (!label) {
        return;
    }
    label.textContent = `${value}%`;
}
// Push a brightness value to every device of a group.
// Fired from the slider's onchange handler; sends power:'on' together with
// the numeric brightness, logs how many devices the backend published to,
// and records the action in the event feed. Errors surface as a toast.
async function setGroupBrightness(groupId, brightness) {
    const requestBody = {
        action: {
            type: 'power',
            payload: {
                power: 'on',
                brightness: parseInt(brightness)
            }
        }
    };
    try {
        const response = await fetch(api(`/groups/${groupId}/set`), {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(requestBody)
        });
        if (!response.ok) {
            const error = await response.json();
            throw new Error(error.detail || 'Request failed');
        }

        const result = await response.json();
        // Count only the plan entries the backend actually published.
        const publishedCount = result.execution_plan.filter(p => p.status === 'published').length;
        console.log(`Group ${groupId} brightness set to ${brightness}%: ${publishedCount} devices`);

        addEvent({
            action: 'group_brightness',
            group_id: groupId,
            brightness: brightness,
            device_count: publishedCount
        });
    } catch (error) {
        console.error('Failed to set group brightness:', error);
        showToast(`Fehler beim Setzen der Helligkeit: ${error.message}`, 'error');
    }
}
// Switch a whole group on or off.
// Disables the card's buttons and swaps the pressed button's label for a
// spinner while the request is in flight. When switching on and the card
// has a brightness slider, its current value is sent along.
async function setGroup(groupId, power, buttonElement) {
    const siblingButtons = buttonElement.parentElement.querySelectorAll('button');
    siblingButtons.forEach(btn => btn.disabled = true);
    const savedLabel = buttonElement.innerHTML;
    buttonElement.innerHTML = '<span class="spinner"></span>';

    // Get brightness value if slider exists
    const slider = document.getElementById(`slider-group-${groupId}`);
    const brightness = slider ? parseInt(slider.value) : null;

    try {
        const payload = { power };
        if (power === 'on' && brightness !== null) {
            payload.brightness = brightness;
        }

        const response = await fetch(api(`/groups/${groupId}/set`), {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
                action: {
                    type: 'power',
                    payload: payload
                }
            })
        });
        if (!response.ok) {
            const error = await response.json();
            throw new Error(error.detail || 'Request failed');
        }

        const result = await response.json();
        // Count only the plan entries the backend actually published.
        const publishedCount = result.execution_plan.filter(p => p.status === 'published').length;
        showToast(`Gruppe ${power === 'on' ? 'eingeschaltet' : 'ausgeschaltet'}: ${publishedCount} Geräte`, 'success');
        console.log(`Group ${groupId} set to ${power}:`, result);

        addEvent({
            action: 'group_set',
            group_id: groupId,
            power: power,
            device_count: publishedCount
        });
    } catch (error) {
        console.error('Failed to set group:', error);
        showToast(`Fehler: ${error.message}`, 'error');
    } finally {
        // Always restore the button label and re-enable the card's buttons.
        buttonElement.innerHTML = savedLabel;
        siblingButtons.forEach(btn => btn.disabled = false);
    }
}
// Load and render scenes.
// Fetches the scene list from the API and renders one trigger button per
// scene into #scenes-container. On failure a fallback message replaces the
// container content and an error toast is shown.
async function loadScenes() {
    try {
        const response = await fetch(api('/scenes'));
        if (!response.ok) throw new Error('Failed to load scenes');
        const scenes = await response.json();

        const container = document.getElementById('scenes-container');
        const countSpan = document.getElementById('scenes-count');

        // Empty state: show a hint instead of an empty panel.
        if (scenes.length === 0) {
            container.innerHTML = '<p style="color: #666;">Keine Szenen konfiguriert.</p>';
            countSpan.textContent = '';
            return;
        }

        countSpan.textContent = '';

        // NOTE(review): scene.id/scene.name come from the API response and are
        // interpolated into innerHTML and the inline onclick handler
        // unescaped — assumed to be trusted server-side config; confirm.
        container.innerHTML = scenes.map(scene => `
            <button class="scene-button" onclick="runScene('${scene.id}', this)">
                ${scene.name}
            </button>
        `).join('');

        console.log(`Loaded ${scenes.length} scenes`);
    } catch (error) {
        console.error('Failed to load scenes:', error);
        const container = document.getElementById('scenes-container');
        container.innerHTML = '<p style="color: #999;">Fehler beim Laden der Szenen</p>';
        showToast('Fehler beim Laden der Szenen', 'error');
    }
}
// Trigger a scene run on the server.
// The pressed button is disabled and decorated with a spinner until the
// request settles; the summary toast reports the number of successfully
// published device actions summed over all scene steps.
async function runScene(sceneId, buttonElement) {
    buttonElement.disabled = true;
    const originalHTML = buttonElement.innerHTML;
    buttonElement.innerHTML = `${originalHTML} <span class="spinner"></span>`;

    try {
        const response = await fetch(api(`/scenes/${sceneId}/run`), {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' }
        });
        if (!response.ok) {
            const error = await response.json();
            throw new Error(error.detail || 'Request failed');
        }

        const result = await response.json();
        // Sum published device actions over every step of the scene.
        let totalPublished = 0;
        for (const step of result.steps) {
            totalPublished += step.devices.filter(d => d.status === 'published').length;
        }

        showToast(`Szene ausgeführt: ${totalPublished} Aktionen`, 'success');
        console.log(`Scene ${sceneId} executed:`, result);

        addEvent({
            action: 'scene_run',
            scene_id: sceneId,
            steps: result.steps.length,
            total_actions: totalPublished
        });
    } catch (error) {
        console.error('Failed to run scene:', error);
        showToast(`Fehler: ${error.message}`, 'error');
    } finally {
        // Always restore the original label and re-enable the button.
        buttonElement.innerHTML = originalHTML;
        buttonElement.disabled = false;
    }
}
// Load groups and scenes once the DOM is ready.
function initGroupsAndScenes() {
    loadGroups();
    loadScenes();
}
document.addEventListener('DOMContentLoaded', initGroupsAndScenes);
</script>
</body>
</html>

View File

@@ -11,12 +11,11 @@ redis:
channel: "ui:updates"
devices:
- device_id: lampe_semeniere_wohnzimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b8000015480b"
set: "zigbee2mqtt/0xf0d1b8000015480b/set"
@@ -26,12 +25,11 @@ devices:
model: "AC10691"
vendor: "OSRAM"
- device_id: grosse_lampe_wohnzimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b80000151aca"
set: "zigbee2mqtt/0xf0d1b80000151aca/set"
@@ -41,12 +39,11 @@ devices:
model: "AC10691"
vendor: "OSRAM"
- device_id: lampe_naehtischchen_wohnzimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0x842e14fffee560ee"
set: "zigbee2mqtt/0x842e14fffee560ee/set"
@@ -56,12 +53,11 @@ devices:
model: "HG06337"
vendor: "Lidl"
- device_id: kleine_lampe_rechts_esszimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b80000156645"
set: "zigbee2mqtt/0xf0d1b80000156645/set"
@@ -71,12 +67,11 @@ devices:
model: "AC10691"
vendor: "OSRAM"
- device_id: kleine_lampe_links_esszimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b80000153099"
set: "zigbee2mqtt/0xf0d1b80000153099/set"
@@ -101,12 +96,11 @@ devices:
model: "LED1842G3"
vendor: "IKEA"
- device_id: medusalampe_schlafzimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b80000154c7c"
set: "zigbee2mqtt/0xf0d1b80000154c7c/set"
@@ -192,12 +186,11 @@ devices:
model: "8718699673147"
vendor: "Philips"
- device_id: schranklicht_vorne_patty
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b80000154cf5"
set: "zigbee2mqtt/0xf0d1b80000154cf5/set"
@@ -504,12 +497,11 @@ devices:
peer_id: "48"
channel: "1"
- device_id: sterne_wohnzimmer
type: light
cap_version: "light@1.2.0"
type: relay
cap_version: "relay@1.0.0"
technology: zigbee2mqtt
features:
power: true
brightness: false
topics:
state: "zigbee2mqtt/0xf0d1b80000155fc2"
set: "zigbee2mqtt/0xf0d1b80000155fc2/set"
@@ -518,7 +510,6 @@ devices:
ieee_address: "0xf0d1b80000155fc2"
model: "AC10691"
vendor: "OSRAM"
- device_id: kontakt_schlafzimmer_strasse
type: contact
name: Kontakt Schlafzimmer Straße
@@ -719,5 +710,52 @@ devices:
topics:
state: zigbee2mqtt/0x00158d0009421422
features: {}
- device_id: licht_spuele_kueche
type: relay
cap_version: "relay@1.0.0"
technology: shelly
features:
power: true
topics:
set: "shellies/LightKitchenSink/relay/0/command"
state: "shellies/LightKitchenSink/relay/0"
- device_id: licht_schrank_esszimmer
type: relay
cap_version: "relay@1.0.0"
technology: shelly
features:
power: true
topics:
set: "shellies/schrankesszimmer/relay/0/command"
state: "shellies/schrankesszimmer/relay/0"
- device_id: licht_regal_wohnzimmer
type: relay
cap_version: "relay@1.0.0"
technology: shelly
features:
power: true
topics:
set: "shellies/wohnzimmer-regal/relay/0/command"
state: "shellies/wohnzimmer-regal/relay/0"
- device_id: licht_flur_schrank
type: relay
cap_version: "relay@1.0.0"
technology: shelly
features:
power: true
topics:
set: "shellies/schrankflur/relay/0/command"
state: "shellies/schrankflur/relay/0"
- device_id: licht_terasse
type: relay
cap_version: "relay@1.0.0"
technology: shelly
features:
power: true
topics:
set: "shellies/lichtterasse/relay/0/command"
state: "shellies/lichtterasse/relay/0"

36
config/groups.yaml Normal file
View File

@@ -0,0 +1,36 @@
version: 1
groups:
- id: "kueche_lichter"
name: "Küche alle Lampen"
selector:
type: "light"
room: "Küche"
capabilities:
power: true
brightness: true
- id: "alles_lichter"
name: "Alle Lichter"
selector:
type: "light"
capabilities:
power: true
- id: "schlafzimmer_lichter"
name: "Schlafzimmer alle Lampen"
selector:
type: "light"
room: "Schlafzimmer"
capabilities:
power: true
brightness: true
- id: "schlafzimmer_schlummer_licht"
name: "Schlafzimmer Schlummerlicht"
device_ids:
- bettlicht_patty
- bettlicht_wolfgang
- medusalampe_schlafzimmer
capabilities:
power: true
brightness: true

View File

@@ -51,12 +51,16 @@ rooms:
title: kleine Lampe rechts Esszimmer
icon: 💡
rank: 90
- device_id: licht_schrank_esszimmer
title: Schranklicht Esszimmer
icon: 💡
rank: 92
- device_id: thermostat_esszimmer
title: Thermostat Esszimmer
icon: 🌡️
rank: 95
- device_id: kontakt_esszimmer_strasse_rechts
title: Kontakt Straße rechts
title: Kontakt Straße rechts
icon: 🪟
rank: 96
- device_id: kontakt_esszimmer_strasse_links
@@ -81,6 +85,10 @@ rooms:
title: grosse Lampe Wohnzimmer
icon: 💡
rank: 130
- device_id: licht_regal_wohnzimmer
title: Regallicht Wohnzimmer
icon: 💡
rank: 132
- device_id: thermostat_wohnzimmer
title: Thermostat Wohnzimmer
icon: 🌡️
@@ -103,6 +111,10 @@ rooms:
title: Küche Deckenlampe
icon: 💡
rank: 140
- device_id: licht_spuele_kueche
title: Küche Spüle
icon: 💡
rank: 142
- device_id: thermostat_kueche
title: Kueche
icon: 🌡️
@@ -189,6 +201,10 @@ rooms:
title: Haustür
icon: 💡
rank: 220
- device_id: licht_flur_schrank
title: Schranklicht Flur
icon: 💡
rank: 222
- device_id: licht_flur_oben_am_spiegel
title: Licht Flur oben am Spiegel
icon: 💡
@@ -249,4 +265,10 @@ rooms:
title: Temperatur & Luftfeuchte
icon: 🌡️
rank: 290
- name: Outdoor
devices:
- device_id: licht_terasse
title: Licht Terrasse
icon: 💡
rank: 290

94
config/rules.yaml Normal file
View File

@@ -0,0 +1,94 @@
# Rules Configuration
# Auto-generated from devices.yaml
rules:
- id: window_setback_esszimmer
enabled: false
name: Fensterabsenkung Esszimmer
type: window_setback@1.0
objects:
contacts:
- kontakt_esszimmer_strasse_links
- kontakt_esszimmer_strasse_rechts
thermostats:
- thermostat_esszimmer
params:
eco_target: 16.0
open_min_secs: 20
close_min_secs: 20
previous_target_ttl_secs: 86400
- id: window_setback_kueche
enabled: false
name: Fensterabsenkung Küche
type: window_setback@1.0
objects:
contacts:
- kontakt_kueche_garten_fenster
- kontakt_kueche_garten_tuer
- kontakt_kueche_strasse_links
- kontakt_kueche_strasse_rechts
thermostats:
- thermostat_kueche
params:
eco_target: 16.0
open_min_secs: 20
close_min_secs: 20
previous_target_ttl_secs: 86400
- id: window_setback_patty
enabled: false
name: Fensterabsenkung Arbeitszimmer Patty
type: window_setback@1.0
objects:
contacts:
- kontakt_patty_garten_links
- kontakt_patty_garten_rechts
- kontakt_patty_strasse
thermostats:
- thermostat_patty
params:
eco_target: 16.0
open_min_secs: 20
close_min_secs: 20
previous_target_ttl_secs: 86400
- id: window_setback_schlafzimmer
enabled: false
name: Fensterabsenkung Schlafzimmer
type: window_setback@1.0
objects:
contacts:
- kontakt_schlafzimmer_strasse
thermostats:
- thermostat_schlafzimmer
params:
eco_target: 16.0
open_min_secs: 20
close_min_secs: 20
previous_target_ttl_secs: 86400
- id: window_setback_wohnzimmer
enabled: false
name: Fensterabsenkung Wohnzimmer
type: window_setback@1.0
objects:
contacts:
- kontakt_wohnzimmer_garten_links
- kontakt_wohnzimmer_garten_rechts
thermostats:
- thermostat_wohnzimmer
params:
eco_target: 16.0
open_min_secs: 20
close_min_secs: 20
previous_target_ttl_secs: 86400
- id: window_setback_wolfgang
enabled: true
name: Fensterabsenkung Arbeitszimmer Wolfgang
type: window_setback@1.0
objects:
contacts:
- kontakt_wolfgang_garten
thermostats:
- thermostat_wolfgang
params:
eco_target: 16.0
open_min_secs: 20
close_min_secs: 20

24
config/scenes.yaml Normal file
View File

@@ -0,0 +1,24 @@
version: 1
scenes:
- id: "alles_aus"
name: "Alles aus"
steps:
- selector: { type: "light" }
action:
type: "light"
payload: { power: "off" }
- selector: { type: "relay" }
action:
type: "relay"
payload: { power: "off" }
- id: "kueche_gemuetlich"
name: "Küche gemütlich"
steps:
- group_id: "kueche_lichter"
action:
type: "light"
payload:
power: "on"
brightness: 35

62
docker-compose.yaml Normal file
View File

@@ -0,0 +1,62 @@
version: "3.9"
x-environment: &default-env
MQTT_BROKER: "172.23.1.102"
MQTT_PORT: 1883
REDIS_HOST: "172.23.1.116"
REDIS_PORT: 6379
REDIS_DB: 8
services:
ui:
build:
context: .
dockerfile: apps/ui/Dockerfile
container_name: ui
environment:
UI_PORT: 8002
API_BASE: "http://172.19.1.11:8001"
BASE_PATH: "/"
ports:
- "8002:8002"
depends_on:
- api
api:
build:
context: .
dockerfile: apps/api/Dockerfile
container_name: api
environment:
<<: *default-env
REDIS_CHANNEL: "ui:updates"
volumes:
- ./config:/app/config:ro
ports:
- "8001:8001"
depends_on:
- abstraction
abstraction:
build:
context: .
dockerfile: apps/abstraction/Dockerfile
container_name: abstraction
environment:
<<: *default-env
volumes:
- ./config:/app/config:ro
rules:
build:
context: .
dockerfile: apps/rules/Dockerfile
container_name: rules
environment:
<<: *default-env
RULES_CONFIG: "/app/config/rules.yaml"
volumes:
- ./config:/app/config:ro
depends_on:
- abstraction

View File

@@ -1,15 +0,0 @@
version: '3.8'
services:
# Placeholder for future services
# Example:
# api:
# build:
# context: ..
# dockerfile: apps/api/Dockerfile
# ports:
# - "8000:8000"
placeholder:
image: alpine:latest
command: echo "Docker Compose placeholder - add your services here"

View File

@@ -8,7 +8,27 @@ from packages.home_capabilities.contact_sensor import CAP_VERSION as CONTACT_SEN
from packages.home_capabilities.contact_sensor import ContactState
from packages.home_capabilities.temp_humidity_sensor import CAP_VERSION as TEMP_HUMIDITY_SENSOR_VERSION
from packages.home_capabilities.temp_humidity_sensor import TempHumidityState
from packages.home_capabilities.layout import DeviceTile, Room, UiLayout, load_layout
from packages.home_capabilities.relay import CAP_VERSION as RELAY_VERSION
from packages.home_capabilities.relay import RelayState
from packages.home_capabilities.layout import (
DeviceTile,
Room,
UiLayout,
load_layout,
)
from packages.home_capabilities.groups_scenes import (
GroupConfig,
GroupsConfigRoot,
GroupSelector,
SceneConfig,
ScenesConfigRoot,
SceneSelector,
SceneStep,
get_group_by_id,
get_scene_by_id,
load_groups,
load_scenes,
)
__all__ = [
"LightState",
@@ -19,8 +39,21 @@ __all__ = [
"CONTACT_SENSOR_VERSION",
"TempHumidityState",
"TEMP_HUMIDITY_SENSOR_VERSION",
"RelayState",
"RELAY_VERSION",
"DeviceTile",
"Room",
"UiLayout",
"load_layout",
"GroupConfig",
"GroupsConfigRoot",
"GroupSelector",
"SceneConfig",
"ScenesConfigRoot",
"SceneSelector",
"SceneStep",
"get_group_by_id",
"get_scene_by_id",
"load_groups",
"load_scenes",
]

View File

@@ -0,0 +1,229 @@
"""
Configuration models and loaders for groups and scenes.
This module provides Pydantic models for validating groups.yaml and scenes.yaml,
along with loader functions that parse YAML files into typed configuration objects.
"""
from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, Field, field_validator, model_validator
# ============================================================================
# GROUP MODELS
# ============================================================================
class GroupSelector(BaseModel):
    """Selector for automatically adding devices to a group.

    NOTE(review): how the criteria combine (AND vs OR) is decided by the
    code that resolves selectors against the device registry, not by this
    model — confirm the semantics there.
    """
    type: str = Field(..., description="Device type (e.g., 'light', 'thermostat')")
    room: str | None = Field(None, description="Filter by room name")
    tags: list[str] | None = Field(None, description="Filter by device tags")
class GroupConfig(BaseModel):
    """Configuration for a device group.

    A group can be populated via ``selector`` (criteria-based selection)
    and/or via explicit ``device_ids``. No validator on this model enforces
    that at least one is provided or that they are mutually exclusive;
    resolution semantics live in the consumer.
    """
    id: str = Field(..., description="Unique group identifier")
    name: str = Field(..., description="Human-readable group name")
    selector: GroupSelector | None = Field(None, description="Auto-select devices by criteria")
    device_ids: list[str] = Field(default_factory=list, description="Explicit device IDs")
    capabilities: dict[str, bool] = Field(
        default_factory=dict,
        description="Supported capabilities (e.g., {'brightness': True})"
    )
class GroupsConfigRoot(BaseModel):
    """Root configuration for groups.yaml."""
    version: int = Field(..., description="Configuration schema version")
    groups: list[GroupConfig] = Field(default_factory=list, description="List of groups")

    @field_validator('groups')
    @classmethod
    def validate_unique_ids(cls, groups: list[GroupConfig]) -> list[GroupConfig]:
        """Ensure all group IDs are unique.

        Raises:
            ValueError: If any group ID occurs more than once.
        """
        # Single O(n) pass instead of the quadratic ids.count() scan; this
        # also avoids shadowing the builtin ``id`` in the comprehension.
        seen: set[str] = set()
        duplicates: set[str] = set()
        for group in groups:
            if group.id in seen:
                duplicates.add(group.id)
            seen.add(group.id)
        if duplicates:
            raise ValueError(f"Duplicate group IDs found: {duplicates}")
        return groups
# ============================================================================
# SCENE MODELS
# ============================================================================
class SceneSelector(BaseModel):
    """Selector for targeting devices in a scene step.

    Unlike ``GroupSelector``, every field here is optional — ``type`` may be
    omitted. NOTE(review): matching semantics are implemented by the scene
    executor, not by this model — confirm there.
    """
    type: str | None = Field(None, description="Device type (e.g., 'light', 'outlet')")
    room: str | None = Field(None, description="Filter by room name")
    tags: list[str] | None = Field(None, description="Filter by device tags")
class SceneStep(BaseModel):
    """A single step in a scene execution.

    Exactly one targeting mechanism must be supplied: either ``selector``
    (match devices by criteria) or ``group_id`` (reference a configured
    group). Supplying neither, or both, fails validation.
    """
    selector: SceneSelector | None = Field(None, description="Select devices by criteria")
    group_id: str | None = Field(None, description="Target a specific group")
    action: dict[str, Any] = Field(..., description="Action to execute (type + payload)")
    delay_ms: int | None = Field(None, description="Delay before next step (milliseconds)")

    @model_validator(mode='after')
    def validate_selector_or_group(self) -> 'SceneStep':
        """Reject steps that specify neither or both targeting mechanisms."""
        if self.selector is None and self.group_id is None:
            raise ValueError("SceneStep must have either 'selector' or 'group_id'")
        if self.selector is not None and self.group_id is not None:
            raise ValueError("SceneStep cannot have both 'selector' and 'group_id'")
        return self
class SceneConfig(BaseModel):
    """Configuration for a scene.

    ``steps`` is an ordered list; each ``SceneStep`` carries its own
    targeting (selector or group) and action payload.
    """
    id: str = Field(..., description="Unique scene identifier")
    name: str = Field(..., description="Human-readable scene name")
    steps: list[SceneStep] = Field(..., description="Ordered list of actions")
class ScenesConfigRoot(BaseModel):
    """Root configuration for scenes.yaml."""
    version: int = Field(..., description="Configuration schema version")
    scenes: list[SceneConfig] = Field(default_factory=list, description="List of scenes")

    @field_validator('scenes')
    @classmethod
    def validate_unique_ids(cls, scenes: list[SceneConfig]) -> list[SceneConfig]:
        """Ensure all scene IDs are unique.

        Raises:
            ValueError: If any scene ID occurs more than once.
        """
        # Single O(n) pass instead of the quadratic ids.count() scan; this
        # also avoids shadowing the builtin ``id`` in the comprehension.
        seen: set[str] = set()
        duplicates: set[str] = set()
        for scene in scenes:
            if scene.id in seen:
                duplicates.add(scene.id)
            seen.add(scene.id)
        if duplicates:
            raise ValueError(f"Duplicate scene IDs found: {duplicates}")
        return scenes
# ============================================================================
# LOADER FUNCTIONS
# ============================================================================
def load_groups(path: Path | str) -> GroupsConfigRoot:
    """
    Load and validate groups configuration from YAML file.

    Args:
        path: Path to groups.yaml file

    Returns:
        Validated GroupsConfigRoot object

    Raises:
        FileNotFoundError: If config file doesn't exist
        ValidationError: If configuration is invalid
        ValueError: If duplicate group IDs are found or YAML is empty
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Groups config file not found: {path}")

    raw = yaml.safe_load(path.read_text(encoding='utf-8'))

    # yaml.safe_load returns None for an empty document — reject explicitly
    # rather than letting model validation produce a confusing error.
    if raw is None:
        raise ValueError(f"Groups config file is empty: {path}")

    return GroupsConfigRoot.model_validate(raw)
def load_scenes(path: Path | str) -> ScenesConfigRoot:
    """
    Load and validate scenes configuration from YAML file.

    Args:
        path: Path to scenes.yaml file

    Returns:
        Validated ScenesConfigRoot object

    Raises:
        FileNotFoundError: If config file doesn't exist
        ValidationError: If configuration is invalid
        ValueError: If duplicate scene IDs, invalid steps, or empty YAML are found
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Scenes config file not found: {path}")

    raw = yaml.safe_load(path.read_text(encoding='utf-8'))

    # yaml.safe_load returns None for an empty document — reject explicitly
    # rather than letting model validation produce a confusing error.
    if raw is None:
        raise ValueError(f"Scenes config file is empty: {path}")

    return ScenesConfigRoot.model_validate(raw)
# ============================================================================
# CONVENIENCE FUNCTIONS
# ============================================================================
def get_group_by_id(config: GroupsConfigRoot, group_id: str) -> GroupConfig | None:
    """Return the group whose ``id`` equals ``group_id``, or ``None`` if absent."""
    return next((grp for grp in config.groups if grp.id == group_id), None)
def get_scene_by_id(config: ScenesConfigRoot, scene_id: str) -> SceneConfig | None:
    """Return the scene whose ``id`` equals ``scene_id``, or ``None`` if absent."""
    return next((scn for scn in config.scenes if scn.id == scene_id), None)
# ============================================================================
# EXAMPLE USAGE
# ============================================================================
if __name__ == "__main__":
    # Manual smoke test: load both config files from ../config relative to
    # this module and print a human-readable summary of what was parsed.
    from pathlib import Path  # NOTE(review): redundant — Path is already imported at module top.

    # Example: Load groups configuration
    try:
        groups_path = Path(__file__).parent.parent / "config" / "groups.yaml"
        groups = load_groups(groups_path)
        print(f"✓ Loaded {len(groups.groups)} groups (version {groups.version})")
        for group in groups.groups:
            print(f"  - {group.id}: {group.name}")
            if group.selector:
                print(f"    Selector: type={group.selector.type}, room={group.selector.room}")
            if group.device_ids:
                print(f"    Devices: {', '.join(group.device_ids)}")
    except Exception as e:
        # Broad catch is deliberate here: this is a demo entry point that
        # should report any loading problem instead of crashing.
        print(f"✗ Error loading groups: {e}")

    print()

    # Example: Load scenes configuration
    try:
        scenes_path = Path(__file__).parent.parent / "config" / "scenes.yaml"
        scenes = load_scenes(scenes_path)
        print(f"✓ Loaded {len(scenes.scenes)} scenes (version {scenes.version})")
        for scene in scenes.scenes:
            print(f"  - {scene.id}: {scene.name} ({len(scene.steps)} steps)")
            for i, step in enumerate(scene.steps, 1):
                if step.selector:
                    print(f"    Step {i}: selector type={step.selector.type}")
                elif step.group_id:
                    print(f"    Step {i}: group_id={step.group_id}")
                print(f"      Action: {step.action}")
    except Exception as e:
        # Same deliberate broad catch as above.
        print(f"✗ Error loading scenes: {e}")

View File

@@ -0,0 +1,21 @@
"""
Relay capability model.
A relay is essentially a simple on/off switch, like a light with only power control.
"""
from pydantic import BaseModel, Field
from typing import Literal
# Capability version
CAP_VERSION = "relay@1.0.0"
DISPLAY_NAME = "Relay"
class RelayState(BaseModel):
    """State model for relay devices (on/off only).

    Validation accepts only the exact strings "on" and "off".
    """
    power: Literal["on", "off"] = Field(..., description="Power state: on or off")
class RelaySetPayload(BaseModel):
    """Payload for setting relay state.

    Structurally identical to ``RelayState``; kept as a separate model so
    the SET contract can diverge from the reported state later.
    """
    power: Literal["on", "off"] = Field(..., description="Desired power state: on or off")