groups and scenes initial

This commit is contained in:
2025-11-13 21:29:04 +01:00
parent 4c5475e930
commit 5851414ba5
10 changed files with 1666 additions and 7 deletions

View File

@@ -25,11 +25,27 @@ from packages.home_capabilities import (
ThermostatState,
ContactState,
TempHumidityState,
RelayState
RelayState,
load_layout,
)
# Import resolvers (must be before router imports to avoid circular dependency)
from apps.api.resolvers import (
DeviceDTO,
resolve_group_devices,
resolve_scene_step_devices,
load_device_rooms,
get_room,
clear_room_cache,
)
logger = logging.getLogger(__name__)
# ============================================================================
# STATE CACHES
# ============================================================================
# In-memory cache for last known device states
# Will be populated from Redis pub/sub messages
device_states: dict[str, dict[str, Any]] = {}
@@ -57,6 +73,13 @@ app.add_middleware(
)
@app.on_event("startup")
async def startup_event():
"""Include routers after app is initialized to avoid circular imports."""
from apps.api.routes.groups_scenes import router as groups_scenes_router
app.include_router(groups_scenes_router, prefix="", tags=["groups", "scenes"])
@app.get("/health")
async def health() -> dict[str, str]:
"""Health check endpoint.
@@ -207,6 +230,50 @@ def get_mqtt_settings() -> tuple[str, int]:
return host, port
# ============================================================================
# MQTT PUBLISH
# ============================================================================
async def publish_abstract_set(device_type: str, device_id: str, payload: dict[str, Any]) -> None:
"""
Publish an abstract set command via MQTT.
This function encapsulates MQTT publishing logic so that group/scene
execution doesn't need to know MQTT topic details.
Topic format: home/{device_type}/{device_id}/set
Message format: {"type": device_type, "payload": payload}
Args:
device_type: Device type (light, thermostat, relay, etc.)
device_id: Device identifier
payload: Command payload (e.g., {"power": "on", "brightness": 50})
Example:
>>> await publish_abstract_set("light", "kueche_deckenlampe", {"power": "on", "brightness": 35})
# Publishes to: home/light/kueche_deckenlampe/set
# Message: {"type": "light", "payload": {"power": "on", "brightness": 35}}
"""
mqtt_host, mqtt_port = get_mqtt_settings()
topic = f"home/{device_type}/{device_id}/set"
message = {
"type": device_type,
"payload": payload
}
try:
async with Client(hostname=mqtt_host, port=mqtt_port) as client:
await client.publish(
topic=topic,
payload=json.dumps(message),
qos=1
)
logger.info(f"Published to {topic}: {message}")
except Exception as e:
logger.error(f"Failed to publish to {topic}: {e}")
raise
def get_redis_settings() -> tuple[str, str]:
"""Get Redis settings from configuration.
@@ -291,8 +358,6 @@ async def get_layout() -> dict[str, Any]:
Returns:
dict: Layout configuration with rooms and device tiles
"""
from packages.home_capabilities import load_layout
try:
layout = load_layout()
@@ -321,6 +386,23 @@ async def get_layout() -> dict[str, Any]:
return {"rooms": []}
@app.get("/devices/{device_id}/room")
async def get_device_room(device_id: str) -> dict[str, str | None]:
"""Get the room name for a specific device.
Args:
device_id: Device identifier
Returns:
dict: {"device_id": str, "room": str | null}
"""
room = get_room(device_id)
return {
"device_id": device_id,
"room": room
}
@app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED)
async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]:
"""Set device state.

286
apps/api/resolvers.py Normal file
View File

