"""API main entry point.

API analysis for HomeKit bridge compatibility
=============================================

(Endpoints are referenced by handler name; source line numbers are
intentionally not cited because they go stale.)

1) GET /devices
   Status: ✅ AVAILABLE (get_devices)

   Current response model (DeviceInfo):
   {
     "device_id": str,    ✅ OK
     "type": str,         ✅ OK
     "name": str,         ⚠️ DEVIATION: "short_name" (optional) was expected
     "features": dict     ✅ OK
   }

   Assessment:
   - ✅ Provides device_id, type, features as expected
   - ⚠️ Uses "name" instead of "short_name"
   - ✅ Falls back to device_id when name is not present
   - Compatibility: HIGH - simply use "name" as "short_name"

2) GET /layout
   Status: ✅ AVAILABLE (get_layout)

   Current response format:
   {
     "rooms": [
       {
         "name": "Schlafzimmer",
         "devices": [
           {
             "device_id": "thermostat_wolfgang",
             "title": "Thermostat Wolfgang",    ← friendly_name
             "icon": "thermometer",
             "rank": 1
           }
         ]
       }
     ]
   }

   Mapping device_id -> room, friendly_name:
   - room: derivable by iterating over rooms[].devices[]
   - friendly_name: contained in the "title" field

   Assessment:
   - ✅ All required information is available
   - ⚠️ DEVIATION: nested structure (rooms -> devices)
   - ⚠️ DEVIATION: friendly_name is called "title"
   - Compatibility: HIGH - a simple transformation is possible:

     ```python
     for room in layout["rooms"]:
         for device in room["devices"]:
             mapping[device["device_id"]] = {
                 "room": room["name"],
                 "friendly_name": device["title"]
             }
     ```

3) POST /devices/{device_id}/set
   Status: ✅ AVAILABLE (set_device)

   Current request model (SetDeviceRequest):
   {
     "type": str,         ✅ OK - must match the device type
     "payload": dict      ✅ OK - abstract command
   }

   Example light:
   POST /devices/leselampe_esszimmer/set
   {"type": "light", "payload": {"power": "on", "brightness": 80}}

   Example thermostat:
   POST /devices/thermostat_wolfgang/set
   {"type": "thermostat", "payload": {"target": 21.0}}

   Validation:
   - ✅ Type-specific payload validation
   - ✅ Read-only check → 405 METHOD_NOT_ALLOWED
   - ✅ Invalid payload → 422 UNPROCESSABLE_ENTITY
   - ✅ Device not found → 404 NOT_FOUND

   Assessment:
   - ✅ Implemented exactly as expected
   - ✅ All required error codes are present
   - Compatibility: PERFECT

4) Realtime endpoint (SSE)
   Status: ✅ AVAILABLE as GET /realtime (realtime_events)

   Implementation:
   - ✅ Server-Sent Events (media_type="text/event-stream")
   - ✅ Based on Redis Pub/Sub (event_generator)
   - ✅ Safari-compatible (heartbeats, retry hints)

   Current event format (from apps/abstraction/main.py):
   {
     "type": "state",     ✅ OK
     "device_id": str,    ✅ OK
     "payload": dict,     ✅ OK - e.g. {"power":"on","brightness":80}
     "ts": str            ✅ OK - ISO-8601 format of datetime.now(timezone.utc)
   }

   Example event:
   {
     "type": "state",
     "device_id": "thermostat_wolfgang",
     "payload": {"current": 19.5, "target": 21.0},
     "ts": "2025-11-17T14:23:45.123456+00:00"
   }

   Assessment:
   - ✅ All required fields are present
   - ✅ Timestamp in the correct format
   - ✅ SSE with proper headers and error handling
   - Compatibility: PERFECT

SUMMARY
=======

All 4 required endpoints are implemented!

Compatibility with the HomeKit bridge requirements:
- GET /devices:            HIGH (only the name field differs)
- GET /layout:             HIGH (structure transformation needed)
- POST /devices/{id}/set:  PERFECT (1:1 as required)
- GET /realtime (SSE):     PERFECT (1:1 as required)

Required adaptations for the bridge:
1. GET /devices: interpret "name" as "short_name"              ✓ trivial
2. GET /layout: convert the nested structure to a flat mapping ✓ simple

No code changes are required in the API!
The bridge can use the existing endpoints directly.
"""

