17 Commits

Author SHA1 Message Date
2e24c259cb fix 4 2025-11-17 08:42:26 +01:00
bbf280bdf4 fix 3 2025-11-17 08:39:50 +01:00
a7d778b211 fix 2 2025-11-17 08:37:02 +01:00
a7d8afc98b fix 2025-11-17 08:10:11 +01:00
a4ae8a2f6c slider for thermostats 2025-11-17 08:05:58 +01:00
6152385339 fix 8 2025-11-14 15:14:48 +01:00
c2b7328219 fix 7 2025-11-14 15:13:37 +01:00
99362b346f fix 6 2025-11-14 15:01:49 +01:00
77d29c3a42 fix 5 2025-11-14 14:31:03 +01:00
ef3b1177d2 fix 4 2025-11-14 14:18:59 +01:00
8bbe9c164f fix 3 2025-11-14 14:14:49 +01:00
65f8a0c7cb fix 2 2025-11-14 11:34:32 +01:00
cbe7e11cf2 fix 2025-11-14 11:30:10 +01:00
9bf336fa11 groups and scenes 3 2025-11-13 21:56:13 +01:00
b82217a666 groups and scenes 2 2025-11-13 21:54:09 +01:00
5851414ba5 groups and scenes initial 2025-11-13 21:29:04 +01:00
4c5475e930 favicon 2025-11-13 11:14:43 +01:00
11 changed files with 1901 additions and 31 deletions

View File

@@ -25,11 +25,27 @@ from packages.home_capabilities import (
ThermostatState,
ContactState,
TempHumidityState,
RelayState
RelayState,
load_layout,
)
# Import resolvers (must be before router imports to avoid circular dependency)
from apps.api.resolvers import (
DeviceDTO,
resolve_group_devices,
resolve_scene_step_devices,
load_device_rooms,
get_room,
clear_room_cache,
)
logger = logging.getLogger(__name__)
# ============================================================================
# STATE CACHES
# ============================================================================
# In-memory cache for last known device states
# Will be populated from Redis pub/sub messages
device_states: dict[str, dict[str, Any]] = {}
@@ -57,6 +73,13 @@ app.add_middleware(
)
@app.on_event("startup")
async def startup_event():
"""Include routers after app is initialized to avoid circular imports."""
from apps.api.routes.groups_scenes import router as groups_scenes_router
app.include_router(groups_scenes_router, prefix="")
@app.get("/health")
async def health() -> dict[str, str]:
"""Health check endpoint.
@@ -207,6 +230,50 @@ def get_mqtt_settings() -> tuple[str, int]:
return host, port
# ============================================================================
# MQTT PUBLISH
# ============================================================================
async def publish_abstract_set(device_type: str, device_id: str, payload: dict[str, Any]) -> None:
"""
Publish an abstract set command via MQTT.
This function encapsulates MQTT publishing logic so that group/scene
execution doesn't need to know MQTT topic details.
Topic format: home/{device_type}/{device_id}/set
Message format: {"type": device_type, "payload": payload}
Args:
device_type: Device type (light, thermostat, relay, etc.)
device_id: Device identifier
payload: Command payload (e.g., {"power": "on", "brightness": 50})
Example:
>>> await publish_abstract_set("light", "kueche_deckenlampe", {"power": "on", "brightness": 35})
# Publishes to: home/light/kueche_deckenlampe/set
# Message: {"type": "light", "payload": {"power": "on", "brightness": 35}}
"""
mqtt_host, mqtt_port = get_mqtt_settings()
topic = f"home/{device_type}/{device_id}/set"
message = {
"type": device_type,
"payload": payload
}
try:
async with Client(hostname=mqtt_host, port=mqtt_port) as client:
await client.publish(
topic=topic,
payload=json.dumps(message),
qos=1
)
logger.info(f"Published to {topic}: {message}")
except Exception as e:
logger.error(f"Failed to publish to {topic}: {e}")
raise
def get_redis_settings() -> tuple[str, str]:
"""Get Redis settings from configuration.
@@ -291,8 +358,6 @@ async def get_layout() -> dict[str, Any]:
Returns:
dict: Layout configuration with rooms and device tiles
"""
from packages.home_capabilities import load_layout
try:
layout = load_layout()
@@ -321,6 +386,23 @@ async def get_layout() -> dict[str, Any]:
return {"rooms": []}
@app.get("/devices/{device_id}/room")
async def get_device_room(device_id: str) -> dict[str, str | None]:
"""Get the room name for a specific device.
Args:
device_id: Device identifier
Returns:
dict: {"device_id": str, "room": str | null}
"""
room = get_room(device_id)
return {
"device_id": device_id,
"room": room
}
@app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED)
async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]:
"""Set device state.

286
apps/api/resolvers.py Normal file
View File