@@ -0,0 +1,286 @@
"""Group and scene resolution logic."""
import logging
from pathlib import Path
from typing import Any, TypedDict
from packages.home_capabilities import (
GroupConfig,
GroupsConfigRoot,
SceneStep,
get_group_by_id,
load_layout,
)
logger = logging.getLogger(__name__)
# ============================================================================
# TYPE DEFINITIONS
# ============================================================================
class DeviceDTO(TypedDict, total=False):
"""Device Data Transfer Object.
Represents a device as returned by /devices endpoint or load_devices().
Required fields:
device_id: Unique device identifier
type: Device type (light, thermostat, relay, etc.)
Optional fields:
name: Human-readable device name
features: Device capabilities (power, brightness, etc.)
technology: MQTT, zigbee2mqtt, simulator, etc.
topics: MQTT topic configuration
metadata: Additional device information
"""
device_id: str
type: str
name: str
features: dict[str, Any]
technology: str
topics: dict[str, str]
metadata: dict[str, Any]
# ============================================================================
# DEVICE-ROOM MAPPING
# ============================================================================
# Global cache for device -> room mapping
_device_room_cache: dict[str, str] = {}
def load_device_rooms(path: str | Path | None = None) -> dict[str, str]:
"""
Load device-to-room mapping from layout configuration.
This function extracts a mapping of device_id -> room_name from the layout.yaml
file, which is useful for resolving selectors like {room: "Küche"}.
Args:
path: Optional path to layout.yaml. If None, uses default path
(config/layout.yaml relative to workspace root)
Returns:
Dictionary mapping device_id to room_name. Returns empty dict if:
- layout.yaml doesn't exist
- layout.yaml is malformed
- layout.yaml is empty
Example:
>>> mapping = load_device_rooms()
>>> mapping['kueche_lampe1']
'Küche'
"""
global _device_room_cache
try:
# Load the layout using existing function
layout = load_layout(path)
# Build device -> room mapping
device_rooms: dict[str, str] = {}
for room in layout.rooms:
for device in room.devices:
device_rooms[device.device_id] = room.name
# Update global cache
_device_room_cache = device_rooms.copy()
logger.info(f"Loaded device-room mapping: {len(device_rooms)} devices")
return device_rooms
except (FileNotFoundError, ValueError, Exception) as e:
logger.warning(f"Failed to load device-room mapping: {e}")
logger.warning("Returning empty device-room mapping")
_device_room_cache = {}
return {}
def get_room(device_id: str) -> str | None:
"""
Get the room name for a given device ID.
This function uses the cached device-room mapping loaded by load_device_rooms().
If the cache is empty, it will attempt to load it first.
Args:
device_id: The device identifier to lookup
Returns:
Room name if device is found, None otherwise
Example:
>>> get_room('kueche_lampe1')
'Küche'
>>> get_room('nonexistent_device')
None
"""
# Check if cache is populated
if not _device_room_cache:
logger.debug("Device-room cache empty, loading from layout...")
# Load mapping (this updates the global _device_room_cache)
load_device_rooms()
# Access the cache after potential reload
return _device_room_cache.get(device_id)
def clear_room_cache() -> None:
"""
Clear the cached device-room mapping.
This is useful for testing or when the layout configuration has changed
and needs to be reloaded.
"""
_device_room_cache.clear()
logger.debug("Cleared device-room cache")
# ============================================================================
# GROUP & SCENE RESOLUTION
# ============================================================================
def resolve_group_devices(
group: GroupConfig,
devices: list[DeviceDTO],
device_rooms: dict[str, str]
) -> list[DeviceDTO]:
"""
Resolve devices for a group based on device_ids or selector.
Args:
group: Group configuration with device_ids or selector
devices: List of all available devices
device_rooms: Mapping of device_id -> room_name
Returns:
List of devices matching the group criteria (no duplicates)
Example:
>>> # Group with explicit device_ids
>>> group = GroupConfig(id="test", name="Test", device_ids=["lamp1", "lamp2"])
>>> resolve_group_devices(group, all_devices, {})
[{"device_id": "lamp1", ...}, {"device_id": "lamp2", ...}]
>>> # Group with selector (all lights in kitchen)
>>> group = GroupConfig(
... id="kitchen_lights",
... name="Kitchen Lights",
... selector=GroupSelector(type="light", room="Küche")
... )
>>> resolve_group_devices(group, all_devices, device_rooms)
[{"device_id": "kueche_deckenlampe", ...}, ...]
"""
# Case 1: Explicit device_ids
if group.device_ids:
device_id_set = set(group.device_ids)
return [d for d in devices if d["device_id"] in device_id_set]
# Case 2: Selector-based filtering
if group.selector:
filtered = []
for device in devices:
# Filter by type (required in selector)
if device["type"] != group.selector.type:
continue
# Filter by room (optional)
if group.selector.room:
device_room = device_rooms.get(device["device_id"])
if device_room != group.selector.room:
continue
# Filter by tags (optional, future feature)
# if group.selector.tags:
# device_tags = device.get("metadata", {}).get("tags", [])
# if not any(tag in device_tags for tag in group.selector.tags):
# continue
filtered.append(device)
return filtered
# No device_ids and no selector → empty list
return []
def resolve_scene_step_devices(
step: SceneStep,
groups_config: GroupsConfigRoot,
devices: list[DeviceDTO],
device_rooms: dict[str, str]
) -> list[DeviceDTO]:
"""
Resolve devices for a scene step based on group_id or selector.
Args:
step: Scene step with group_id or selector
groups_config: Groups configuration for group lookup
devices: List of all available devices
device_rooms: Mapping of device_id -> room_name
Returns:
List of devices matching the step criteria
Raises:
ValueError: If group_id is specified but group not found
Example:
>>> # Step with group_id
>>> step = SceneStep(group_id="kitchen_lights", action={...})
>>> resolve_scene_step_devices(step, groups_cfg, all_devices, device_rooms)
[{"device_id": "kueche_deckenlampe", ...}, ...]
>>> # Step with selector
>>> step = SceneStep(
... selector=SceneSelector(type="light", room="Küche"),
... action={...}
... )
>>> resolve_scene_step_devices(step, groups_cfg, all_devices, device_rooms)
[{"device_id": "kueche_deckenlampe", ...}, ...]
"""
# Case 1: Group reference
if step.group_id:
# Look up the group
group = get_group_by_id(groups_config, step.group_id)
if not group:
raise ValueError(
f"Scene step references unknown group_id: '{step.group_id}'. "
f"Available groups: {[g.id for g in groups_config.groups]}"
)
# Resolve the group's devices
return resolve_group_devices(group, devices, device_rooms)
# Case 2: Direct selector
if step.selector:
filtered = []
for device in devices:
# Filter by type (optional in scene selector)
if step.selector.type and device["type"] != step.selector.type:
continue
# Filter by room (optional)
if step.selector.room:
device_room = device_rooms.get(device["device_id"])
if device_room != step.selector.room:
continue
# Filter by tags (optional, future feature)
# if step.selector.tags:
# device_tags = device.get("metadata", {}).get("tags", [])
# if not any(tag in device_tags for tag in step.selector.tags):
# continue
filtered.append(device)
return filtered
# Should not reach here due to SceneStep validation (must have group_id or selector)
return []