import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, AsyncGenerator

import redis.asyncio as aioredis
import yaml
from aiomqtt import Client
from fastapi import FastAPI, HTTPException, Query, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ValidationError

from packages.home_capabilities import (
    LIGHT_VERSION,
    THERMOSTAT_VERSION,
    CONTACT_SENSOR_VERSION,
    TEMP_HUMIDITY_SENSOR_VERSION,
    RELAY_VERSION,
    LightState,
    ThermostatState,
    ContactState,
    TempHumidityState,
    RelayState,
    load_layout,
)

# Import resolvers (must be before router imports to avoid circular dependency)
from apps.api.resolvers import (
    DeviceDTO,
    resolve_group_devices,
    resolve_scene_step_devices,
    load_device_rooms,
    get_room,
    clear_room_cache,
)

logger = logging.getLogger(__name__)

# Location of the shared device/redis configuration file.
DEVICES_CONFIG_PATH = Path(__file__).parent.parent.parent / "config" / "devices.yaml"

# Defaults used when neither the environment nor the config file says otherwise.
DEFAULT_REDIS_URL = "redis://localhost:6379/0"
DEFAULT_REDIS_CHANNEL = "ui:updates"

# ============================================================================
# STATE CACHES
# ============================================================================

# In-memory cache for last known device states
# Will be populated from Redis pub/sub messages
device_states: dict[str, dict[str, Any]] = {}

# Background task reference
background_task: asyncio.Task | None = None

app = FastAPI(
    title="Home Automation API",
    description="API for home automation system",
    version="0.1.0",
)

# Configure CORS for localhost (Frontend)
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:8002",
        "http://172.19.1.11:8002",
        "http://127.0.0.1:8002",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.on_event("startup")
async def include_routers() -> None:
    """Include routers after app is initialized to avoid circular imports."""
    from apps.api.routes.groups_scenes import router as groups_scenes_router

    app.include_router(groups_scenes_router, prefix="")


@app.get("/health")
async def health() -> dict[str, str]:
    """Health check endpoint.

    Returns:
        dict: Status indicating the service is healthy
    """
    return {"status": "ok"}


# ============================================================================
# REDIS HELPERS
# ============================================================================

def _cache_state_message(raw: str) -> None:
    """Store the payload of a pub/sub state message in ``device_states``.

    Messages that are not valid JSON, are not JSON objects, or are not of
    type ``"state"`` with a ``device_id`` are ignored (parse failures are
    logged as warnings).

    Args:
        raw: Raw message data as received from Redis.
    """
    try:
        state_data = json.loads(raw)
    except (TypeError, ValueError) as e:
        logger.warning("Failed to parse state data: %s", e)
        return

    if not isinstance(state_data, dict):
        return

    device_id = state_data.get("device_id")
    if state_data.get("type") == "state" and device_id:
        payload = state_data.get("payload", {})
        device_states[device_id] = payload
        logger.debug("Updated state cache for %s: %s", device_id, payload)


async def _close_redis(pubsub: Any, redis_client: Any, channel: str | None) -> None:
    """Unsubscribe and close a pub/sub object and its client, logging failures.

    Cleanup errors are logged rather than raised so that they never mask the
    reason the caller is shutting down.

    Args:
        pubsub: Pub/sub object to close, or None.
        redis_client: Redis client to close, or None.
        channel: Channel the pub/sub object was subscribed to.
    """
    if pubsub:
        try:
            await pubsub.unsubscribe(channel)
            await pubsub.aclose()
        except Exception as e:
            logger.error("Error closing pubsub: %s", e)

    if redis_client:
        try:
            await redis_client.aclose()
        except Exception as e:
            logger.error("Error closing redis: %s", e)


