858 lines
28 KiB
Python
858 lines
28 KiB
Python
@app.get("/devices/{device_id}/layout")
|
|
async def get_device_layout(device_id: str):
|
|
"""Gibt die layout-spezifischen Informationen für ein einzelnes Gerät zurück."""
|
|
layout = load_layout()
|
|
for room in layout.get("rooms", []):
|
|
for device in room.get("devices", []):
|
|
if device.get("device_id") == device_id:
|
|
# Rückgabe: Layout-Infos + Raumname
|
|
return {
|
|
"device_id": device_id,
|
|
"room": room.get("name"),
|
|
"title": device.get("title"),
|
|
"icon": device.get("icon"),
|
|
"rank": device.get("rank"),
|
|
}
|
|
raise HTTPException(status_code=404, detail="Device layout not found")
|
|
|
|
@app.get("/devices/{device_id}/state")
|
|
async def get_device_state(device_id: str):
|
|
"""Gibt den aktuellen State für ein einzelnes Gerät zurück."""
|
|
state_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
|
if not state_path.exists():
|
|
raise HTTPException(status_code=500, detail="State file not found")
|
|
with open(state_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
states = config.get("states", {})
|
|
state = states.get(device_id)
|
|
if state is None:
|
|
raise HTTPException(status_code=404, detail="Device state not found")
|
|
return state
|
|
# --- Minimal-invasive: Einzelgerät-Layout-Endpunkt ---
|
|
from fastapi import Query
|
|
|
|
|
|
|
|
# --- Minimal-invasive: Einzelgerät-Layout-Endpunkt ---
|
|
@app.get("/devices/{device_id}/layout")
|
|
async def get_device_layout(device_id: str):
|
|
"""Gibt die layout-spezifischen Informationen für ein einzelnes Gerät zurück."""
|
|
layout = load_layout()
|
|
for room in layout.get("rooms", []):
|
|
for device in room.get("devices", []):
|
|
if device.get("device_id") == device_id:
|
|
# Rückgabe: Layout-Infos + Raumname
|
|
return {
|
|
"device_id": device_id,
|
|
"room": room.get("name"),
|
|
"title": device.get("title"),
|
|
"icon": device.get("icon"),
|
|
"rank": device.get("rank"),
|
|
}
|
|
raise HTTPException(status_code=404, detail="Device layout not found")
|
|
|
|
# --- Minimal-invasive: Einzelgerät-State-Endpunkt ---
|
|
@app.get("/devices/{device_id}/state")
|
|
async def get_device_state(device_id: str):
|
|
"""Gibt den aktuellen State für ein einzelnes Gerät zurück."""
|
|
# States werden wie im Bulk-Endpoint geladen
|
|
state_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
|
if not state_path.exists():
|
|
raise HTTPException(status_code=500, detail="State file not found")
|
|
with open(state_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
states = config.get("states", {})
|
|
state = states.get(device_id)
|
|
if state is None:
|
|
raise HTTPException(status_code=404, detail="Device state not found")
|
|
return state
|
|
"""API main entry point.
|
|
|
|
API-Analyse für HomeKit-Bridge Kompatibilität
|
|
==============================================
|
|
|
|
1) GET /devices
|
|
Status: ✅ VORHANDEN (Zeile 325-343)
|
|
|
|
Aktuelles Response-Modell (DeviceInfo, Zeile 189-194):
|
|
{
|
|
"device_id": str, ✅ OK
|
|
"type": str, ✅ OK
|
|
"name": str, ⚠️ ABWEICHUNG: Erwartet wurde "short_name" (optional)
|
|
"features": dict ✅ OK
|
|
}
|
|
|
|
Bewertung:
|
|
- ✅ Liefert device_id, type, features wie erwartet
|
|
- ⚠️ Verwendet "name" statt "short_name"
|
|
- ✅ Fallback auf device_id wenn name nicht vorhanden
|
|
- Kompatibilität: HOCH - einfach "name" als "short_name" verwenden
|
|
|
|
|
|
2) GET /layout
|
|
Status: ✅ VORHANDEN (Zeile 354-387)
|
|
|
|
Aktuelles Response-Format:
|
|
{
|
|
"rooms": [
|
|
{
|
|
"name": "Schlafzimmer",
|
|
"devices": [
|
|
{
|
|
"device_id": "thermostat_wolfgang",
|
|
"title": "Thermostat Wolfgang", ← friendly_name
|
|
"icon": "thermometer",
|
|
"rank": 1
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
Mapping device_id -> room, friendly_name:
|
|
- room: Durch Iteration über rooms[].devices[] ableitbar
|
|
- friendly_name: Im Feld "title" enthalten
|
|
|
|
Bewertung:
|
|
- ✅ Alle erforderlichen Informationen vorhanden
|
|
- ⚠️ ABWEICHUNG: Verschachtelte Struktur (rooms -> devices)
|
|
- ⚠️ ABWEICHUNG: friendly_name heißt "title"
|
|
- Kompatibilität: HOCH - einfache Transformation möglich:
|
|
```python
|
|
for room in layout["rooms"]:
|
|
for device in room["devices"]:
|
|
mapping[device["device_id"]] = {
|
|
"room": room["name"],
|
|
"friendly_name": device["title"]
|
|
}
|
|
```
|
|
|
|
|
|
3) POST /devices/{device_id}/set
|
|
Status: ✅ VORHANDEN (Zeile 406-504)
|
|
|
|
Aktuelles Request-Modell (SetDeviceRequest, Zeile 182-185):
|
|
{
|
|
"type": str, ✅ OK - muss zum Gerätetyp passen
|
|
"payload": dict ✅ OK - abstraktes Kommando
|
|
}
|
|
|
|
Beispiel Light:
|
|
POST /devices/leselampe_esszimmer/set
|
|
{"type": "light", "payload": {"power": "on", "brightness": 80}}
|
|
|
|
Beispiel Thermostat:
|
|
POST /devices/thermostat_wolfgang/set
|
|
{"type": "thermostat", "payload": {"target": 21.0}}
|
|
|
|
Validierung:
|
|
- ✅ Type-spezifische Payload-Validierung (Zeile 437-487)
|
|
- ✅ Read-only Check → 405 METHOD_NOT_ALLOWED (Zeile 431-435)
|
|
- ✅ Ungültige Payload → 422 UNPROCESSABLE_ENTITY
|
|
- ✅ Device nicht gefunden → 404 NOT_FOUND
|
|
|
|
Bewertung:
|
|
- ✅ Exakt wie erwartet implementiert
|
|
- ✅ Alle geforderten Error Codes vorhanden
|
|
- Kompatibilität: PERFEKT
|
|
|
|
|
|
4) Realtime-Endpoint (SSE)
|
|
Status: ✅ VORHANDEN als GET /realtime (Zeile 608-632)
|
|
|
|
Implementierung:
|
|
- ✅ Server-Sent Events (media_type="text/event-stream")
|
|
- ✅ Redis Pub/Sub basiert (event_generator, Zeile 510-607)
|
|
- ✅ Safari-kompatibel (Heartbeats, Retry-Hints)
|
|
|
|
Aktuelles Event-Format (aus apps/abstraction/main.py:250-256):
|
|
{
|
|
"type": "state", ✅ OK
|
|
"device_id": str, ✅ OK
|
|
"payload": dict, ✅ OK - z.B. {"power":"on","brightness":80}
|
|
"ts": str ✅ OK - ISO-8601 format von datetime.now(timezone.utc)
|
|
}
|
|
|
|
Beispiel-Event:
|
|
{
|
|
"type": "state",
|
|
"device_id": "thermostat_wolfgang",
|
|
"payload": {"current": 19.5, "target": 21.0},
|
|
"ts": "2025-11-17T14:23:45.123456+00:00"
|
|
}
|
|
|
|
Bewertung:
|
|
- ✅ Alle geforderten Felder vorhanden
|
|
- ✅ Timestamp im korrekten Format
|
|
- ✅ SSE mit proper headers und error handling
|
|
- Kompatibilität: PERFEKT
|
|
|
|
|
|
ZUSAMMENFASSUNG
|
|
===============
|
|
|
|
Alle 4 geforderten Endpunkte sind implementiert!
|
|
|
|
Kompatibilität mit HomeKit-Bridge Anforderungen:
|
|
- GET /devices: HOCH (nur Name-Feld unterschiedlich)
|
|
- GET /layout: HOCH (Struktur-Transformation nötig)
|
|
- POST /devices/{id}/set: PERFEKT (1:1 wie gefordert)
|
|
- GET /realtime (SSE): PERFEKT (1:1 wie gefordert)
|
|
|
|
Erforderliche Anpassungen für Bridge:
|
|
1. GET /devices: "name" als "short_name" interpretieren ✓ trivial
|
|
2. GET /layout: Verschachtelte Struktur zu flat mapping umwandeln ✓ einfach
|
|
|
|
Keine Code-Änderungen in der API erforderlich!
|
|
Die Bridge kann die bestehenden Endpoints direkt nutzen.
|
|
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, AsyncGenerator
|
|
|
|
import redis.asyncio as aioredis
|
|
import yaml
|
|
from aiomqtt import Client
|
|
from fastapi import FastAPI, HTTPException, Request, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, ValidationError
|
|
|
|
from packages.home_capabilities import (
|
|
LIGHT_VERSION,
|
|
THERMOSTAT_VERSION,
|
|
CONTACT_SENSOR_VERSION,
|
|
TEMP_HUMIDITY_SENSOR_VERSION,
|
|
RELAY_VERSION,
|
|
LightState,
|
|
ThermostatState,
|
|
ContactState,
|
|
TempHumidityState,
|
|
RelayState,
|
|
load_layout,
|
|
)
|
|
|
|
# Import resolvers (must be before router imports to avoid circular dependency)
|
|
from apps.api.resolvers import (
|
|
DeviceDTO,
|
|
resolve_group_devices,
|
|
resolve_scene_step_devices,
|
|
load_device_rooms,
|
|
get_room,
|
|
clear_room_cache,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# STATE CACHES
|
|
# ============================================================================
|
|
|
|
# In-memory cache for last known device states
|
|
# Will be populated from Redis pub/sub messages
|
|
device_states: dict[str, dict[str, Any]] = {}
|
|
|
|
# Background task reference
|
|
background_task: asyncio.Task | None = None
|
|
|
|
app = FastAPI(
|
|
title="Home Automation API",
|
|
description="API for home automation system",
|
|
version="0.1.0"
|
|
)
|
|
|
|
# Configure CORS for localhost (Frontend)
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=[
|
|
"http://localhost:8002",
|
|
"http://172.19.1.11:8002",
|
|
"http://127.0.0.1:8002",
|
|
],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Include routers after app is initialized to avoid circular imports."""
|
|
from apps.api.routes.groups_scenes import router as groups_scenes_router
|
|
app.include_router(groups_scenes_router, prefix="")
|
|
|
|
|
|
@app.get("/health")
|
|
async def health() -> dict[str, str]:
|
|
"""Health check endpoint.
|
|
|
|
Returns:
|
|
dict: Status indicating the service is healthy
|
|
"""
|
|
return {"status": "ok"}
|
|
|
|
|
|
async def redis_state_listener():
|
|
"""Background task that listens to Redis pub/sub and updates state cache."""
|
|
redis_client = None
|
|
pubsub = None
|
|
|
|
try:
|
|
redis_url, redis_channel = get_redis_settings()
|
|
logger.info(f"Starting Redis state listener for channel {redis_channel}")
|
|
|
|
redis_client = await aioredis.from_url(redis_url, decode_responses=True)
|
|
pubsub = redis_client.pubsub()
|
|
await pubsub.subscribe(redis_channel)
|
|
|
|
logger.info("Redis state listener connected")
|
|
|
|
while True:
|
|
try:
|
|
message = await asyncio.wait_for(
|
|
pubsub.get_message(ignore_subscribe_messages=True),
|
|
timeout=1.0
|
|
)
|
|
|
|
if message and message["type"] == "message":
|
|
data = message["data"]
|
|
try:
|
|
state_data = json.loads(data)
|
|
if state_data.get("type") == "state" and state_data.get("device_id"):
|
|
device_id = state_data["device_id"]
|
|
payload = state_data.get("payload", {})
|
|
device_states[device_id] = payload
|
|
logger.debug(f"Updated state cache for {device_id}: {payload}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to parse state data: {e}")
|
|
|
|
except asyncio.TimeoutError:
|
|
pass # No message, continue
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("Redis state listener cancelled")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Redis state listener error: {e}")
|
|
finally:
|
|
if pubsub:
|
|
await pubsub.unsubscribe(redis_channel)
|
|
await pubsub.close()
|
|
if redis_client:
|
|
await redis_client.close()
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Start background tasks on application startup."""
|
|
global background_task
|
|
background_task = asyncio.create_task(redis_state_listener())
|
|
logger.info("Started background Redis state listener")
|
|
|
|
|
|
@app.on_event("shutdown")
|
|
async def shutdown_event():
|
|
"""Clean up background tasks on application shutdown."""
|
|
global background_task
|
|
if background_task:
|
|
background_task.cancel()
|
|
try:
|
|
await background_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
logger.info("Stopped background Redis state listener")
|
|
|
|
|
|
@app.get("/spec")
|
|
async def spec() -> dict[str, dict[str, str]]:
|
|
"""Capability specification endpoint.
|
|
|
|
Returns:
|
|
dict: Dictionary containing supported capabilities and their versions
|
|
"""
|
|
return {
|
|
"capabilities": {
|
|
"light": LIGHT_VERSION,
|
|
"thermostat": THERMOSTAT_VERSION,
|
|
"contact": CONTACT_SENSOR_VERSION,
|
|
"temp_humidity": TEMP_HUMIDITY_SENSOR_VERSION,
|
|
"relay": RELAY_VERSION
|
|
}
|
|
}
|
|
|
|
|
|
# Pydantic Models
|
|
class SetDeviceRequest(BaseModel):
|
|
"""Request model for setting device state."""
|
|
type: str
|
|
payload: dict[str, Any]
|
|
|
|
|
|
class DeviceInfo(BaseModel):
|
|
"""Device information model."""
|
|
device_id: str
|
|
type: str
|
|
name: str
|
|
features: dict[str, Any] = {}
|
|
|
|
|
|
# Configuration helpers
|
|
def load_devices() -> list[dict[str, Any]]:
|
|
"""Load devices from configuration file.
|
|
|
|
Returns:
|
|
list: List of device configurations
|
|
"""
|
|
config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
|
|
|
if not config_path.exists():
|
|
return []
|
|
|
|
with open(config_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
# Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally
|
|
devices = config.get("devices", [])
|
|
for device in devices:
|
|
device["device_id"] = device.pop("device_id", device.pop("id", None))
|
|
|
|
return devices
|
|
|
|
|
|
def get_mqtt_settings() -> tuple[str, int]:
|
|
"""Get MQTT broker settings from environment.
|
|
|
|
Supports both MQTT_BROKER and MQTT_HOST for compatibility.
|
|
|
|
Returns:
|
|
tuple: (host, port)
|
|
"""
|
|
host = os.environ.get("MQTT_BROKER") or os.environ.get("MQTT_HOST", "172.16.2.16")
|
|
port = int(os.environ.get("MQTT_PORT", "1883"))
|
|
return host, port
|
|
|
|
|
|
# ============================================================================
|
|
# MQTT PUBLISH
|
|
# ============================================================================
|
|
|
|
async def publish_abstract_set(device_type: str, device_id: str, payload: dict[str, Any]) -> None:
|
|
"""
|
|
Publish an abstract set command via MQTT.
|
|
|
|
This function encapsulates MQTT publishing logic so that group/scene
|
|
execution doesn't need to know MQTT topic details.
|
|
|
|
Topic format: home/{device_type}/{device_id}/set
|
|
Message format: {"type": device_type, "payload": payload}
|
|
|
|
Args:
|
|
device_type: Device type (light, thermostat, relay, etc.)
|
|
device_id: Device identifier
|
|
payload: Command payload (e.g., {"power": "on", "brightness": 50})
|
|
|
|
Example:
|
|
>>> await publish_abstract_set("light", "kueche_deckenlampe", {"power": "on", "brightness": 35})
|
|
# Publishes to: home/light/kueche_deckenlampe/set
|
|
# Message: {"type": "light", "payload": {"power": "on", "brightness": 35}}
|
|
"""
|
|
mqtt_host, mqtt_port = get_mqtt_settings()
|
|
topic = f"home/{device_type}/{device_id}/set"
|
|
message = {
|
|
"type": device_type,
|
|
"payload": payload
|
|
}
|
|
|
|
try:
|
|
async with Client(hostname=mqtt_host, port=mqtt_port) as client:
|
|
await client.publish(
|
|
topic=topic,
|
|
payload=json.dumps(message),
|
|
qos=1
|
|
)
|
|
logger.info(f"Published to {topic}: {message}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to publish to {topic}: {e}")
|
|
raise
|
|
|
|
|
|
def get_redis_settings() -> tuple[str, str]:
|
|
"""Get Redis settings from configuration.
|
|
|
|
Prioritizes environment variables over config file:
|
|
- REDIS_HOST, REDIS_PORT, REDIS_DB → redis://host:port/db
|
|
- REDIS_CHANNEL → pub/sub channel name
|
|
|
|
Returns:
|
|
tuple: (url, channel)
|
|
"""
|
|
# Check environment variables first
|
|
redis_host = os.getenv("REDIS_HOST")
|
|
redis_port = os.getenv("REDIS_PORT", "6379")
|
|
redis_db = os.getenv("REDIS_DB", "0")
|
|
redis_channel = os.getenv("REDIS_CHANNEL", "ui:updates")
|
|
|
|
if redis_host:
|
|
url = f"redis://{redis_host}:{redis_port}/{redis_db}"
|
|
return url, redis_channel
|
|
|
|
# Fallback to config file
|
|
config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
|
|
|
if config_path.exists():
|
|
with open(config_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
redis_config = config.get("redis", {})
|
|
url = redis_config.get("url", "redis://localhost:6379/0")
|
|
channel = redis_config.get("channel", "ui:updates")
|
|
return url, channel
|
|
|
|
return "redis://localhost:6379/0", "ui:updates"
|
|
|
|
|
|
async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None:
|
|
"""Publish message to MQTT broker.
|
|
|
|
Args:
|
|
topic: MQTT topic to publish to
|
|
payload: Message payload
|
|
"""
|
|
host, port = get_mqtt_settings()
|
|
message = json.dumps(payload)
|
|
|
|
async with Client(hostname=host, port=port, identifier="home-automation-api") as client:
|
|
await client.publish(topic, message, qos=1)
|
|
|
|
|
|
@app.get("/devices")
|
|
async def get_devices() -> list[DeviceInfo]:
|
|
"""Get list of available devices.
|
|
|
|
Returns:
|
|
list: List of device information including features
|
|
"""
|
|
devices = load_devices()
|
|
return [
|
|
DeviceInfo(
|
|
device_id=device["device_id"],
|
|
type=device["type"],
|
|
name=device.get("name", device["device_id"]),
|
|
features=device.get("features", {})
|
|
)
|
|
for device in devices
|
|
]
|
|
|
|
|
|
@app.get("/devices/states")
|
|
async def get_device_states() -> dict[str, dict[str, Any]]:
|
|
"""Get current states of all devices from in-memory cache.
|
|
|
|
Returns:
|
|
dict: Dictionary mapping device_id to state payload
|
|
"""
|
|
return device_states
|
|
|
|
|
|
@app.get("/layout")
|
|
async def get_layout() -> dict[str, Any]:
|
|
"""Get UI layout configuration.
|
|
|
|
Returns:
|
|
dict: Layout configuration with rooms and device tiles
|
|
"""
|
|
try:
|
|
layout = load_layout()
|
|
|
|
# Convert Pydantic models to dict
|
|
rooms = []
|
|
for room in layout.rooms:
|
|
devices = []
|
|
for tile in room.devices:
|
|
devices.append({
|
|
"device_id": tile.device_id,
|
|
"title": tile.title,
|
|
"icon": tile.icon,
|
|
"rank": tile.rank
|
|
})
|
|
|
|
rooms.append({
|
|
"name": room.name,
|
|
"devices": devices
|
|
})
|
|
|
|
return {"rooms": rooms}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading layout: {e}")
|
|
# Return empty layout on error
|
|
return {"rooms": []}
|
|
|
|
|
|
@app.get("/devices/{device_id}/room")
|
|
async def get_device_room(device_id: str) -> dict[str, str | None]:
|
|
"""Get the room name for a specific device.
|
|
|
|
Args:
|
|
device_id: Device identifier
|
|
|
|
Returns:
|
|
dict: {"device_id": str, "room": str | null}
|
|
"""
|
|
room = get_room(device_id)
|
|
return {
|
|
"device_id": device_id,
|
|
"room": room
|
|
}
|
|
|
|
|
|
@app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED)
|
|
async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]:
|
|
"""Set device state.
|
|
|
|
Args:
|
|
device_id: Device identifier
|
|
request: Device state request
|
|
|
|
Returns:
|
|
dict: Confirmation message
|
|
|
|
Raises:
|
|
HTTPException: If device not found or payload invalid
|
|
"""
|
|
# Load devices and check if device exists
|
|
devices = load_devices()
|
|
device = next((d for d in devices if d["device_id"] == device_id), None)
|
|
|
|
if not device:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Device {device_id} not found"
|
|
)
|
|
|
|
# Check if device is read-only (contact sensors, etc.)
|
|
if "topics" in device and "set" not in device["topics"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
detail="Device is read-only"
|
|
)
|
|
|
|
# Validate payload based on device type
|
|
if request.type == "light":
|
|
try:
|
|
LightState(**request.payload)
|
|
except ValidationError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Invalid payload for light: {e}"
|
|
)
|
|
elif request.type == "relay":
|
|
try:
|
|
RelayState(**request.payload)
|
|
except ValidationError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Invalid payload for relay: {e}"
|
|
)
|
|
elif request.type == "thermostat":
|
|
try:
|
|
# For thermostat SET: only allow mode and target
|
|
allowed_set_fields = {"mode", "target"}
|
|
invalid_fields = set(request.payload.keys()) - allowed_set_fields
|
|
if invalid_fields:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Thermostat SET only allows {allowed_set_fields}, got invalid fields: {invalid_fields}"
|
|
)
|
|
ThermostatState(**request.payload)
|
|
except ValidationError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Invalid payload for thermostat: {e}"
|
|
)
|
|
elif request.type in {"contact", "contact_sensor"}:
|
|
# Contact sensors are read-only
|
|
raise HTTPException(
|
|
status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
detail="Contact sensors are read-only devices"
|
|
)
|
|
elif request.type in {"temp_humidity", "temp_humidity_sensor"}:
|
|
# Temperature & humidity sensors are read-only
|
|
raise HTTPException(
|
|
status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
detail="Temperature & humidity sensors are read-only devices"
|
|
)
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Unsupported device type: {request.type}"
|
|
)
|
|
|
|
# Publish to MQTT
|
|
topic = f"home/{request.type}/{device_id}/set"
|
|
mqtt_payload = {
|
|
"type": request.type,
|
|
"payload": request.payload
|
|
}
|
|
|
|
await publish_mqtt(topic, mqtt_payload)
|
|
|
|
return {"message": f"Command sent to {device_id}"}
|
|
|
|
|
|
async def event_generator(request: Request) -> AsyncGenerator[str, None]:
|
|
"""Generate SSE events from Redis Pub/Sub with Safari compatibility.
|
|
|
|
Safari-compatible features:
|
|
- Immediate retry hint on connection
|
|
- Regular heartbeats every 15s (comment-only, no data)
|
|
- Proper flushing after each yield
|
|
- Graceful disconnect handling
|
|
|
|
Args:
|
|
request: FastAPI request object for disconnect detection
|
|
|
|
Yields:
|
|
str: SSE formatted event strings
|
|
"""
|
|
redis_client = None
|
|
pubsub = None
|
|
|
|
try:
|
|
# Send retry hint immediately for EventSource reconnect behavior
|
|
yield "retry: 2500\n\n"
|
|
|
|
# Try to connect to Redis
|
|
redis_url, redis_channel = get_redis_settings()
|
|
try:
|
|
redis_client = await aioredis.from_url(redis_url, decode_responses=True)
|
|
pubsub = redis_client.pubsub()
|
|
await pubsub.subscribe(redis_channel)
|
|
logger.info(f"SSE client connected, subscribed to {redis_channel}")
|
|
except Exception as e:
|
|
logger.warning(f"Redis unavailable, running in heartbeat-only mode: {e}")
|
|
redis_client = None
|
|
pubsub = None
|
|
|
|
# Heartbeat tracking
|
|
last_heartbeat = asyncio.get_event_loop().time()
|
|
heartbeat_interval = 15 # Safari-friendly: shorter interval
|
|
|
|
while True:
|
|
# Check if client disconnected
|
|
if await request.is_disconnected():
|
|
logger.info("SSE client disconnected")
|
|
break
|
|
|
|
# Try to get message from Redis (if available)
|
|
if pubsub:
|
|
try:
|
|
message = await asyncio.wait_for(
|
|
pubsub.get_message(ignore_subscribe_messages=True),
|
|
timeout=0.1
|
|
)
|
|
|
|
if message and message["type"] == "message":
|
|
data = message["data"]
|
|
logger.debug(f"Sending SSE message: {data[:100]}...")
|
|
|
|
# Update in-memory cache with latest state
|
|
try:
|
|
state_data = json.loads(data)
|
|
if state_data.get("type") == "state" and state_data.get("device_id"):
|
|
device_states[state_data["device_id"]] = state_data.get("payload", {})
|
|
except Exception as e:
|
|
logger.warning(f"Failed to parse state data for cache: {e}")
|
|
|
|
yield f"event: message\ndata: {data}\n\n"
|
|
last_heartbeat = asyncio.get_event_loop().time()
|
|
continue # Skip sleep, check for more messages immediately
|
|
|
|
except asyncio.TimeoutError:
|
|
pass # No message, continue to heartbeat check
|
|
except Exception as e:
|
|
logger.error(f"Redis error: {e}")
|
|
# Continue with heartbeats even if Redis fails
|
|
|
|
# Sleep briefly to avoid busy loop
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Send heartbeat if interval elapsed
|
|
current_time = asyncio.get_event_loop().time()
|
|
if current_time - last_heartbeat >= heartbeat_interval:
|
|
# Comment-style ping (Safari-compatible, no event type)
|
|
yield ": ping\n\n"
|
|
last_heartbeat = current_time
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("SSE connection cancelled by client")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"SSE error: {e}")
|
|
raise
|
|
finally:
|
|
# Cleanup Redis connection
|
|
if pubsub:
|
|
try:
|
|
await pubsub.unsubscribe(redis_channel)
|
|
await pubsub.aclose()
|
|
except Exception as e:
|
|
logger.error(f"Error closing pubsub: {e}")
|
|
|
|
if redis_client:
|
|
try:
|
|
await redis_client.aclose()
|
|
except Exception as e:
|
|
logger.error(f"Error closing redis: {e}")
|
|
|
|
logger.info("SSE connection closed")
|
|
|
|
|
|
@app.get("/realtime")
|
|
async def realtime_events(request: Request) -> StreamingResponse:
|
|
"""Server-Sent Events endpoint for real-time updates.
|
|
|
|
Safari-compatible SSE implementation:
|
|
- Immediate retry hint (2.5s reconnect delay)
|
|
- Heartbeat every 15s using comment syntax ": ping"
|
|
- Proper Cache-Control headers
|
|
- No buffering (nginx compatibility)
|
|
- Graceful Redis fallback (heartbeat-only mode)
|
|
|
|
Args:
|
|
request: FastAPI request object
|
|
|
|
Returns:
|
|
StreamingResponse: SSE stream with Redis messages and heartbeats
|
|
"""
|
|
return StreamingResponse(
|
|
event_generator(request),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache, no-transform",
|
|
"Connection": "keep-alive",
|
|
"X-Accel-Buffering": "no", # Disable nginx buffering
|
|
}
|
|
)
|
|
|
|
|
|
def main() -> None:
|
|
"""Run the API application with uvicorn."""
|
|
import uvicorn
|
|
|
|
uvicorn.run(
|
|
"apps.api.main:app",
|
|
host="0.0.0.0",
|
|
port=8001,
|
|
reload=True
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|