View File

@@ -0,0 +1 @@
"""API routes package."""

View File

@@ -0,0 +1,454 @@
"""Groups and Scenes API routes."""
import asyncio
import logging
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from packages.home_capabilities import (
GroupConfig,
GroupsConfigRoot,
SceneConfig,
ScenesConfigRoot,
get_group_by_id,
get_scene_by_id,
load_groups,
load_scenes,
)
# Import from parent modules
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from resolvers import (
DeviceDTO,
resolve_group_devices,
resolve_scene_step_devices,
load_device_rooms,
)
from main import load_devices, publish_abstract_set
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================
class GroupResponse(BaseModel):
"""Response model for a group."""
id: str
name: str
device_count: int
devices: list[str]
selector: dict[str, Any] | None = None
capabilities: dict[str, bool]
class GroupSetRequest(BaseModel):
"""Request to set state for all devices in a group."""
action: dict[str, Any] # e.g., {"type": "light", "payload": {"power": "on", "brightness": 50}}
class SceneResponse(BaseModel):
"""Response model for a scene."""
id: str
name: str
steps: int
class SceneRunRequest(BaseModel):
"""Request to execute a scene (currently empty, future: override params)."""
pass
class SceneExecutionResponse(BaseModel):
"""Response after scene execution."""
scene_id: str
scene_name: str
steps_executed: int
devices_affected: int
execution_plan: list[dict[str, Any]]
# ============================================================================
# GROUPS ENDPOINTS
# ============================================================================
@router.get("/groups", response_model=list[GroupResponse])
async def list_groups() -> list[GroupResponse]:
"""
List all available groups.
Returns:
list[GroupResponse]: List of groups with their devices
"""
try:
# Load configuration
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Build response for each group
response = []
for group in groups_config.groups:
# Resolve devices for this group
resolved_devices = resolve_group_devices(group, devices, device_rooms)
device_ids = [d["device_id"] for d in resolved_devices]
# Convert selector to dict if present
selector_dict = None
if group.selector:
selector_dict = {
"type": group.selector.type,
"room": group.selector.room,
"tags": group.selector.tags,
}
response.append(GroupResponse(
id=group.id,
name=group.name,
device_count=len(device_ids),
devices=device_ids,
selector=selector_dict,
capabilities=group.capabilities,
))
return response
except Exception as e:
logger.error(f"Error loading groups: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to load groups: {str(e)}"
)
@router.get("/groups/{group_id}", response_model=GroupResponse)
async def get_group(group_id: str) -> GroupResponse:
"""
Get details for a specific group.
Args:
group_id: Group identifier
Returns:
GroupResponse: Group details with resolved devices
"""
try:
# Load configuration
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Find the group
group = get_group_by_id(groups_config, group_id)
if not group:
available_groups = [g.id for g in groups_config.groups]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Group '{group_id}' not found. Available groups: {available_groups}"
)
# Resolve devices
resolved_devices = resolve_group_devices(group, devices, device_rooms)
device_ids = [d["device_id"] for d in resolved_devices]
# Convert selector to dict if present
selector_dict = None
if group.selector:
selector_dict = {
"type": group.selector.type,
"room": group.selector.room,
"tags": group.selector.tags,
}
return GroupResponse(
id=group.id,
name=group.name,
device_count=len(device_ids),
devices=device_ids,
selector=selector_dict,
capabilities=group.capabilities,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting group {group_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get group: {str(e)}"
)
@router.post("/groups/{group_id}/set", status_code=status.HTTP_202_ACCEPTED)
async def set_group(group_id: str, request: GroupSetRequest) -> dict[str, Any]:
"""
Set state for all devices in a group.
This endpoint resolves the group to its devices and would send
the action to each device. Currently returns execution plan.
Args:
group_id: Group identifier
request: Action to apply to all devices in the group
Returns:
dict: Execution plan (devices and actions to be executed)
"""
try:
# Load configuration
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Find the group
group = get_group_by_id(groups_config, group_id)
if not group:
available_groups = [g.id for g in groups_config.groups]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Group '{group_id}' not found. Available groups: {available_groups}"
)
# Resolve devices
resolved_devices = resolve_group_devices(group, devices, device_rooms)
if not resolved_devices:
logger.warning(f"Group '{group_id}' resolved to 0 devices")
# Execute actions via MQTT
execution_plan = []
for device in resolved_devices:
device_type = device["type"]
device_id = device["device_id"]
payload = request.action.get("payload", {})
# Publish MQTT command
try:
await publish_abstract_set(device_type, device_id, payload)
execution_plan.append({
"device_id": device_id,
"device_type": device_type,
"action": request.action,
"status": "published"
})
except Exception as e:
logger.error(f"Failed to publish to {device_id}: {e}")
execution_plan.append({
"device_id": device_id,
"device_type": device_type,
"action": request.action,
"status": "failed",
"error": str(e)
})
return {
"group_id": group_id,
"group_name": group.name,
"devices_affected": len(resolved_devices),
"execution_plan": execution_plan,
"status": "executed"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error setting group {group_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to set group: {str(e)}"
)
# ============================================================================
# SCENES ENDPOINTS
# ============================================================================
@router.get("/scenes", response_model=list[SceneResponse])
async def list_scenes() -> list[SceneResponse]:
"""
List all available scenes.
Returns:
list[SceneResponse]: List of scenes
"""
try:
# Load configuration
scenes_config = load_scenes(Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml")
# Build response for each scene
response = []
for scene in scenes_config.scenes:
response.append(SceneResponse(
id=scene.id,
name=scene.name,
steps=len(scene.steps),
))
return response
except Exception as e:
logger.error(f"Error loading scenes: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to load scenes: {str(e)}"
)
@router.get("/scenes/{scene_id}", response_model=SceneResponse)
async def get_scene(scene_id: str) -> SceneResponse:
"""
Get details for a specific scene.
Args:
scene_id: Scene identifier
Returns:
SceneResponse: Scene details
"""
try:
# Load configuration
scenes_config = load_scenes(Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml")
# Find the scene
scene = get_scene_by_id(scenes_config, scene_id)
if not scene:
available_scenes = [s.id for s in scenes_config.scenes]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Scene '{scene_id}' not found. Available scenes: {available_scenes}"
)
return SceneResponse(
id=scene.id,
name=scene.name,
steps=len(scene.steps),
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting scene {scene_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get scene: {str(e)}"
)
@router.post("/scenes/{scene_id}/run", response_model=SceneExecutionResponse)
async def run_scene(scene_id: str, request: SceneRunRequest | None = None) -> SceneExecutionResponse:
"""
Execute a scene.
This endpoint resolves each step in the scene to its target devices
and would execute the actions. Currently returns execution plan.
Args:
scene_id: Scene identifier
request: Optional execution parameters (reserved for future use)
Returns:
SceneExecutionResponse: Execution plan and summary
"""
try:
# Load configuration
scenes_config = load_scenes(Path(__file__).parent.parent.parent.parent / "config" / "scenes.yaml")
groups_config = load_groups(Path(__file__).parent.parent.parent.parent / "config" / "groups.yaml")
devices = load_devices()
device_rooms = load_device_rooms()
# Find the scene
scene = get_scene_by_id(scenes_config, scene_id)
if not scene:
available_scenes = [s.id for s in scenes_config.scenes]
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Scene '{scene_id}' not found. Available scenes: {available_scenes}"
)
# Execute scene steps
execution_plan = []
total_devices = 0
for i, step in enumerate(scene.steps, 1):
# Resolve devices for this step
resolved_devices = resolve_scene_step_devices(step, groups_config, devices, device_rooms)
total_devices += len(resolved_devices)
# Extract action payload
action_payload = step.action.get("payload", {})
# Execute for each device
step_executions = []
for device in resolved_devices:
device_type = device["type"]
device_id = device["device_id"]
try:
await publish_abstract_set(device_type, device_id, action_payload)
step_executions.append({
"device_id": device_id,
"status": "published"
})
except Exception as e:
logger.error(f"Failed to publish to {device_id} in step {i}: {e}")
step_executions.append({
"device_id": device_id,
"status": "failed",
"error": str(e)
})
# Build step info
step_info = {
"step": i,
"devices_affected": len(resolved_devices),
"device_ids": [d["device_id"] for d in resolved_devices],
"action": step.action,
"executions": step_executions,
}
# Add targeting info
if step.group_id:
step_info["target"] = {"type": "group_id", "value": step.group_id}
elif step.selector:
step_info["target"] = {
"type": "selector",
"selector_type": step.selector.type,
"room": step.selector.room,
}
if step.delay_ms:
step_info["delay_ms"] = step.delay_ms
# Apply delay before next step
await asyncio.sleep(step.delay_ms / 1000.0)
execution_plan.append(step_info)
return SceneExecutionResponse(
scene_id=scene.id,
scene_name=scene.name,
steps_executed=len(scene.steps),
devices_affected=total_devices,
execution_plan=execution_plan,
)
except HTTPException:
raise
except ValueError as e:
# Handle unknown group_id in scene step
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except Exception as e:
logger.error(f"Error running scene {scene_id}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to run scene: {str(e)}"
)