async def redis_state_listener() -> None:
    """Background task that listens to Redis pub/sub and updates state cache.

    NOTE(review): the listener does not reconnect; after a Redis error it logs
    and exits, and the cache is then only refreshed by active SSE clients.
    """
    redis_client = None
    pubsub = None
    redis_channel = None

    try:
        redis_url, redis_channel = get_redis_settings()
        logger.info("Starting Redis state listener for channel %s", redis_channel)

        redis_client = await aioredis.from_url(redis_url, decode_responses=True)
        pubsub = redis_client.pubsub()
        await pubsub.subscribe(redis_channel)

        logger.info("Redis state listener connected")

        while True:
            # Let get_message itself wait for up to one second. Without a
            # timeout it returns immediately and this loop would spin.
            message = await pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0,
            )
            if message and message["type"] == "message":
                _cache_state_message(message["data"])

    except asyncio.CancelledError:
        logger.info("Redis state listener cancelled")
        raise
    except Exception as e:
        logger.error("Redis state listener error: %s", e)
    finally:
        await _close_redis(pubsub, redis_client, redis_channel)


@app.on_event("startup")
async def startup_event() -> None:
    """Start background tasks on application startup."""
    global background_task
    background_task = asyncio.create_task(redis_state_listener())
    logger.info("Started background Redis state listener")