@@ -0,0 +1,286 @@
"""Group and scene resolution logic."""
import logging
from pathlib import Path
from typing import Any, TypedDict
from packages.home_capabilities import (
GroupConfig,
GroupsConfigRoot,
SceneStep,
get_group_by_id,
load_layout,
)
logger = logging.getLogger(__name__)
# ============================================================================
# TYPE DEFINITIONS
# ============================================================================
class DeviceDTO(TypedDict, total=False):
"""Device Data Transfer Object.
Represents a device as returned by /devices endpoint or load_devices().
Required fields:
device_id: Unique device identifier
type: Device type (light, thermostat, relay, etc.)
Optional fields:
name: Human-readable device name
features: Device capabilities (power, brightness, etc.)
technology: MQTT, zigbee2mqtt, simulator, etc.
topics: MQTT topic configuration
metadata: Additional device information
"""
device_id: str
type: str
name: str
features: dict[str, Any]
technology: str
topics: dict[str, str]
metadata: dict[str, Any]
# ============================================================================
# DEVICE-ROOM MAPPING
# ============================================================================
# Global cache for device -> room mapping
_device_room_cache: dict[str, str] = {}
def load_device_rooms(path: str | Path | None = None) -> dict[str, str]:
"""
Load device-to-room mapping from layout configuration.
This function extracts a mapping of device_id -> room_name from the layout.yaml
file, which is useful for resolving selectors like {room: "Küche"}.
Args:
path: Optional path to layout.yaml. If None, uses default path
(config/layout.yaml relative to workspace root)
Returns:
Dictionary mapping device_id to room_name. Returns empty dict if:
- layout.yaml doesn't exist
- layout.yaml is malformed
- layout.yaml is empty
Example:
>>> mapping = load_device_rooms()
>>> mapping['kueche_lampe1']
'Küche'
"""
global _device_room_cache
try:
# Load the layout using existing function
layout = load_layout(path)
# Build device -> room mapping
device_rooms: dict[str, str] = {}
for room in layout.rooms:
for device in room.devices:
device_rooms[device.device_id] = room.name
# Update global cache
_device_room_cache = device_rooms.copy()
logger.info(f"Loaded device-room mapping: {len(device_rooms)} devices")
return device_rooms
except (FileNotFoundError, ValueError, Exception) as e:
logger.warning(f"Failed to load device-room mapping: {e}")
logger.warning("Returning empty device-room mapping")
_device_room_cache = {}
return {}
def get_room(device_id: str) -> str | None:
"""
Get the room name for a given device ID.
This function uses the cached device-room mapping loaded by load_device_rooms().
If the cache is empty, it will attempt to load it first.
Args:
device_id: The device identifier to lookup
Returns:
Room name if device is found, None otherwise
Example:
>>> get_room('kueche_lampe1')
'Küche'
>>> get_room('nonexistent_device')
None
"""
# Check if cache is populated
if not _device_room_cache:
logger.debug("Device-room cache empty, loading from layout...")
# Load mapping (this updates the global _device_room_cache)
load_device_rooms()
# Access the cache after potential reload
return _device_room_cache.get(device_id)
def clear_room_cache() -> None:
"""
Clear the cached device-room mapping.
This is useful for testing or when the layout configuration has changed
and needs to be reloaded.
"""
_device_room_cache.clear()
logger.debug("Cleared device-room cache")
# ============================================================================
# GROUP & SCENE RESOLUTION
# ============================================================================
def resolve_group_devices(
group: GroupConfig,
devices: list[DeviceDTO],
device_rooms: dict[str, str]
) -> list[DeviceDTO]:
"""
Resolve devices for a group based on device_ids or selector.
Args:
group: Group configuration with device_ids or selector
devices: List of all available devices
device_rooms: Mapping of device_id -> room_name
Returns:
List of devices matching the group criteria (no duplicates)
Example:
>>> # Group with explicit device_ids
>>> group = GroupConfig(id="test", name="Test", device_ids=["lamp1", "lamp2"])
>>> resolve_group_devices(group, all_devices, {})
[{"device_id": "lamp1", ...}, {"device_id": "lamp2", ...}]
>>> # Group with selector (all lights in kitchen)
>>> group = GroupConfig(
... id="kitchen_lights",
... name="Kitchen Lights",
... selector=GroupSelector(type="light", room="Küche")
... )
>>> resolve_group_devices(group, all_devices, device_rooms)
[{"device_id": "kueche_deckenlampe", ...}, ...]
"""
# Case 1: Explicit device_ids
if group.device_ids:
device_id_set = set(group.device_ids)
return [d for d in devices if d["device_id"] in device_id_set]
# Case 2: Selector-based filtering
if group.selector:
filtered = []
for device in devices:
# Filter by type (required in selector)
if device["type"] != group.selector.type:
continue
# Filter by room (optional)
if group.selector.room:
device_room = device_rooms.get(device["device_id"])
if device_room != group.selector.room:
continue
# Filter by tags (optional, future feature)
# if group.selector.tags:
# device_tags = device.get("metadata", {}).get("tags", [])
# if not any(tag in device_tags for tag in group.selector.tags):
# continue
filtered.append(device)
return filtered
# No device_ids and no selector → empty list
return []
def resolve_scene_step_devices(
step: SceneStep,
groups_config: GroupsConfigRoot,
devices: list[DeviceDTO],
device_rooms: dict[str, str]
) -> list[DeviceDTO]:
"""
Resolve devices for a scene step based on group_id or selector.
Args:
step: Scene step with group_id or selector
groups_config: Groups configuration for group lookup
devices: List of all available devices
device_rooms: Mapping of device_id -> room_name
Returns:
List of devices matching the step criteria
Raises:
ValueError: If group_id is specified but group not found
Example:
>>> # Step with group_id
>>> step = SceneStep(group_id="kitchen_lights", action={...})
>>> resolve_scene_step_devices(step, groups_cfg, all_devices, device_rooms)
[{"device_id": "kueche_deckenlampe", ...}, ...]
>>> # Step with selector
>>> step = SceneStep(
... selector=SceneSelector(type="light", room="Küche"),
... action={...}
... )
>>> resolve_scene_step_devices(step, groups_cfg, all_devices, device_rooms)
[{"device_id": "kueche_deckenlampe", ...}, ...]
"""
# Case 1: Group reference
if step.group_id:
# Look up the group
group = get_group_by_id(groups_config, step.group_id)
if not group:
raise ValueError(
f"Scene step references unknown group_id: '{step.group_id}'. "
f"Available groups: {[g.id for g in groups_config.groups]}"
)
# Resolve the group's devices
return resolve_group_devices(group, devices, device_rooms)
# Case 2: Direct selector
if step.selector:
filtered = []
for device in devices:
# Filter by type (optional in scene selector)
if step.selector.type and device["type"] != step.selector.type:
continue
# Filter by room (optional)
if step.selector.room:
device_room = device_rooms.get(device["device_id"])
if device_room != step.selector.room:
continue
# Filter by tags (optional, future feature)
# if step.selector.tags:
# device_tags = device.get("metadata", {}).get("tags", [])
# if not any(tag in device_tags for tag in step.selector.tags):
# continue
filtered.append(device)
return filtered
# Should not reach here due to SceneStep validation (must have group_id or selector)
return []

View File

@@ -0,0 +1 @@
"""API routes package."""

View File

