diff --git a/apps/abstraction/main.py b/apps/abstraction/main.py index ba443d2..717b658 100644 --- a/apps/abstraction/main.py +++ b/apps/abstraction/main.py @@ -43,6 +43,11 @@ def load_config(config_path: Path) -> dict[str, Any]: with open(config_path, "r") as f: config = yaml.safe_load(f) + # Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally + devices = config.get("devices", []) + for device in devices: + device["device_id"] = device.pop("device_id", device.pop("id", None)) + logger.info(f"Loaded configuration from {config_path}") return config @@ -56,16 +61,33 @@ def validate_devices(devices: list[dict[str, Any]]) -> None: Raises: ValueError: If device configuration is invalid """ + required_fields = ["device_id", "type", "cap_version", "technology"] + for device in devices: - if "id" not in device: - raise ValueError(f"Device missing 'id': {device}") - if "type" not in device: - raise ValueError(f"Device {device['id']} missing 'type'") + # Check for device_id + if "device_id" not in device or device["device_id"] is None: + raise ValueError(f"Device entry requires 'id' or 'device_id': {device}") + + device_id = device["device_id"] + + # Check required top-level fields + for field in required_fields: + if field not in device: + raise ValueError(f"Device {device_id} missing '{field}'") + + # Check topics structure if "topics" not in device: - raise ValueError(f"Device {device['id']} missing 'topics'") - if "set" not in device["topics"] or "state" not in device["topics"]: - raise ValueError(f"Device {device['id']} missing 'topics.set' or 'topics.state'") - logger.info(f"Validated {len(devices)} device(s)") + raise ValueError(f"Device {device_id} missing 'topics'") + + if "set" not in device["topics"]: + raise ValueError(f"Device {device_id} missing 'topics.set'") + + if "state" not in device["topics"]: + raise ValueError(f"Device {device_id} missing 'topics.state'") + + # Log loaded devices + device_ids = [d["device_id"] for d in devices] + logger.info(f"Loaded {len(devices)} device(s): {', '.join(device_ids)}") async def get_redis_client(redis_url: str, max_retries: int = 5) -> aioredis.Redis: @@ -172,7 +194,7 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N redis_config = config.get("redis", {}) redis_channel = redis_config.get("channel", "ui:updates") - devices = {d["id"]: d for d in config.get("devices", [])} + devices = {d["device_id"]: d for d in config.get("devices", [])} retry_delay = 1 max_retry_delay = 60 @@ -191,7 +213,7 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N # Subscribe to abstract SET topics for all devices for device in devices.values(): - abstract_set_topic = f"home/{device['type']}/{device['id']}/set" + abstract_set_topic = f"home/{device['type']}/{device['device_id']}/set" await client.subscribe(abstract_set_topic) logger.info(f"Subscribed to abstract SET: {abstract_set_topic}") diff --git a/apps/api/main.py b/apps/api/main.py index be70d9d..242694e 100644 --- a/apps/api/main.py +++ b/apps/api/main.py @@ -1,14 +1,17 @@ """API main entry point.""" +import asyncio import json import os from pathlib import Path -from typing import Any +from typing import Any, AsyncGenerator +import redis.asyncio as aioredis import yaml from aiomqtt import Client -from fastapi import FastAPI, HTTPException, status +from fastapi import FastAPI, HTTPException, Request, status from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse from pydantic import BaseModel, ValidationError from packages.home_capabilities import CAP_VERSION, LightState @@ -85,7 +88,12 @@ def load_devices() -> list[dict[str, Any]]: with open(config_path, "r") as f: config = yaml.safe_load(f) - return config.get("devices", []) + # Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally + devices = config.get("devices", []) + for device in devices: + device["device_id"] = device.pop("device_id", device.pop("id", None)) + + return devices def get_mqtt_settings() -> tuple[str, int]: @@ -99,6 +107,25 @@ def get_mqtt_settings() -> tuple[str, int]: return host, port +def get_redis_settings() -> tuple[str, str]: + """Get Redis settings from configuration. + + Returns: + tuple: (url, channel) + """ + config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml" + + if config_path.exists(): + with open(config_path, "r") as f: + config = yaml.safe_load(f) + redis_config = config.get("redis", {}) + url = redis_config.get("url", "redis://localhost:6379/0") + channel = redis_config.get("channel", "ui:updates") + return url, channel + + return "redis://localhost:6379/0", "ui:updates" + + async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None: """Publish message to MQTT broker. @@ -123,9 +150,9 @@ async def get_devices() -> list[DeviceInfo]: devices = load_devices() return [ DeviceInfo( - device_id=device["id"], + device_id=device["device_id"], type=device["type"], - name=device.get("name", device["id"]) + name=device.get("name", device["device_id"]) ) for device in devices ] @@ -147,7 +174,7 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str """ # Load devices and check if device exists devices = load_devices() - device = next((d for d in devices if d["id"] == device_id), None) + device = next((d for d in devices if d["device_id"] == device_id), None) if not device: raise HTTPException( @@ -182,6 +209,81 @@ async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str return {"message": f"Command sent to {device_id}"} +async def event_generator(request: Request) -> AsyncGenerator[str, None]: + """Generate SSE events from Redis Pub/Sub. + + Args: + request: FastAPI request object for disconnect detection + + Yields: + str: SSE formatted event strings + """ + redis_url, redis_channel = get_redis_settings() + redis_client = await aioredis.from_url(redis_url, decode_responses=True) + pubsub = redis_client.pubsub() + + try: + await pubsub.subscribe(redis_channel) + + # Create heartbeat task + last_heartbeat = asyncio.get_event_loop().time() + + while True: + # Check if client disconnected + if await request.is_disconnected(): + break + + # Get message with timeout for heartbeat + try: + message = await asyncio.wait_for( + pubsub.get_message(ignore_subscribe_messages=True), + timeout=1.0 + ) + + if message and message["type"] == "message": + # Send data event + data = message["data"] + yield f"event: message\ndata: {data}\n\n" + last_heartbeat = asyncio.get_event_loop().time() + + except asyncio.TimeoutError: + pass + + # Send heartbeat every 25 seconds + current_time = asyncio.get_event_loop().time() + if current_time - last_heartbeat >= 25: + yield "event: ping\ndata: heartbeat\n\n" + last_heartbeat = current_time + + finally: + await pubsub.unsubscribe(redis_channel) + await pubsub.close() + await redis_client.close() + + +@app.get("/realtime") +async def realtime_events(request: Request) -> StreamingResponse: + """Server-Sent Events endpoint for real-time updates. + + Args: + request: FastAPI request object + + Returns: + StreamingResponse: SSE stream of Redis messages + """ + return StreamingResponse( + event_generator(request), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering + } + ) + + return {"message": f"Command sent to {device_id}"} + + def main() -> None: """Run the API application with uvicorn.""" import uvicorn diff --git a/apps/ui/templates/index.html b/apps/ui/templates/index.html index f0cf3ad..2288fc1 100644 --- a/apps/ui/templates/index.html +++ b/apps/ui/templates/index.html @@ -15,18 +15,20 @@ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; - display: flex; - justify-content: center; - align-items: center; + padding: 2rem; } .container { + max-width: 1200px; + margin: 0 auto; + } + + header { background: white; border-radius: 16px; padding: 2rem; - box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3); - max-width: 600px; - width: 90%; + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); + margin-bottom: 2rem; } h1 { @@ -34,17 +36,355 @@ margin-bottom: 0.5rem; } - p { + .status { + display: inline-block; + padding: 0.25rem 0.75rem; + border-radius: 12px; + font-size: 0.875rem; + font-weight: 500; + } + + .status.connected { + background: #d4edda; + color: #155724; + } + + .status.disconnected { + background: #f8d7da; + color: #721c24; + } + + .devices { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(300px, 1fr)); + gap: 1.5rem; + } + + .device-card { + background: white; + border-radius: 16px; + padding: 1.5rem; + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); + } + + .device-header { + margin-bottom: 1.5rem; + } + + .device-name { + font-size: 1.5rem; + font-weight: 600; + color: #333; + margin-bottom: 0.25rem; + } + + .device-type { + font-size: 0.875rem; color: #666; - line-height: 1.6; + text-transform: uppercase; + } + + .device-state { + padding: 0.5rem 1rem; + background: #f8f9fa; + border-radius: 8px; + margin: 1rem 0; + font-family: 'Courier New', monospace; + font-size: 0.875rem; + } + + .state-label { + color: #666; + font-weight: 500; + } + + .state-value { + color: #333; + font-weight: 600; + } + + .state-value.on { + color: #28a745; + } + + .state-value.off { + color: #dc3545; + } + + .controls { + display: flex; + flex-direction: column; + gap: 1rem; + } + + .toggle-button { + padding: 0.75rem 1.5rem; + border: none; + border-radius: 8px; + font-size: 1rem; + font-weight: 600; + cursor: pointer; + transition: all 0.2s; + color: white; + } + + .toggle-button.on { + background: #28a745; + } + + .toggle-button.on:hover { + background: #218838; + } + + .toggle-button.off { + background: #6c757d; + } + + .toggle-button.off:hover { + background: #5a6268; + } + + .events { + margin-top: 2rem; + background: white; + border-radius: 16px; + padding: 1.5rem; + box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); + } + + .events h2 { + color: #333; + margin-bottom: 1rem; + font-size: 1.25rem; + } + + .event-list { + max-height: 300px; + overflow-y: auto; + } + + .event-item { + padding: 0.75rem; + border-left: 3px solid #667eea; + background: #f8f9fa; + margin-bottom: 0.5rem; + border-radius: 4px; + font-size: 0.875rem; + } + + .event-time { + color: #666; + font-size: 0.75rem; + } + + .event-data { + color: #333; + margin-top: 0.25rem; + font-family: 'Courier New', monospace; }
UI wird geladen...
-API erreichbar? API Health Check
+Realtime Status: Verbinde...
+Warte auf Events...
+