@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Clean up background tasks on application shutdown."""
    global background_task
    if background_task:
        background_task.cancel()
        try:
            await background_task
        except asyncio.CancelledError:
            pass
        logger.info("Stopped background Redis state listener")


@app.get("/spec")
async def spec() -> dict[str, dict[str, str]]:
    """Capability specification endpoint.

    Returns:
        dict: Dictionary containing supported capabilities and their versions
    """
    return {
        "capabilities": {
            "light": LIGHT_VERSION,
            "thermostat": THERMOSTAT_VERSION,
            "contact": CONTACT_SENSOR_VERSION,
            "temp_humidity": TEMP_HUMIDITY_SENSOR_VERSION,
            "relay": RELAY_VERSION,
        }
    }


# Pydantic Models
class SetDeviceRequest(BaseModel):
    """Request model for setting device state."""

    type: str
    payload: dict[str, Any]


class DeviceInfo(BaseModel):
    """Device information model."""

    device_id: str
    type: str
    name: str
    features: dict[str, Any] = {}


# Configuration helpers
def _load_config() -> dict[str, Any]:
    """Read ``devices.yaml`` and return its content.

    Returns:
        dict: Parsed configuration, or an empty dict when the file is missing
        or empty.
    """
    if not DEVICES_CONFIG_PATH.exists():
        return {}

    with open(DEVICES_CONFIG_PATH, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        return yaml.safe_load(f) or {}


def load_devices() -> list[dict[str, Any]]:
    """Load devices from configuration file.

    Returns:
        list: List of device configurations
    """
    devices = _load_config().get("devices") or []

    # Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally
    for device in devices:
        legacy_id = device.pop("id", None)
        device["device_id"] = device.pop("device_id", legacy_id)

    return devices


def get_mqtt_settings() -> tuple[str, int]:
    """Get MQTT broker settings from environment.

    Supports both MQTT_BROKER and MQTT_HOST for compatibility.

    Returns:
        tuple: (host, port)
    """
    host = os.environ.get("MQTT_BROKER") or os.environ.get("MQTT_HOST", "172.16.2.16")
    port = int(os.environ.get("MQTT_PORT", "1883"))
    return host, port


# ============================================================================
# MQTT PUBLISH
# ============================================================================

async def publish_abstract_set(device_type: str, device_id: str, payload: dict[str, Any]) -> None:
    """
    Publish an abstract set command via MQTT.

    This function encapsulates MQTT publishing logic so that group/scene
    execution doesn't need to know MQTT topic details.

    Topic format: home/{device_type}/{device_id}/set
    Message format: {"type": device_type, "payload": payload}

    Args:
        device_type: Device type (light, thermostat, relay, etc.)
        device_id: Device identifier
        payload: Command payload (e.g., {"power": "on", "brightness": 50})

    Raises:
        Exception: Any error raised while connecting or publishing is logged
            and re-raised.

    Example:
        >>> await publish_abstract_set("light", "kueche_deckenlampe", {"power": "on", "brightness": 35})
        # Publishes to: home/light/kueche_deckenlampe/set
        # Message: {"type": "light", "payload": {"power": "on", "brightness": 35}}
    """
    mqtt_host, mqtt_port = get_mqtt_settings()
    topic = f"home/{device_type}/{device_id}/set"
    message = {
        "type": device_type,
        "payload": payload,
    }

    try:
        async with Client(hostname=mqtt_host, port=mqtt_port) as client:
            await client.publish(
                topic=topic,
                payload=json.dumps(message),
                qos=1,
            )
        logger.info("Published to %s: %s", topic, message)
    except Exception as e:
        logger.error("Failed to publish to %s: %s", topic, e)
        raise


def get_redis_settings() -> tuple[str, str]:
    """Get Redis settings from configuration.

    Prioritizes environment variables over config file:
    - REDIS_HOST, REDIS_PORT, REDIS_DB → redis://host:port/db
    - REDIS_CHANNEL → pub/sub channel name

    Returns:
        tuple: (url, channel)
    """
    env_channel = os.getenv("REDIS_CHANNEL")

    # Check environment variables first
    redis_host = os.getenv("REDIS_HOST")
    if redis_host:
        redis_port = os.getenv("REDIS_PORT", "6379")
        redis_db = os.getenv("REDIS_DB", "0")
        url = f"redis://{redis_host}:{redis_port}/{redis_db}"
        return url, env_channel or DEFAULT_REDIS_CHANNEL

    # Fallback to config file (missing file / missing section yield defaults)
    redis_config = _load_config().get("redis") or {}
    url = redis_config.get("url", DEFAULT_REDIS_URL)
    # REDIS_CHANNEL still wins when only the URL comes from the config file.
    channel = env_channel or redis_config.get("channel", DEFAULT_REDIS_CHANNEL)
    return url, channel


async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None:
    """Publish message to MQTT broker.

    Args:
        topic: MQTT topic to publish to
        payload: Message payload
    """
    host, port = get_mqtt_settings()
    message = json.dumps(payload)

    # NOTE(review): a fixed client identifier means overlapping publishes share
    # one MQTT client id; brokers commonly drop the older session in that
    # case - confirm this is intended.
    async with Client(hostname=host, port=port, identifier="home-automation-api") as client:
        await client.publish(topic, message, qos=1)


@app.get("/devices")
async def get_devices() -> list[DeviceInfo]:
    """Get list of available devices.

    Returns:
        list: List of device information including features
    """
    return [
        DeviceInfo(
            device_id=device["device_id"],
            type=device["type"],
            name=device.get("name", device["device_id"]),
            features=device.get("features", {}),
        )
        for device in load_devices()
    ]


@app.get("/devices/states")
async def get_device_states() -> dict[str, dict[str, Any]]:
    """Get current states of all devices from in-memory cache.

    Returns:
        dict: Dictionary mapping device_id to state payload
    """
    return device_states


@app.get("/layout")
async def get_layout() -> dict[str, Any]:
    """Get UI layout configuration.

    Returns:
        dict: Layout configuration with rooms and device tiles
    """
    try:
        layout = load_layout()

        # Convert Pydantic models to dict
        rooms = [
            {
                "name": room.name,
                "devices": [
                    {
                        "device_id": tile.device_id,
                        "title": tile.title,
                        "icon": tile.icon,
                        "rank": tile.rank,
                    }
                    for tile in room.devices
                ],
            }
            for room in layout.rooms
        ]
        return {"rooms": rooms}
    except Exception as e:
        logger.error("Error loading layout: %s", e)
        # Return empty layout on error
        return {"rooms": []}


# --- Minimally invasive: single-device layout endpoint ---
@app.get("/devices/{device_id}/layout")
async def get_device_layout(device_id: str) -> dict[str, Any]:
    """Return the layout-specific information for a single device.

    Args:
        device_id: Device identifier

    Returns:
        dict: device_id, room name, title, icon and rank of the device tile

    Raises:
        HTTPException: 404 if the device does not appear in any room
    """
    layout = load_layout()

    # The layout is accessed via attributes, exactly as in get_layout.
    for room in layout.rooms:
        for tile in room.devices:
            if tile.device_id == device_id:
                # Layout info plus the name of the containing room
                return {
                    "device_id": device_id,
                    "room": room.name,
                    "title": tile.title,
                    "icon": tile.icon,
                    "rank": tile.rank,
                }

    raise HTTPException(status_code=404, detail="Device layout not found")


# --- Minimally invasive: single-device state endpoint ---
@app.get("/devices/{device_id}/state")
async def get_device_state(device_id: str) -> dict[str, Any]:
    """Return the current state for a single device.

    The state comes from the same in-memory cache that backs the bulk
    endpoint ``GET /devices/states``.

    Args:
        device_id: Device identifier

    Returns:
        dict: Last known state payload of the device

    Raises:
        HTTPException: 404 if no state is cached for the device
    """
    state = device_states.get(device_id)
    if state is None:
        raise HTTPException(status_code=404, detail="Device state not found")
    return state


@app.get("/devices/{device_id}/room")
async def get_device_room(device_id: str) -> dict[str, str | None]:
    """Get the room name for a specific device.

    Args:
        device_id: Device identifier

    Returns:
        dict: {"device_id": str, "room": str | null}
    """
    return {
        "device_id": device_id,
        "room": get_room(device_id),
    }


# Request types that can never be set, mapped to the 405 detail message.
_READ_ONLY_TYPE_DETAILS: dict[str, str] = {
    "contact": "Contact sensors are read-only devices",
    "contact_sensor": "Contact sensors are read-only devices",
    "temp_humidity": "Temperature & humidity sensors are read-only devices",
    "temp_humidity_sensor": "Temperature & humidity sensors are read-only devices",
}

# Request types that accept SET commands, mapped to their validation model.
_SETTABLE_STATE_MODELS: dict[str, type[BaseModel]] = {
    "light": LightState,
    "relay": RelayState,
    "thermostat": ThermostatState,
}

# For thermostat SET: only allow mode and target.
# Kept as a plain set because its repr is part of the error message.
_THERMOSTAT_SET_FIELDS = {"mode", "target"}


def _validate_set_payload(device_type: str, payload: dict[str, Any]) -> None:
    """Validate a SET payload against the rules for ``device_type``.

    Args:
        device_type: Type given in the request
        payload: Command payload given in the request

    Raises:
        HTTPException: 405 for read-only types; 422 for unsupported types,
            disallowed thermostat fields, or payloads failing model validation
    """
    read_only_detail = _READ_ONLY_TYPE_DETAILS.get(device_type)
    if read_only_detail is not None:
        raise HTTPException(
            status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
            detail=read_only_detail,
        )

    state_model = _SETTABLE_STATE_MODELS.get(device_type)
    if state_model is None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Unsupported device type: {device_type}",
        )

    if device_type == "thermostat":
        allowed_set_fields = _THERMOSTAT_SET_FIELDS
        invalid_fields = set(payload.keys()) - allowed_set_fields
        if invalid_fields:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Thermostat SET only allows {allowed_set_fields}, got invalid fields: {invalid_fields}",
            )

    try:
        state_model(**payload)
    except ValidationError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid payload for {device_type}: {e}",
        ) from e


@app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED)
async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]:
    """Set device state.

    Args:
        device_id: Device identifier
        request: Device state request

    Returns:
        dict: Confirmation message

    Raises:
        HTTPException: If device not found or payload invalid
    """
    # Load devices and check if device exists
    device = next((d for d in load_devices() if d["device_id"] == device_id), None)
    if not device:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Device {device_id} not found",
        )

    # Check if device is read-only (contact sensors, etc.)
    if "topics" in device and "set" not in device["topics"]:
        raise HTTPException(
            status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
            detail="Device is read-only",
        )

    # Validate payload based on device type
    # NOTE(review): request.type is not compared with the configured device
    # type; confirm whether a mismatch should be rejected.
    _validate_set_payload(request.type, request.payload)

    # Publish to MQTT
    topic = f"home/{request.type}/{device_id}/set"
    mqtt_payload = {
        "type": request.type,
        "payload": request.payload,
    }
    await publish_mqtt(topic, mqtt_payload)

    return {"message": f"Command sent to {device_id}"}


def _format_sse_message(data: str) -> str:
    """Format ``data`` as one SSE event of type ``message``.

    Every line of ``data`` gets its own ``data:`` field so that a payload
    containing line breaks cannot terminate the event early. Single-line
    payloads produce ``event: message\\ndata: <data>\\n\\n``.

    Args:
        data: Event payload

    Returns:
        str: SSE formatted event string
    """
    lines = data.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    fields = "".join(f"data: {line}\n" for line in lines)
    return f"event: message\n{fields}\n"


async def event_generator(request: Request) -> AsyncGenerator[str, None]:
    """Generate SSE events from Redis Pub/Sub with Safari compatibility.

    Safari-compatible features:
    - Immediate retry hint on connection
    - Regular heartbeats every 15s (comment-only, no data)
    - Proper flushing after each yield
    - Graceful disconnect handling

    Args:
        request: FastAPI request object for disconnect detection

    Yields:
        str: SSE formatted event strings
    """
    redis_client = None
    pubsub = None
    redis_channel = None
    loop = asyncio.get_running_loop()

    try:
        # Send retry hint immediately for EventSource reconnect behavior
        yield "retry: 2500\n\n"

        # Try to connect to Redis
        redis_url, redis_channel = get_redis_settings()
        try:
            redis_client = await aioredis.from_url(redis_url, decode_responses=True)
            pubsub = redis_client.pubsub()
            await pubsub.subscribe(redis_channel)
            logger.info("SSE client connected, subscribed to %s", redis_channel)
        except Exception as e:
            logger.warning("Redis unavailable, running in heartbeat-only mode: %s", e)
            redis_client = None
            pubsub = None

        # Heartbeat tracking
        last_heartbeat = loop.time()
        heartbeat_interval = 15  # Safari-friendly: shorter interval

        while True:
            # Check if client disconnected
            if await request.is_disconnected():
                logger.info("SSE client disconnected")
                break

            # Try to get message from Redis (if available)
            if pubsub:
                try:
                    message = await asyncio.wait_for(
                        pubsub.get_message(ignore_subscribe_messages=True),
                        timeout=0.1,
                    )

                    if message and message["type"] == "message":
                        data = message["data"]
                        logger.debug("Sending SSE message: %s...", data[:100])

                        # Update in-memory cache with latest state
                        _cache_state_message(data)

                        yield _format_sse_message(data)
                        last_heartbeat = loop.time()
                        continue  # Skip sleep, check for more messages immediately

                except asyncio.TimeoutError:
                    pass  # No message, continue to heartbeat check
                except Exception as e:
                    logger.error("Redis error: %s", e)
                    # Continue with heartbeats even if Redis fails

            # Sleep briefly to avoid busy loop
            await asyncio.sleep(0.1)

            # Send heartbeat if interval elapsed
            current_time = loop.time()
            if current_time - last_heartbeat >= heartbeat_interval:
                # Comment-style ping (Safari-compatible, no event type)
                yield ": ping\n\n"
                last_heartbeat = current_time

    except asyncio.CancelledError:
        logger.info("SSE connection cancelled by client")
        raise
    except Exception as e:
        logger.error("SSE error: %s", e)
        raise
    finally:
        # Cleanup Redis connection
        await _close_redis(pubsub, redis_client, redis_channel)
        logger.info("SSE connection closed")


@app.get("/realtime")
async def realtime_events(request: Request) -> StreamingResponse:
    """Server-Sent Events endpoint for real-time updates.

    Safari-compatible SSE implementation:
    - Immediate retry hint (2.5s reconnect delay)
    - Heartbeat every 15s using comment syntax ": ping"
    - Proper Cache-Control headers
    - No buffering (nginx compatibility)
    - Graceful Redis fallback (heartbeat-only mode)

    Args:
        request: FastAPI request object

    Returns:
        StreamingResponse: SSE stream with Redis messages and heartbeats
    """
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache, no-transform",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )


def main() -> None:
    """Run the API application with uvicorn."""
    import uvicorn

    uvicorn.run(
        "apps.api.main:app",
        host="0.0.0.0",
        port=8001,
        reload=True,
    )


if __name__ == "__main__":
    main()