@@ -0,0 +1,454 @@
"""Groups and Scenes API routes."""
import asyncio
import logging
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from packages.home_capabilities import (
GroupConfig,
GroupsConfigRoot,
SceneConfig,
ScenesConfigRoot,
get_group_by_id,
get_scene_by_id,
load_groups,
load_scenes,
)
# Import from parent modules
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from resolvers import (
DeviceDTO,
resolve_group_devices,
resolve_scene_step_devices,
load_device_rooms,
)
from main import load_devices, publish_abstract_set
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================
class GroupResponse(BaseModel):
"""Response model for a group."""
id: str
name: str
device_count: int
devices: list[str]
selector: dict[str, Any] | None = None
capabilities: dict[str, bool]
class GroupSetRequest(BaseModel):
"""Request to set state for all devices in a group."""
action: dict[str, Any] # e.g., {"type": "light", "payload": {"power": "on", "brightness": 50}}
class SceneResponse(BaseModel):
"""Response model for a scene."""
id: str
name: str
steps: int
class SceneRunRequest(BaseModel):
"""Request to execute a scene (currently empty, future: override params)."""
pass
class SceneExecutionResponse(BaseModel):
"""Response after scene execution."""
scene_id: str
scene_name: str
steps_executed: int
devices_affected: int
execution_plan: list[dict[str, Any]]
# ============================================================================
# GROUPS ENDPOINTS
# ============================================================================
@router.get("/groups", response_model=list[GroupResponse], tags=["groups"])
async def list_groups() -> list[GroupResponse]:
"""
List all available groups.
Returns:
list[GroupResponse]: List of groups with their devices
"""
try:
# Load configuration
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Build response for each group
response = []
for group in groups_config.groups:
# Resolve devices for this group
resolved_devices = resolve_group_devices(group, devices, device_rooms)
device_ids = [d["device_id"] for d in resolved_devices]
# Convert selector to dict if present
selector_dict = None
if group.selector:
selector_dict = {
"type": group.selector.type,
"room": group.selector.room,
"tags": group.selector.tags,
}
response.append(GroupResponse(
id=group.id,
name=group.name,
device_count=len(device_ids),
devices=device_ids,
selector=selector_dict,
capabilities=group.capabilities,
))
return response
except Exception as e:
logger.error(f"Error loading groups: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to load groups: {str(e)}"
)
@router.get("/groups/{group_id}", response_model=GroupResponse, tags=["groups"])
async def get_group(group_id: str) -> GroupResponse:
"""
Get details for a specific group.
Args:
group_id: Group identifier
Returns:
GroupResponse: Group details with resolved devices
"""
try:
# Load configuration
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Find the group
group = get_group_by_id(groups_config, group_id)
if not group:
available_groups = [g.id for g in groups_config.groups]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Group '{group_id}' not found. Available groups: {available_groups}"
)
# Resolve devices
resolved_devices = resolve_group_devices(group, devices, device_rooms)
device_ids = [d["device_id"] for d in resolved_devices]
# Convert selector to dict if present
selector_dict = None
if group.selector:
selector_dict = {
"type": group.selector.type,
"room": group.selector.room,
"tags": group.selector.tags,
}
return GroupResponse(
id=group.id,
name=group.name,
device_count=len(device_ids),
devices=device_ids,
selector=selector_dict,
capabilities=group.capabilities,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting group {group_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get group: {str(e)}"
)
@router.post("/groups/{group_id}/set", status_code=status.HTTP_202_ACCEPTED, tags=["groups"])
async def set_group(group_id: str, request: GroupSetRequest) -> dict[str, Any]:
"""
Set state for all devices in a group.
This endpoint resolves the group to its devices and would send
the action to each device. Currently returns execution plan.
Args:
group_id: Group identifier
request: Action to apply to all devices in the group
Returns:
dict: Execution plan (devices and actions to be executed)
"""
try:
# Load configuration
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Find the group
group = get_group_by_id(groups_config, group_id)
if not group:
available_groups = [g.id for g in groups_config.groups]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Group '{group_id}' not found. Available groups: {available_groups}"
)
# Resolve devices
resolved_devices = resolve_group_devices(group, devices, device_rooms)
if not resolved_devices:
logger.warning(f"Group '{group_id}' resolved to 0 devices")
# Execute actions via MQTT
execution_plan = []
for device in resolved_devices:
device_type = device["type"]
device_id = device["device_id"]
payload = request.action.get("payload", {})
# Publish MQTT command
try:
await publish_abstract_set(device_type, device_id, payload)
execution_plan.append({
"device_id": device_id,
"device_type": device_type,
"action": request.action,
"status": "published"
})
except Exception as e:
logger.error(f"Failed to publish to {device_id}: {e}")
execution_plan.append({
"device_id": device_id,
"device_type": device_type,
"action": request.action,
"status": "failed",
"error": str(e)
})
return {
"group_id": group_id,
"group_name": group.name,
"devices_affected": len(resolved_devices),
"execution_plan": execution_plan,
"status": "executed"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error setting group {group_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to set group: {str(e)}"
)
# ============================================================================
# SCENES ENDPOINTS
# ============================================================================
@router.get("/scenes", response_model=list[SceneResponse], tags=["scenes"])
async def list_scenes() -> list[SceneResponse]:
"""
List all available scenes.
Returns:
list[SceneResponse]: List of scenes
"""
try:
# Load configuration
scenes_config = load_scenes(Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml")
# Build response for each scene
response = []
for scene in scenes_config.scenes:
response.append(SceneResponse(
id=scene.id,
name=scene.name,
steps=len(scene.steps),
))
return response
except Exception as e:
logger.error(f"Error loading scenes: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to load scenes: {str(e)}"
)
@router.get("/scenes/{scene_id}", response_model=SceneResponse, tags=["scenes"])
async def get_scene(scene_id: str) -> SceneResponse:
"""
Get details for a specific scene.
Args:
scene_id: Scene identifier
Returns:
SceneResponse: Scene details
"""
try:
# Load configuration
scenes_config = load_scenes(Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml")
# Find the scene
scene = get_scene_by_id(scenes_config, scene_id)
if not scene:
available_scenes = [s.id for s in scenes_config.scenes]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Scene '{scene_id}' not found. Available scenes: {available_scenes}"
)
return SceneResponse(
id=scene.id,
name=scene.name,
steps=len(scene.steps),
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting scene {scene_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get scene: {str(e)}"
)
@router.post("/scenes/{scene_id}/run", response_model=SceneExecutionResponse, tags=["scenes"])
async def run_scene(scene_id: str, request: SceneRunRequest | None = None) -> SceneExecutionResponse:
"""
Execute a scene.
This endpoint resolves each step in the scene to its target devices
and would execute the actions. Currently returns execution plan.
Args:
scene_id: Scene identifier
request: Optional execution parameters (reserved for future use)
Returns:
SceneExecutionResponse: Execution plan and summary
"""
try:
# Load configuration
scenes_config = load_scenes(Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml")
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Find the scene
scene = get_scene_by_id(scenes_config, scene_id)
if not scene:
available_scenes = [s.id for s in scenes_config.scenes]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Scene '{scene_id}' not found. Available scenes: {available_scenes}"
)
# Execute scene steps
execution_plan = []
total_devices = 0
for i, step in enumerate(scene.steps, 1):
# Resolve devices for this step
resolved_devices = resolve_scene_step_devices(step, groups_config, devices, device_rooms)
total_devices += len(resolved_devices)
# Extract action payload
action_payload = step.action.get("payload", {})
# Execute for each device
step_executions = []
for device in resolved_devices:
device_type = device["type"]
device_id = device["device_id"]
try:
await publish_abstract_set(device_type, device_id, action_payload)
step_executions.append({
"device_id": device_id,
"status": "published"
})
except Exception as e:
logger.error(f"Failed to publish to {device_id} in step {i}: {e}")
step_executions.append({
"device_id": device_id,
"status": "failed",
"error": str(e)
})
# Build step info
step_info = {
"step": i,
"devices_affected": len(resolved_devices),
"device_ids": [d["device_id"] for d in resolved_devices],
"action": step.action,
"executions": step_executions,
}
# Add targeting info
if step.group_id:
step_info["target"] = {"type": "group_id", "value": step.group_id}
elif step.selector:
step_info["target"] = {
"type": "selector",
"selector_type": step.selector.type,
"room": step.selector.room,
}
if step.delay_ms:
step_info["delay_ms"] = step.delay_ms
# Apply delay before next step
await asyncio.sleep(step.delay_ms / 1000.0)
execution_plan.append(step_info)
return SceneExecutionResponse(
scene_id=scene.id,
scene_name=scene.name,
steps_executed=len(scene.steps),
devices_affected=total_devices,
execution_plan=execution_plan,
)
except HTTPException:
raise
except ValueError as e:
# Handle unknown group_id in scene step
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except Exception as e:
logger.error(f"Error running scene {scene_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to run scene: {str(e)}"
)

View File

@@ -246,6 +246,23 @@ class RedisState:
await self._execute_with_retry(_expire, key, ttl_secs)
async def delete(self, key: str) -> None:
"""
Delete a key from Redis.
Args:
key: Redis key to delete
Example:
>>> state = RedisState("redis://localhost:6379/0")
>>> await state.set("rules:r1:temp", "22.5")
>>> await state.delete("rules:r1:temp")
"""
async def _delete(client, k):
await client.delete(k)
await self._execute_with_retry(_delete, key)
async def close(self) -> None:
"""
Close Redis connection and cleanup resources.

View File

@@ -0,0 +1,13 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 100 100">
<!-- Roof -->
<path d="M50 10 L90 45 L85 45 L85 50 L15 50 L15 45 L10 45 Z" fill="#667eea" stroke="#4c51bf" stroke-width="2" stroke-linejoin="round"/>
<!-- House body -->
<rect x="15" y="45" width="70" height="45" fill="#764ba2" stroke="#4c51bf" stroke-width="2"/>
<!-- Door -->
<rect x="35" y="60" width="15" height="30" fill="#4c51bf" rx="2"/>
<!-- Window -->
<rect x="60" y="60" width="20" height="15" fill="#fbbf24" stroke="#f59e0b" stroke-width="1"/>
<!-- Window panes -->
<line x1="70" y1="60" x2="70" y2="75" stroke="#f59e0b" stroke-width="1"/>
<line x1="60" y1="67.5" x2="80" y2="67.5" stroke="#f59e0b" stroke-width="1"/>
</svg>

After

Width:  |  Height:  |  Size: 721 B

View File

@@ -4,6 +4,7 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Home Automation</title>
<link rel="icon" type="image/svg+xml" href="/static/favicon.svg">
<style>
* {
margin: 0;
@@ -165,6 +166,12 @@
margin: 0;
}
.room-title h2 {
font-size: 1.5rem;
font-weight: 700;
margin: 0;
}
.room-toggle {
font-size: 1.5rem;
color: #667eea;
@@ -358,32 +365,57 @@
color: #999;
}
.temp-controls {
display: flex;
gap: 0.5rem;
margin-bottom: 1rem;
/* Thermostat Slider Styles */
.thermostat-slider-control {
margin: 1rem 0;
}
.temp-button {
flex: 1;
padding: 0.75rem;
border: none;
border-radius: 8px;
font-size: 1.125rem;
font-weight: 700;
.thermostat-slider-label {
font-size: 0.875rem;
color: #666;
display: block;
margin-bottom: 0.5rem;
}
.thermostat-slider {
width: 100%;
height: 8px;
border-radius: 4px;
background: linear-gradient(to right, #667eea 0%, #764ba2 100%);
outline: none;
-webkit-appearance: none;
appearance: none;
cursor: pointer;
transition: all 0.2s;
background: #667eea;
color: white;
min-height: 44px;
}
.temp-button:hover {
background: #5568d3;
.thermostat-slider::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 24px;
height: 24px;
border-radius: 50%;
background: white;
border: 3px solid #667eea;
cursor: pointer;
box-shadow: 0 2px 4px rgba(0,0,0,0.2);
}
.temp-button:active {
transform: scale(0.95);
.thermostat-slider::-moz-range-thumb {
width: 24px;
height: 24px;
border-radius: 50%;
background: white;
border: 3px solid #667eea;
cursor: pointer;
box-shadow: 0 2px 4px rgba(0,0,0,0.2);
}
.thermostat-slider-range {
display: flex;
justify-content: space-between;
margin-top: 0.25rem;
font-size: 0.75rem;
color: #999;
}
/* Contact Sensor Styles */
@@ -391,6 +423,7 @@
display: flex;
align-items: center;
gap: 0.75rem;
padding: 1rem;
background: #f8f9fa;
border-radius: 8px;
@@ -478,6 +511,208 @@
text-align: center;
}
/* Groups Section Styles */
.groups-section .devices {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(280px, 1fr));
gap: 1rem;
}
.group-card {
background: white;
border-radius: 12px;
padding: 1.25rem;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.08);
transition: all 0.3s;
}
.group-card:hover {
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.12);
transform: translateY(-2px);
}
.group-card-header {
margin-bottom: 1rem;
}
.group-card-title {
font-size: 1.125rem;
font-weight: 600;
color: #333;
margin-bottom: 0.25rem;
}
.group-card-subtitle {
font-size: 0.875rem;
color: #666;
}
.group-card-actions {
display: flex;
gap: 0.5rem;
}
.group-button {
flex: 1;
padding: 0.75rem;
border: none;
border-radius: 8px;
font-size: 0.875rem;
font-weight: 600;
cursor: pointer;
transition: all 0.2s;
min-height: 44px;
position: relative;
}
.group-button.on {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
.group-button.on:hover {
background: linear-gradient(135deg, #5568d3 0%, #653a8e 100%);
}
.group-button.off {
background: #f1f3f5;
color: #495057;
}
.group-button.off:hover {
background: #e9ecef;
}
.group-button:active {
transform: scale(0.95);
}
.group-button:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.group-button .spinner {
display: inline-block;
width: 14px;
height: 14px;
border: 2px solid rgba(255, 255, 255, 0.3);
border-top-color: white;
border-radius: 50%;
animation: spin 0.6s linear infinite;
}
/* Scenes Section Styles */
.scenes-section .devices {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(200px, 1fr));
gap: 1rem;
}
.scene-button {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
color: white;
border: none;
border-radius: 12px;
padding: 1.25rem;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: all 0.3s;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
min-height: 80px;
display: flex;
align-items: center;
justify-content: center;
position: relative;
}
.scene-button:hover {
background: linear-gradient(135deg, #e082ea 0%, #e4465b 100%);
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
transform: translateY(-2px);
}
.scene-button:active {
transform: translateY(0) scale(0.95);
}
.scene-button:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.scene-button .spinner {
display: inline-block;
width: 16px;
height: 16px;
border: 2px solid rgba(255, 255, 255, 0.3);
border-top-color: white;
border-radius: 50%;
animation: spin 0.6s linear infinite;
margin-left: 0.5rem;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
/* Toast Notification */
.toast {
position: fixed;
bottom: 2rem;
right: 2rem;
background: white;
padding: 1rem 1.5rem;
border-radius: 8px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.2);
display: flex;
align-items: center;
gap: 0.75rem;
z-index: 1000;
animation: slideIn 0.3s ease;
max-width: 400px;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}
.toast.success {
border-left: 4px solid #51cf66;
}
.toast.error {
border-left: 4px solid #ff6b6b;
}
.toast-icon {
font-size: 1.5rem;
}
.toast-message {
flex: 1;
color: #333;
font-size: 0.875rem;
}
.toast-close {
background: none;
border: none;
color: #999;
cursor: pointer;
font-size: 1.25rem;
padding: 0;
line-height: 1;
}
.events {
margin-top: 2rem;
background: white;
@@ -534,6 +769,106 @@
font-size: 0.875rem;
color: #999;
}
/* Responsive Design */
@media (max-width: 768px) {
.container {
padding: 1rem;
}
header {
padding: 1rem;
}
.header-content h1 {
font-size: 1.5rem;
}
.header-content p {
font-size: 0.75rem;
}
.header-buttons {
gap: 0.5rem;
}
.refresh-btn, .collapse-all-btn {
width: 36px;
height: 36px;
font-size: 1.25rem;
}
.room {
padding: 1rem;
}
.room-header h2 {
font-size: 1.125rem;
}
.devices {
grid-template-columns: 1fr;
}
/* Groups responsive */
.groups-section .devices {
grid-template-columns: 1fr;
}
.group-card {
padding: 1rem;
}
.group-card-title {
font-size: 1rem;
}
/* Scenes responsive */
.scenes-section .devices {
grid-template-columns: 1fr;
}
.scene-button {
min-height: 60px;
font-size: 0.9375rem;
}
/* Toast responsive */
.toast {
bottom: 1rem;
right: 1rem;
left: 1rem;
max-width: none;
}
/* Thermostat responsive */
.thermostat-display {
grid-template-columns: 1fr;
gap: 0.75rem;
}
.temp-controls {
flex-direction: column;
}
.temp-button {
width: 100%;
}
}
@media (min-width: 769px) and (max-width: 1024px) {
.devices {
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
}
.groups-section .devices {
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
}
.scenes-section .devices {
grid-template-columns: repeat(auto-fill, minmax(180px, 1fr));
}
}
</style>
</head>
<body>
@@ -656,13 +991,23 @@
</div>
</div>
<div class="temp-controls">
<button class="temp-button" onclick="adjustTarget('{{ device.device_id }}', -1.0)">
-1.0
</button>
<button class="temp-button" onclick="adjustTarget('{{ device.device_id }}', 1.0)">
+1.0
</button>
<div class="thermostat-slider-control">
<label for="slider-{{ device.device_id }}" class="thermostat-slider-label">
🎯 Zieltemperatur: <span id="thermostat-slider-value-{{ device.device_id }}">21.0</span>°C
</label>
<input type="range"
min="5"
max="30"
step="0.5"
value="21.0"
class="thermostat-slider"
id="slider-{{ device.device_id }}"
oninput="updateThermostatSliderValue('{{ device.device_id }}', this.value)"
onchange="setThermostatTarget('{{ device.device_id }}', this.value)">
<div class="thermostat-slider-range">
<span>5°C</span>
<span>30°C</span>
</div>
</div>
{% elif device.type == "contact" or device.type == "contact_sensor" %}
@@ -712,6 +1057,38 @@
</div>
{% endif %}
<!-- Groups Section -->
<section class="room groups-section">
<div class="room-header" onclick="toggleRoom('groups-content')">
<div class="room-title">
<h2>🎛️ Gruppen</h2>
<span class="device-count" id="groups-count">Lädt...</span>
</div>
<span class="room-toggle collapsed" id="toggle-groups-content"></span>
</div>
<div class="room-content collapsed" id="groups-content">
<div class="devices" id="groups-container">
<p style="color: #666;">Lade Gruppen...</p>
</div>
</div>
</section>
<!-- Scenes Section -->
<section class="room scenes-section">
<div class="room-header" onclick="toggleRoom('scenes-content')">
<div class="room-title">
<h2>🎬 Szenen</h2>
<span class="device-count" id="scenes-count">Lädt...</span>
</div>
<span class="room-toggle collapsed" id="toggle-scenes-content"></span>
</div>
<div class="room-content collapsed" id="scenes-content">
<div class="devices" id="scenes-container">
<p style="color: #666;">Lade Szenen...</p>
</div>
</div>
</section>
<div class="events">
<h2>📡 Realtime Events</h2>
<div class="event-list" id="event-list">
@@ -767,7 +1144,8 @@
// Set room icons based on room name
document.addEventListener('DOMContentLoaded', () => {
const roomTitles = document.querySelectorAll('.room-title');
// Only select room titles that are <h2> elements (exclude Groups/Scenes sections)
const roomTitles = document.querySelectorAll('.room-title:not(.groups-section .room-title):not(.scenes-section .room-title)');
roomTitles.forEach(title => {
const roomName = title.textContent.trim().toLowerCase();
let icon = '🏠'; // Default
@@ -985,6 +1363,8 @@
function updateThermostatUI(deviceId, current, target, mode) {
const currentSpan = document.getElementById(`state-${deviceId}-current`);
const targetSpan = document.getElementById(`state-${deviceId}-target`);
const slider = document.getElementById(`slider-${deviceId}`);
const sliderValueSpan = document.getElementById(`thermostat-slider-value-${deviceId}`);
if (current !== undefined && currentSpan) {
currentSpan.textContent = current.toFixed(1);
@@ -994,6 +1374,14 @@
if (targetSpan) {
targetSpan.textContent = target.toFixed(1);
}
// Sync slider with actual state
if (slider) {
slider.value = target;
}
// Sync slider value display
if (sliderValueSpan) {
sliderValueSpan.textContent = target.toFixed(1);
}
thermostatTargets[deviceId] = target;
}
}
@@ -1259,6 +1647,317 @@
loadDevices().then(() => {
console.log('Initial states loaded, now connecting SSE...');
});
// ===== GROUPS & SCENES FUNCTIONALITY =====
// Show toast notification
function showToast(message, type = 'success') {
// Remove existing toasts
document.querySelectorAll('.toast').forEach(t => t.remove());
const toast = document.createElement('div');
toast.className = `toast ${type}`;
toast.innerHTML = `
<span class="toast-icon">${type === 'success' ? '✓' : '✗'}</span>
<span class="toast-message">${message}</span>
<button class="toast-close" onclick="this.parentElement.remove()">×</button>
`;
document.body.appendChild(toast);
// Auto-remove after 4 seconds
setTimeout(() => {
toast.style.animation = 'slideIn 0.3s ease reverse';
setTimeout(() => toast.remove(), 300);
}, 4000);
}
// Load and render groups
async function loadGroups() {
try {
const response = await fetch(api('/groups'));
if (!response.ok) throw new Error('Failed to load groups');
const groups = await response.json();
const container = document.getElementById('groups-container');
const countSpan = document.getElementById('groups-count');
if (groups.length === 0) {
container.innerHTML = '<p style="color: #666;">Keine Gruppen konfiguriert.</p>';
countSpan.textContent = '';
return;
}
countSpan.textContent = '';
container.innerHTML = groups.map(group => `
<div class="group-card">
<div class="group-card-header">
<div class="group-card-title">${group.name}</div>
<div class="group-card-subtitle">${group.device_count} ${group.device_count === 1 ? 'Gerät' : 'Geräte'}</div>
</div>
${group.capabilities.brightness ? `
<div class="brightness-control">
<label class="brightness-label">
<span>🔆 Helligkeit</span>
<span class="brightness-value" id="group-brightness-${group.id}">50%</span>
</label>
<input type="range"
min="0"
max="100"
value="50"
class="brightness-slider"
id="slider-group-${group.id}"
oninput="updateGroupBrightnessDisplay('${group.id}', this.value)"
onchange="setGroupBrightness('${group.id}', this.value)">
</div>
` : ''}
<div class="group-card-actions">
<button class="group-button on" onclick="setGroup('${group.id}', 'on', this)">
Alle An
</button>
<button class="group-button off" onclick="setGroup('${group.id}', 'off', this)">
Alle Aus
</button>
</div>
</div>
`).join('');
console.log(`Loaded ${groups.length} groups`);
} catch (error) {
console.error('Failed to load groups:', error);
const container = document.getElementById('groups-container');
container.innerHTML = '<p style="color: #999;">Fehler beim Laden der Gruppen</p>';
showToast('Fehler beim Laden der Gruppen', 'error');
}
}
// Update group brightness display value
function updateGroupBrightnessDisplay(groupId, value) {
const display = document.getElementById(`group-brightness-${groupId}`);
if (display) {
display.textContent = `${value}%`;
}
}
// Set group brightness immediately when slider changes
async function setGroupBrightness(groupId, brightness) {
try {
const response = await fetch(api(`/groups/${groupId}/set`), {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: {
type: 'power',
payload: {
power: 'on',
brightness: parseInt(brightness)
}
}
})
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail || 'Request failed');
}
const result = await response.json();
const publishedCount = result.execution_plan.filter(p => p.status === 'published').length;
console.log(`Group ${groupId} brightness set to ${brightness}%: ${publishedCount} devices`);
addEvent({
action: 'group_brightness',
group_id: groupId,
brightness: brightness,
device_count: publishedCount
});
} catch (error) {
console.error('Failed to set group brightness:', error);
showToast(`Fehler beim Setzen der Helligkeit: ${error.message}`, 'error');
}
}
// Update thermostat slider value display while dragging
function updateThermostatSliderValue(deviceId, value) {
const valueSpan = document.getElementById(`thermostat-slider-value-${deviceId}`);
if (valueSpan) {
valueSpan.textContent = parseFloat(value).toFixed(1);
}
}
// Set thermostat target temperature when slider is released
async function setThermostatTarget(deviceId, value) {
try {
const targetTemp = parseFloat(value);
const response = await fetch(api(`/devices/${deviceId}/set`), {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
type: 'thermostat',
payload: {
target: targetTemp
}
})
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail || 'Request failed');
}
console.log(`Thermostat ${deviceId} target set to ${targetTemp}°C`);
addEvent({
action: 'thermostat_set',
device_id: deviceId,
target_temperature: targetTemp
});
} catch (error) {
console.error('Failed to set thermostat target:', error);
showToast(`Fehler beim Setzen der Zieltemperatur: ${error.message}`, 'error');
}
}
// Execute group action
async function setGroup(groupId, power, buttonElement) {
const allButtons = buttonElement.parentElement.querySelectorAll('button');
allButtons.forEach(btn => btn.disabled = true);
const originalHTML = buttonElement.innerHTML;
buttonElement.innerHTML = '<span class="spinner"></span>';
// Get brightness value if slider exists
const slider = document.getElementById(`slider-group-${groupId}`);
const brightness = slider ? parseInt(slider.value) : null;
try {
const payload = { power };
if (brightness !== null && power === 'on') {
payload.brightness = brightness;
}
const response = await fetch(api(`/groups/${groupId}/set`), {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: {
type: 'power',
payload: payload
}
})
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail || 'Request failed');
}
const result = await response.json();
const publishedCount = result.execution_plan.filter(p => p.status === 'published').length;
showToast(`Gruppe ${power === 'on' ? 'eingeschaltet' : 'ausgeschaltet'}: ${publishedCount} Geräte`, 'success');
console.log(`Group ${groupId} set to ${power}:`, result);
addEvent({
action: 'group_set',
group_id: groupId,
power: power,
device_count: publishedCount
});
} catch (error) {
console.error('Failed to set group:', error);
showToast(`Fehler: ${error.message}`, 'error');
} finally {
buttonElement.innerHTML = originalHTML;
allButtons.forEach(btn => btn.disabled = false);
}
}
// Load and render scenes
async function loadScenes() {
try {
const response = await fetch(api('/scenes'));
if (!response.ok) throw new Error('Failed to load scenes');
const scenes = await response.json();
const container = document.getElementById('scenes-container');
const countSpan = document.getElementById('scenes-count');
if (scenes.length === 0) {
container.innerHTML = '<p style="color: #666;">Keine Szenen konfiguriert.</p>';
countSpan.textContent = '';
return;
}
countSpan.textContent = '';
container.innerHTML = scenes.map(scene => `
<button class="scene-button" onclick="runScene('${scene.id}', this)">
${scene.name}
</button>
`).join('');
console.log(`Loaded ${scenes.length} scenes`);
} catch (error) {
console.error('Failed to load scenes:', error);
const container = document.getElementById('scenes-container');
container.innerHTML = '<p style="color: #999;">Fehler beim Laden der Szenen</p>';
showToast('Fehler beim Laden der Szenen', 'error');
}
}
// Execute scene
async function runScene(sceneId, buttonElement) {
buttonElement.disabled = true;
const originalHTML = buttonElement.innerHTML;
buttonElement.innerHTML = `${originalHTML} <span class="spinner"></span>`;
try {
const response = await fetch(api(`/scenes/${sceneId}/run`), {
method: 'POST',
headers: { 'Content-Type': 'application/json' }
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail || 'Request failed');
}
const result = await response.json();
const totalPublished = result.steps.reduce((sum, step) =>
sum + step.devices.filter(d => d.status === 'published').length, 0
);
showToast(`Szene ausgeführt: ${totalPublished} Aktionen`, 'success');
console.log(`Scene ${sceneId} executed:`, result);
addEvent({
action: 'scene_run',
scene_id: sceneId,
steps: result.steps.length,
total_actions: totalPublished
});
} catch (error) {
console.error('Failed to run scene:', error);
showToast(`Fehler: ${error.message}`, 'error');
} finally {
buttonElement.innerHTML = originalHTML;
buttonElement.disabled = false;
}
}
// Load groups and scenes on page load
document.addEventListener('DOMContentLoaded', () => {
loadGroups();
loadScenes();
});
</script>
</body>
</html>

36
config/groups.yaml Normal file
View File

@@ -0,0 +1,36 @@
version: 1
groups:
- id: "kueche_lichter"
name: "Küche alle Lampen"
selector:
type: "light"
room: "Küche"
capabilities:
power: true
brightness: true
- id: "alles_lichter"
name: "Alle Lichter"
selector:
type: "light"
capabilities:
power: true
- id: "schlafzimmer_lichter"
name: "Schlafzimmer alle Lampen"
selector:
type: "light"
room: "Schlafzimmer"
capabilities:
power: true
brightness: true
- id: "schlafzimmer_schlummer_licht"
name: "Schlafzimmer Schlummerlicht"
device_ids:
- bettlicht_patty
- bettlicht_wolfgang
- medusalampe_schlafzimmer
capabilities:
power: true
brightness: true

24
config/scenes.yaml Normal file
View File

@@ -0,0 +1,24 @@
version: 1
scenes:
- id: "alles_aus"
name: "Alles aus"
steps:
- selector: { type: "light" }
action:
type: "light"
payload: { power: "off" }
- selector: { type: "relay" }
action:
type: "relay"
payload: { power: "off" }
- id: "kueche_gemuetlich"
name: "Küche gemütlich"
steps:
- group_id: "kueche_lichter"
action:
type: "light"
payload:
power: "on"
brightness: 35

View File

@@ -10,7 +10,25 @@ from packages.home_capabilities.temp_humidity_sensor import CAP_VERSION as TEMP_
from packages.home_capabilities.temp_humidity_sensor import TempHumidityState
from packages.home_capabilities.relay import CAP_VERSION as RELAY_VERSION
from packages.home_capabilities.relay import RelayState
from packages.home_capabilities.layout import DeviceTile, Room, UiLayout, load_layout
from packages.home_capabilities.layout import (
DeviceTile,
Room,
UiLayout,
load_layout,
)
from packages.home_capabilities.groups_scenes import (
GroupConfig,
GroupsConfigRoot,
GroupSelector,
SceneConfig,
ScenesConfigRoot,
SceneSelector,
SceneStep,
get_group_by_id,
get_scene_by_id,
load_groups,
load_scenes,
)
__all__ = [
"LightState",
@@ -27,4 +45,15 @@ __all__ = [
"Room",
"UiLayout",
"load_layout",
"GroupConfig",
"GroupsConfigRoot",
"GroupSelector",
"SceneConfig",
"ScenesConfigRoot",
"SceneSelector",
"SceneStep",
"get_group_by_id",
"get_scene_by_id",
"load_groups",
"load_scenes",
]

View File

@@ -0,0 +1,229 @@
"""
Configuration models and loaders for groups and scenes.
This module provides Pydantic models for validating groups.yaml and scenes.yaml,
along with loader functions that parse YAML files into typed configuration objects.
"""
from pathlib import Path
from typing import Any
import yaml
from pydantic import BaseModel, Field, field_validator, model_validator
# ============================================================================
# GROUP MODELS
# ============================================================================
class GroupSelector(BaseModel):
"""Selector for automatically adding devices to a group."""
type: str = Field(..., description="Device type (e.g., 'light', 'thermostat')")
room: str | None = Field(None, description="Filter by room name")
tags: list[str] | None = Field(None, description="Filter by device tags")
class GroupConfig(BaseModel):
"""Configuration for a device group."""
id: str = Field(..., description="Unique group identifier")
name: str = Field(..., description="Human-readable group name")
selector: GroupSelector | None = Field(None, description="Auto-select devices by criteria")
device_ids: list[str] = Field(default_factory=list, description="Explicit device IDs")
capabilities: dict[str, bool] = Field(
default_factory=dict,
description="Supported capabilities (e.g., {'brightness': True})"
)
class GroupsConfigRoot(BaseModel):
"""Root configuration for groups.yaml."""
version: int = Field(..., description="Configuration schema version")
groups: list[GroupConfig] = Field(default_factory=list, description="List of groups")
@field_validator('groups')
@classmethod
def validate_unique_ids(cls, groups: list[GroupConfig]) -> list[GroupConfig]:
"""Ensure all group IDs are unique."""
ids = [g.id for g in groups]
duplicates = [id for id in ids if ids.count(id) > 1]
if duplicates:
raise ValueError(f"Duplicate group IDs found: {set(duplicates)}")
return groups
# ============================================================================
# SCENE MODELS
# ============================================================================
class SceneSelector(BaseModel):
"""Selector for targeting devices in a scene step."""
type: str | None = Field(None, description="Device type (e.g., 'light', 'outlet')")
room: str | None = Field(None, description="Filter by room name")
tags: list[str] | None = Field(None, description="Filter by device tags")
class SceneStep(BaseModel):
"""A single step in a scene execution."""
selector: SceneSelector | None = Field(None, description="Select devices by criteria")
group_id: str | None = Field(None, description="Target a specific group")
action: dict[str, Any] = Field(..., description="Action to execute (type + payload)")
delay_ms: int | None = Field(None, description="Delay before next step (milliseconds)")
@model_validator(mode='after')
def validate_selector_or_group(self) -> 'SceneStep':
"""Ensure either selector OR group_id is specified, but not both."""
has_selector = self.selector is not None
has_group = self.group_id is not None
if not has_selector and not has_group:
raise ValueError("SceneStep must have either 'selector' or 'group_id'")
if has_selector and has_group:
raise ValueError("SceneStep cannot have both 'selector' and 'group_id'")
return self
class SceneConfig(BaseModel):
"""Configuration for a scene."""
id: str = Field(..., description="Unique scene identifier")
name: str = Field(..., description="Human-readable scene name")
steps: list[SceneStep] = Field(..., description="Ordered list of actions")
class ScenesConfigRoot(BaseModel):
"""Root configuration for scenes.yaml."""
version: int = Field(..., description="Configuration schema version")
scenes: list[SceneConfig] = Field(default_factory=list, description="List of scenes")
@field_validator('scenes')
@classmethod
def validate_unique_ids(cls, scenes: list[SceneConfig]) -> list[SceneConfig]:
"""Ensure all scene IDs are unique."""
ids = [s.id for s in scenes]
duplicates = [id for id in ids if ids.count(id) > 1]
if duplicates:
raise ValueError(f"Duplicate scene IDs found: {set(duplicates)}")
return scenes
# ============================================================================
# LOADER FUNCTIONS
# ============================================================================
def load_groups(path: Path | str) -> GroupsConfigRoot:
"""
Load and validate groups configuration from YAML file.
Args:
path: Path to groups.yaml file
Returns:
Validated GroupsConfigRoot object
Raises:
FileNotFoundError: If config file doesn't exist
ValidationError: If configuration is invalid
ValueError: If duplicate group IDs are found or YAML is empty
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Groups config file not found: {path}")
with open(path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if data is None:
raise ValueError(f"Groups config file is empty: {path}")
return GroupsConfigRoot.model_validate(data)
def load_scenes(path: Path | str) -> ScenesConfigRoot:
"""
Load and validate scenes configuration from YAML file.
Args:
path: Path to scenes.yaml file
Returns:
Validated ScenesConfigRoot object
Raises:
FileNotFoundError: If config file doesn't exist
ValidationError: If configuration is invalid
ValueError: If duplicate scene IDs, invalid steps, or empty YAML are found
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Scenes config file not found: {path}")
with open(path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if data is None:
raise ValueError(f"Scenes config file is empty: {path}")
return ScenesConfigRoot.model_validate(data)
# ============================================================================
# CONVENIENCE FUNCTIONS
# ============================================================================
def get_group_by_id(config: GroupsConfigRoot, group_id: str) -> GroupConfig | None:
"""Find a group by its ID."""
for group in config.groups:
if group.id == group_id:
return group
return None
def get_scene_by_id(config: ScenesConfigRoot, scene_id: str) -> SceneConfig | None:
"""Find a scene by its ID."""
for scene in config.scenes:
if scene.id == scene_id:
return scene
return None
# ============================================================================
# EXAMPLE USAGE
# ============================================================================
if __name__ == "__main__":
from pathlib import Path
# Example: Load groups configuration
try:
groups_path = Path(__file__).parent.parent / "config" / "groups.yaml"
groups = load_groups(groups_path)
print(f"✓ Loaded {len(groups.groups)} groups (version {groups.version})")
for group in groups.groups:
print(f" - {group.id}: {group.name}")
if group.selector:
print(f" Selector: type={group.selector.type}, room={group.selector.room}")
if group.device_ids:
print(f" Devices: {', '.join(group.device_ids)}")
except Exception as e:
print(f"✗ Error loading groups: {e}")
print()
# Example: Load scenes configuration
try:
scenes_path = Path(__file__).parent.parent / "config" / "scenes.yaml"
scenes = load_scenes(scenes_path)
print(f"✓ Loaded {len(scenes.scenes)} scenes (version {scenes.version})")
for scene in scenes.scenes:
print(f" - {scene.id}: {scene.name} ({len(scene.steps)} steps)")
for i, step in enumerate(scene.steps, 1):
if step.selector:
print(f" Step {i}: selector type={step.selector.type}")
elif step.group_id:
print(f" Step {i}: group_id={step.group_id}")
print(f" Action: {step.action}")
except Exception as e:
print(f"✗ Error loading scenes: {e}")