diff --git a/apps/api/main.py b/apps/api/main.py index b3c0664..2c695a1 100644 --- a/apps/api/main.py +++ b/apps/api/main.py @@ -19,6 +19,13 @@ from packages.home_capabilities import LIGHT_VERSION, THERMOSTAT_VERSION, LightS logger = logging.getLogger(__name__) +# In-memory cache for last known device states +# Will be populated from Redis pub/sub messages +device_states: dict[str, dict[str, Any]] = {} + +# Background task reference +background_task: asyncio.Task | None = None + app = FastAPI( title="Home Automation API", description="API for home automation system", @@ -49,6 +56,77 @@ async def health() -> dict[str, str]: return {"status": "ok"} +async def redis_state_listener(): + """Background task that listens to Redis pub/sub and updates state cache.""" + redis_client = None + pubsub = None + + try: + redis_url, redis_channel = get_redis_settings() + logger.info(f"Starting Redis state listener for channel {redis_channel}") + + redis_client = await aioredis.from_url(redis_url, decode_responses=True) + pubsub = redis_client.pubsub() + await pubsub.subscribe(redis_channel) + + logger.info("Redis state listener connected") + + while True: + try: + message = await asyncio.wait_for( + pubsub.get_message(ignore_subscribe_messages=True), + timeout=1.0 + ) + + if message and message["type"] == "message": + data = message["data"] + try: + state_data = json.loads(data) + if state_data.get("type") == "state" and state_data.get("device_id"): + device_id = state_data["device_id"] + payload = state_data.get("payload", {}) + device_states[device_id] = payload + logger.debug(f"Updated state cache for {device_id}: {payload}") + except Exception as e: + logger.warning(f"Failed to parse state data: {e}") + + except asyncio.TimeoutError: + pass # No message, continue + + except asyncio.CancelledError: + logger.info("Redis state listener cancelled") + raise + except Exception as e: + logger.error(f"Redis state listener error: {e}") + finally: + if pubsub: + await pubsub.unsubscribe(redis_channel) + await pubsub.close() + if redis_client: + await redis_client.close() + + +@app.on_event("startup") +async def startup_event(): + """Start background tasks on application startup.""" + global background_task + background_task = asyncio.create_task(redis_state_listener()) + logger.info("Started background Redis state listener") + + +@app.on_event("shutdown") +async def shutdown_event(): + """Clean up background tasks on application shutdown.""" + global background_task + if background_task: + background_task.cancel() + try: + await background_task + except asyncio.CancelledError: + pass + logger.info("Stopped background Redis state listener") + + @app.get("/spec") async def spec() -> dict[str, dict[str, str]]: """Capability specification endpoint. @@ -182,6 +260,16 @@ async def get_devices() -> list[DeviceInfo]: ] +@app.get("/devices/states") +async def get_device_states() -> dict[str, dict[str, Any]]: + """Get current states of all devices from in-memory cache. + + Returns: + dict: Dictionary mapping device_id to state payload + """ + return device_states + + @app.get("/layout") async def get_layout() -> dict[str, Any]: """Get UI layout configuration. @@ -341,6 +429,15 @@ async def event_generator(request: Request) -> AsyncGenerator[str, None]: if message and message["type"] == "message": data = message["data"] logger.debug(f"Sending SSE message: {data[:100]}...") + + # Update in-memory cache with latest state + try: + state_data = json.loads(data) + if state_data.get("type") == "state" and state_data.get("device_id"): + device_states[state_data["device_id"]] = state_data.get("payload", {}) + except Exception as e: + logger.warning(f"Failed to parse state data for cache: {e}") + yield f"event: message\ndata: {data}\n\n" last_heartbeat = asyncio.get_event_loop().time() continue # Skip sleep, check for more messages immediately diff --git a/apps/ui/templates/dashboard.html b/apps/ui/templates/dashboard.html index 5b33feb..f066f25 100644 --- a/apps/ui/templates/dashboard.html +++ b/apps/ui/templates/dashboard.html @@ -1161,21 +1161,23 @@ // Load initial device states async function loadDevices() { try { - const response = await fetch(api('/devices')); - const devices = await response.json(); - console.log('Loaded initial device states:', devices); + const response = await fetch(api('/devices/states')); + const states = await response.json(); + console.log('Loaded initial device states:', states); // Update UI with initial states - devices.forEach(device => { - if (device.type === 'light' && device.state) { - currentState[device.id] = device.state.power; - updateDeviceUI(device.id, device.state.power, device.state.brightness); - } else if (device.type === 'thermostat' && device.state) { - if (device.state.mode) thermostatModes[device.id] = device.state.mode; - if (device.state.target) thermostatTargets[device.id] = device.state.target; - updateThermostatUI(device.id, device.state.current, device.state.target, device.state.mode); + for (const [deviceId, state] of Object.entries(states)) { + if (state.power !== undefined) { + // It's a light + currentState[deviceId] = state.power; + updateDeviceUI(deviceId, state.power, state.brightness); + } else if (state.mode !== undefined || state.target !== undefined) { + // It's a thermostat + if (state.mode) thermostatModes[deviceId] = state.mode; + if (state.target) thermostatTargets[deviceId] = state.target; + updateThermostatUI(deviceId, state.current, state.target, state.mode); } - }); + } } catch (error) { console.error('Failed to load initial device states:', error); }