"""API main entry point.""" import asyncio import json import logging import os from pathlib import Path from typing import Any, AsyncGenerator import redis.asyncio as aioredis import yaml from aiomqtt import Client from fastapi import FastAPI, HTTPException, Request, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, ValidationError from packages.home_capabilities import LIGHT_VERSION, THERMOSTAT_VERSION, LightState, ThermostatState logger = logging.getLogger(__name__) app = FastAPI( title="Home Automation API", description="API for home automation system", version="0.1.0" ) # Configure CORS for localhost (Frontend) app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:8002", "http://172.19.1.11:8002", "http://127.0.0.1:8002", ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/health") async def health() -> dict[str, str]: """Health check endpoint. Returns: dict: Status indicating the service is healthy """ return {"status": "ok"} @app.get("/spec") async def spec() -> dict[str, dict[str, str]]: """Capability specification endpoint. Returns: dict: Dictionary containing supported capabilities and their versions """ return { "capabilities": { "light": LIGHT_VERSION, "thermostat": THERMOSTAT_VERSION } } # Pydantic Models class SetDeviceRequest(BaseModel): """Request model for setting device state.""" type: str payload: dict[str, Any] class DeviceInfo(BaseModel): """Device information model.""" device_id: str type: str name: str features: dict[str, Any] = {} # Configuration helpers def load_devices() -> list[dict[str, Any]]: """Load devices from configuration file. Returns: list: List of device configurations """ config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml" if not config_path.exists(): return [] with open(config_path, "r") as f: config = yaml.safe_load(f) # Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally devices = config.get("devices", []) for device in devices: device["device_id"] = device.pop("device_id", device.pop("id", None)) return devices def get_mqtt_settings() -> tuple[str, int]: """Get MQTT broker settings from environment. Supports both MQTT_BROKER and MQTT_HOST for compatibility. Returns: tuple: (host, port) """ host = os.environ.get("MQTT_BROKER") or os.environ.get("MQTT_HOST", "172.16.2.16") port = int(os.environ.get("MQTT_PORT", "1883")) return host, port def get_redis_settings() -> tuple[str, str]: """Get Redis settings from configuration. Prioritizes environment variables over config file: - REDIS_HOST, REDIS_PORT, REDIS_DB → redis://host:port/db - REDIS_CHANNEL → pub/sub channel name Returns: tuple: (url, channel) """ # Check environment variables first redis_host = os.getenv("REDIS_HOST") redis_port = os.getenv("REDIS_PORT", "6379") redis_db = os.getenv("REDIS_DB", "0") redis_channel = os.getenv("REDIS_CHANNEL", "ui:updates") if redis_host: url = f"redis://{redis_host}:{redis_port}/{redis_db}" return url, redis_channel # Fallback to config file config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml" if config_path.exists(): with open(config_path, "r") as f: config = yaml.safe_load(f) redis_config = config.get("redis", {}) url = redis_config.get("url", "redis://localhost:6379/0") channel = redis_config.get("channel", "ui:updates") return url, channel return "redis://localhost:6379/0", "ui:updates" async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None: """Publish message to MQTT broker. Args: topic: MQTT topic to publish to payload: Message payload """ host, port = get_mqtt_settings() message = json.dumps(payload) async with Client(hostname=host, port=port, identifier="home-automation-api") as client: await client.publish(topic, message, qos=1) @app.get("/devices") async def get_devices() -> list[DeviceInfo]: """Get list of available devices. Returns: list: List of device information including features """ devices = load_devices() return [ DeviceInfo( device_id=device["device_id"], type=device["type"], name=device.get("name", device["device_id"]), features=device.get("features", {}) ) for device in devices ] @app.get("/layout") async def get_layout() -> dict[str, Any]: """Get UI layout configuration. Returns: dict: Layout configuration with rooms and device tiles """ from packages.home_capabilities import load_layout try: layout = load_layout() # Convert Pydantic models to dict rooms = [] for room in layout.rooms: devices = [] for tile in room.devices: devices.append({ "device_id": tile.device_id, "title": tile.title, "icon": tile.icon, "rank": tile.rank }) rooms.append({ "name": room.name, "devices": devices }) return {"rooms": rooms} except Exception as e: logger.error(f"Error loading layout: {e}") # Return empty layout on error return {"rooms": []} @app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED) async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]: """Set device state. Args: device_id: Device identifier request: Device state request Returns: dict: Confirmation message Raises: HTTPException: If device not found or payload invalid """ # Load devices and check if device exists devices = load_devices() device = next((d for d in devices if d["device_id"] == device_id), None) if not device: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Device {device_id} not found" ) # Validate payload based on device type if request.type == "light": try: LightState(**request.payload) except ValidationError as e: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid payload for light: {e}" ) elif request.type == "thermostat": try: # For thermostat SET: only allow mode and target allowed_set_fields = {"mode", "target"} invalid_fields = set(request.payload.keys()) - allowed_set_fields if invalid_fields: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Thermostat SET only allows {allowed_set_fields}, got invalid fields: {invalid_fields}" ) ThermostatState(**request.payload) except ValidationError as e: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid payload for thermostat: {e}" ) else: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unsupported device type: {request.type}" ) # Publish to MQTT topic = f"home/{request.type}/{device_id}/set" mqtt_payload = { "type": request.type, "payload": request.payload } await publish_mqtt(topic, mqtt_payload) return {"message": f"Command sent to {device_id}"} async def event_generator(request: Request) -> AsyncGenerator[str, None]: """Generate SSE events from Redis Pub/Sub. Args: request: FastAPI request object for disconnect detection Yields: str: SSE formatted event strings """ redis_url, redis_channel = get_redis_settings() redis_client = await aioredis.from_url(redis_url, decode_responses=True) pubsub = redis_client.pubsub() try: await pubsub.subscribe(redis_channel) logger.info(f"SSE client connected, subscribed to {redis_channel}") # Create heartbeat tracking last_heartbeat = asyncio.get_event_loop().time() heartbeat_interval = 25 while True: # Check if client disconnected if await request.is_disconnected(): logger.info("SSE client disconnected") break # Try to get message (non-blocking) message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1) # Handle actual data messages if message and message["type"] == "message": data = message["data"] logger.debug(f"Sending SSE message: {data[:100]}...") yield f"event: message\ndata: {data}\n\n" last_heartbeat = asyncio.get_event_loop().time() else: # No message, sleep a bit to avoid busy loop await asyncio.sleep(0.1) # Send heartbeat every 25 seconds current_time = asyncio.get_event_loop().time() if current_time - last_heartbeat >= heartbeat_interval: yield "event: ping\ndata: heartbeat\n\n" last_heartbeat = current_time finally: await pubsub.unsubscribe(redis_channel) await pubsub.aclose() await redis_client.aclose() logger.info("SSE connection closed") @app.get("/realtime") async def realtime_events(request: Request) -> StreamingResponse: """Server-Sent Events endpoint for real-time updates. Args: request: FastAPI request object Returns: StreamingResponse: SSE stream of Redis messages """ return StreamingResponse( event_generator(request), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering } ) return {"message": f"Command sent to {device_id}"} def main() -> None: """Run the API application with uvicorn.""" import uvicorn uvicorn.run( "apps.api.main:app", host="0.0.0.0", port=8001, reload=True ) if __name__ == "__main__": main()