706 lines
23 KiB
Python
706 lines
23 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, AsyncGenerator
|
|
|
|
import redis.asyncio as aioredis
|
|
import yaml
|
|
from aiomqtt import Client
|
|
from fastapi import FastAPI, HTTPException, Request, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, ValidationError
|
|
|
|
from packages.home_capabilities import (
|
|
LIGHT_VERSION,
|
|
THERMOSTAT_VERSION,
|
|
CONTACT_SENSOR_VERSION,
|
|
TEMP_HUMIDITY_SENSOR_VERSION,
|
|
RELAY_VERSION,
|
|
LightState,
|
|
ThermostatState,
|
|
ContactState,
|
|
TempHumidityState,
|
|
RelayState,
|
|
load_layout,
|
|
)
|
|
|
|
# Import resolvers (must be before router imports to avoid circular dependency)
|
|
from apps.api.resolvers import (
|
|
DeviceDTO,
|
|
resolve_group_devices,
|
|
resolve_scene_step_devices,
|
|
load_device_rooms,
|
|
get_room,
|
|
clear_room_cache,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# STATE CACHES
|
|
# ============================================================================
|
|
|
|
# In-memory cache for last known device states
|
|
# Will be populated from Redis pub/sub messages
|
|
device_states: dict[str, dict[str, Any]] = {}
|
|
|
|
# Background task reference
|
|
background_task: asyncio.Task | None = None
|
|
|
|
app = FastAPI(
|
|
title="Home Automation API",
|
|
description="API for home automation system",
|
|
version="0.1.0"
|
|
)
|
|
|
|
# Configure CORS for localhost (Frontend)
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=[
|
|
"http://localhost:8002",
|
|
"http://172.19.1.11:8002",
|
|
"http://127.0.0.1:8002",
|
|
],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
@app.get("/devices/{device_id}/layout")
|
|
async def get_device_layout(device_id: str):
|
|
"""Gibt die layout-spezifischen Informationen für ein einzelnes Gerät zurück."""
|
|
layout = load_layout()
|
|
for room in layout.rooms:
|
|
for device in room.devices:
|
|
if device.device_id == device_id:
|
|
return {
|
|
"device_id": device_id,
|
|
"room": room.name,
|
|
"title": device.title,
|
|
"icon": device.icon,
|
|
"rank": device.rank,
|
|
}
|
|
raise HTTPException(status_code=404, detail="Device layout not found")
|
|
|
|
@app.get("/devices/{device_id}/state")
|
|
async def get_device_state(device_id: str):
|
|
try:
|
|
logger.debug(f"Fetching state for device {device_id}")
|
|
state = device_states[device_id]
|
|
return state
|
|
except KeyError:
|
|
raise HTTPException(status_code=404, detail="Device state not found")
|
|
|
|
# --- Minimal-invasive: Einzelgerät-Layout-Endpunkt ---
|
|
@app.get("/devices/{device_id}/layout")
|
|
async def get_device_layout(device_id: str):
|
|
"""Gibt die layout-spezifischen Informationen für ein einzelnes Gerät zurück."""
|
|
layout = load_layout()
|
|
for room in layout.get("rooms", []):
|
|
for device in room.get("devices", []):
|
|
if device.get("device_id") == device_id:
|
|
# Rückgabe: Layout-Infos + Raumname
|
|
return {
|
|
"device_id": device_id,
|
|
"room": room.get("name"),
|
|
"title": device.get("title"),
|
|
"icon": device.get("icon"),
|
|
"rank": device.get("rank"),
|
|
}
|
|
raise HTTPException(status_code=404, detail="Device layout not found")
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Include routers after app is initialized to avoid circular imports."""
|
|
from apps.api.routes.groups_scenes import router as groups_scenes_router
|
|
app.include_router(groups_scenes_router, prefix="")
|
|
|
|
|
|
@app.get("/health")
|
|
async def health() -> dict[str, str]:
|
|
"""Health check endpoint.
|
|
|
|
Returns:
|
|
dict: Status indicating the service is healthy
|
|
"""
|
|
return {"status": "ok"}
|
|
|
|
|
|
async def redis_state_listener():
|
|
"""Background task that listens to Redis pub/sub and updates state cache."""
|
|
redis_client = None
|
|
pubsub = None
|
|
|
|
try:
|
|
redis_url, redis_channel = get_redis_settings()
|
|
logger.info(f"Starting Redis state listener for channel {redis_channel}")
|
|
|
|
redis_client = await aioredis.from_url(redis_url, decode_responses=True)
|
|
pubsub = redis_client.pubsub()
|
|
await pubsub.subscribe(redis_channel)
|
|
|
|
logger.info("Redis state listener connected")
|
|
|
|
while True:
|
|
try:
|
|
message = await asyncio.wait_for(
|
|
pubsub.get_message(ignore_subscribe_messages=True),
|
|
timeout=1.0
|
|
)
|
|
|
|
if message and message["type"] == "message":
|
|
data = message["data"]
|
|
try:
|
|
state_data = json.loads(data)
|
|
if state_data.get("type") == "state" and state_data.get("device_id"):
|
|
device_id = state_data["device_id"]
|
|
payload = state_data.get("payload", {})
|
|
device_states[device_id] = payload
|
|
logger.debug(f"Updated state cache for {device_id}: {payload}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to parse state data: {e}")
|
|
|
|
except asyncio.TimeoutError:
|
|
pass # No message, continue
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("Redis state listener cancelled")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Redis state listener error: {e}")
|
|
finally:
|
|
if pubsub:
|
|
await pubsub.unsubscribe(redis_channel)
|
|
await pubsub.close()
|
|
if redis_client:
|
|
await redis_client.close()
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Start background tasks on application startup."""
|
|
global background_task
|
|
background_task = asyncio.create_task(redis_state_listener())
|
|
logger.info("Started background Redis state listener")
|
|
|
|
|
|
@app.on_event("shutdown")
|
|
async def shutdown_event():
|
|
"""Clean up background tasks on application shutdown."""
|
|
global background_task
|
|
if background_task:
|
|
background_task.cancel()
|
|
try:
|
|
await background_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
logger.info("Stopped background Redis state listener")
|
|
|
|
|
|
@app.get("/spec")
|
|
async def spec() -> dict[str, dict[str, str]]:
|
|
"""Capability specification endpoint.
|
|
|
|
Returns:
|
|
dict: Dictionary containing supported capabilities and their versions
|
|
"""
|
|
return {
|
|
"capabilities": {
|
|
"light": LIGHT_VERSION,
|
|
"thermostat": THERMOSTAT_VERSION,
|
|
"contact": CONTACT_SENSOR_VERSION,
|
|
"temp_humidity": TEMP_HUMIDITY_SENSOR_VERSION,
|
|
"relay": RELAY_VERSION
|
|
}
|
|
}
|
|
|
|
|
|
# Pydantic Models
|
|
class SetDeviceRequest(BaseModel):
|
|
"""Request model for setting device state."""
|
|
type: str
|
|
payload: dict[str, Any]
|
|
|
|
|
|
class DeviceInfo(BaseModel):
|
|
"""Device information model."""
|
|
device_id: str
|
|
type: str
|
|
name: str
|
|
features: dict[str, Any] = {}
|
|
|
|
|
|
# Configuration helpers
|
|
def load_devices() -> list[dict[str, Any]]:
|
|
"""Load devices from configuration file.
|
|
|
|
Returns:
|
|
list: List of device configurations
|
|
"""
|
|
config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
|
|
|
if not config_path.exists():
|
|
return []
|
|
|
|
with open(config_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
# Normalize device entries: accept both 'id' and 'device_id', use 'device_id' internally
|
|
devices = config.get("devices", [])
|
|
for device in devices:
|
|
device["device_id"] = device.pop("device_id", device.pop("id", None))
|
|
|
|
return devices
|
|
|
|
|
|
def get_mqtt_settings() -> tuple[str, int]:
|
|
"""Get MQTT broker settings from environment.
|
|
|
|
Supports both MQTT_BROKER and MQTT_HOST for compatibility.
|
|
|
|
Returns:
|
|
tuple: (host, port)
|
|
"""
|
|
host = os.environ.get("MQTT_BROKER") or os.environ.get("MQTT_HOST", "172.16.2.16")
|
|
port = int(os.environ.get("MQTT_PORT", "1883"))
|
|
return host, port
|
|
|
|
|
|
# ============================================================================
|
|
# MQTT PUBLISH
|
|
# ============================================================================
|
|
|
|
async def publish_abstract_set(device_type: str, device_id: str, payload: dict[str, Any]) -> None:
|
|
"""
|
|
Publish an abstract set command via MQTT.
|
|
|
|
This function encapsulates MQTT publishing logic so that group/scene
|
|
execution doesn't need to know MQTT topic details.
|
|
|
|
Topic format: home/{device_type}/{device_id}/set
|
|
Message format: {"type": device_type, "payload": payload}
|
|
|
|
Args:
|
|
device_type: Device type (light, thermostat, relay, etc.)
|
|
device_id: Device identifier
|
|
payload: Command payload (e.g., {"power": "on", "brightness": 50})
|
|
|
|
Example:
|
|
>>> await publish_abstract_set("light", "kueche_deckenlampe", {"power": "on", "brightness": 35})
|
|
# Publishes to: home/light/kueche_deckenlampe/set
|
|
# Message: {"type": "light", "payload": {"power": "on", "brightness": 35}}
|
|
"""
|
|
mqtt_host, mqtt_port = get_mqtt_settings()
|
|
topic = f"home/{device_type}/{device_id}/set"
|
|
message = {
|
|
"type": device_type,
|
|
"payload": payload
|
|
}
|
|
|
|
try:
|
|
async with Client(hostname=mqtt_host, port=mqtt_port) as client:
|
|
await client.publish(
|
|
topic=topic,
|
|
payload=json.dumps(message),
|
|
qos=1
|
|
)
|
|
logger.info(f"Published to {topic}: {message}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to publish to {topic}: {e}")
|
|
raise
|
|
|
|
|
|
def get_redis_settings() -> tuple[str, str]:
|
|
"""Get Redis settings from configuration.
|
|
|
|
Prioritizes environment variables over config file:
|
|
- REDIS_HOST, REDIS_PORT, REDIS_DB → redis://host:port/db
|
|
- REDIS_CHANNEL → pub/sub channel name
|
|
|
|
Returns:
|
|
tuple: (url, channel)
|
|
"""
|
|
# Check environment variables first
|
|
redis_host = os.getenv("REDIS_HOST")
|
|
redis_port = os.getenv("REDIS_PORT", "6379")
|
|
redis_db = os.getenv("REDIS_DB", "0")
|
|
redis_channel = os.getenv("REDIS_CHANNEL", "ui:updates")
|
|
|
|
if redis_host:
|
|
url = f"redis://{redis_host}:{redis_port}/{redis_db}"
|
|
return url, redis_channel
|
|
|
|
# Fallback to config file
|
|
config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
|
|
|
if config_path.exists():
|
|
with open(config_path, "r") as f:
|
|
config = yaml.safe_load(f)
|
|
redis_config = config.get("redis", {})
|
|
url = redis_config.get("url", "redis://localhost:6379/0")
|
|
channel = redis_config.get("channel", "ui:updates")
|
|
return url, channel
|
|
|
|
return "redis://localhost:6379/0", "ui:updates"
|
|
|
|
|
|
async def publish_mqtt(topic: str, payload: dict[str, Any]) -> None:
|
|
"""Publish message to MQTT broker.
|
|
|
|
Args:
|
|
topic: MQTT topic to publish to
|
|
payload: Message payload
|
|
"""
|
|
host, port = get_mqtt_settings()
|
|
message = json.dumps(payload)
|
|
|
|
async with Client(hostname=host, port=port, identifier="home-automation-api") as client:
|
|
await client.publish(topic, message, qos=1)
|
|
|
|
|
|
@app.get("/devices/{device_id}")
|
|
async def get_device(device_id: str) -> DeviceInfo:
|
|
logger.debug(f"Fetching info for device {device_id}")
|
|
devices = load_devices()
|
|
device = next((d for d in devices if d["device_id"] == device_id), None)
|
|
if not device:
|
|
raise HTTPException(status_code=404, detail="Device not found")
|
|
return DeviceInfo(
|
|
device_id=device["device_id"],
|
|
type=device["type"],
|
|
name=device.get("name", device["device_id"]),
|
|
features=device.get("features", {})
|
|
)
|
|
|
|
@app.get("/devices")
|
|
async def get_devices() -> list[DeviceInfo]:
|
|
"""Get list of available devices.
|
|
|
|
Returns:
|
|
list: List of device information including features
|
|
"""
|
|
logger.debug("Fetching list of devices")
|
|
devices = load_devices()
|
|
return [
|
|
DeviceInfo(
|
|
device_id=device["device_id"],
|
|
type=device["type"],
|
|
name=device.get("name", device["device_id"]),
|
|
features=device.get("features", {})
|
|
)
|
|
for device in devices
|
|
]
|
|
|
|
|
|
@app.get("/devices/states")
|
|
async def get_device_states() -> dict[str, dict[str, Any]]:
|
|
"""Get current states of all devices from in-memory cache.
|
|
|
|
Returns:
|
|
dict: Dictionary mapping device_id to state payload
|
|
"""
|
|
logger.debug("Fetching all device states")
|
|
return device_states
|
|
|
|
|
|
@app.get("/layout")
|
|
async def get_layout() -> dict[str, Any]:
|
|
"""Get UI layout configuration.
|
|
|
|
Returns:
|
|
dict: Layout configuration with rooms and device tiles
|
|
"""
|
|
try:
|
|
layout = load_layout()
|
|
|
|
# Convert Pydantic models to dict
|
|
rooms = []
|
|
for room in layout.rooms:
|
|
devices = []
|
|
for tile in room.devices:
|
|
devices.append({
|
|
"device_id": tile.device_id,
|
|
"title": tile.title,
|
|
"icon": tile.icon,
|
|
"rank": tile.rank
|
|
})
|
|
|
|
rooms.append({
|
|
"name": room.name,
|
|
"devices": devices
|
|
})
|
|
|
|
return {"rooms": rooms}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading layout: {e}")
|
|
# Return empty layout on error
|
|
return {"rooms": []}
|
|
|
|
|
|
@app.get("/devices/{device_id}/room")
|
|
async def get_device_room(device_id: str) -> dict[str, str | None]:
|
|
"""Get the room name for a specific device.
|
|
|
|
Args:
|
|
device_id: Device identifier
|
|
|
|
Returns:
|
|
dict: {"device_id": str, "room": str | null}
|
|
"""
|
|
room = get_room(device_id)
|
|
return {
|
|
"device_id": device_id,
|
|
"room": room
|
|
}
|
|
|
|
|
|
@app.post("/devices/{device_id}/set", status_code=status.HTTP_202_ACCEPTED)
|
|
async def set_device(device_id: str, request: SetDeviceRequest) -> dict[str, str]:
|
|
"""Set device state.
|
|
|
|
Args:
|
|
device_id: Device identifier
|
|
request: Device state request
|
|
|
|
Returns:
|
|
dict: Confirmation message
|
|
|
|
Raises:
|
|
HTTPException: If device not found or payload invalid
|
|
"""
|
|
# Load devices and check if device exists
|
|
devices = load_devices()
|
|
device = next((d for d in devices if d["device_id"] == device_id), None)
|
|
|
|
if not device:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Device {device_id} not found"
|
|
)
|
|
|
|
# Check if device is read-only (contact sensors, etc.)
|
|
if "topics" in device and "set" not in device["topics"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
detail="Device is read-only"
|
|
)
|
|
|
|
# Validate payload based on device type
|
|
if request.type == "light":
|
|
try:
|
|
LightState(**request.payload)
|
|
except ValidationError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Invalid payload for light: {e}"
|
|
)
|
|
elif request.type == "relay":
|
|
try:
|
|
RelayState(**request.payload)
|
|
except ValidationError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Invalid payload for relay: {e}"
|
|
)
|
|
elif request.type == "thermostat":
|
|
try:
|
|
# For thermostat SET: only allow mode and target
|
|
allowed_set_fields = {"mode", "target"}
|
|
invalid_fields = set(request.payload.keys()) - allowed_set_fields
|
|
if invalid_fields:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Thermostat SET only allows {allowed_set_fields}, got invalid fields: {invalid_fields}"
|
|
)
|
|
ThermostatState(**request.payload)
|
|
except ValidationError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Invalid payload for thermostat: {e}"
|
|
)
|
|
elif request.type in {"contact", "contact_sensor"}:
|
|
# Contact sensors are read-only
|
|
raise HTTPException(
|
|
status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
detail="Contact sensors are read-only devices"
|
|
)
|
|
elif request.type in {"temp_humidity", "temp_humidity_sensor"}:
|
|
# Temperature & humidity sensors are read-only
|
|
raise HTTPException(
|
|
status_code=status.HTTP_405_METHOD_NOT_ALLOWED,
|
|
detail="Temperature & humidity sensors are read-only devices"
|
|
)
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail=f"Unsupported device type: {request.type}"
|
|
)
|
|
|
|
# Publish to MQTT
|
|
topic = f"home/{request.type}/{device_id}/set"
|
|
mqtt_payload = {
|
|
"type": request.type,
|
|
"payload": request.payload
|
|
}
|
|
|
|
await publish_mqtt(topic, mqtt_payload)
|
|
|
|
return {"message": f"Command sent to {device_id}"}
|
|
|
|
|
|
async def event_generator(request: Request) -> AsyncGenerator[str, None]:
|
|
"""Generate SSE events from Redis Pub/Sub with Safari compatibility.
|
|
|
|
Safari-compatible features:
|
|
- Immediate retry hint on connection
|
|
- Regular heartbeats every 15s (comment-only, no data)
|
|
- Proper flushing after each yield
|
|
- Graceful disconnect handling
|
|
|
|
Args:
|
|
request: FastAPI request object for disconnect detection
|
|
|
|
Yields:
|
|
str: SSE formatted event strings
|
|
"""
|
|
redis_client = None
|
|
pubsub = None
|
|
|
|
try:
|
|
# Send retry hint immediately for EventSource reconnect behavior
|
|
yield "retry: 2500\n\n"
|
|
|
|
# Try to connect to Redis
|
|
redis_url, redis_channel = get_redis_settings()
|
|
try:
|
|
redis_client = await aioredis.from_url(redis_url, decode_responses=True)
|
|
pubsub = redis_client.pubsub()
|
|
await pubsub.subscribe(redis_channel)
|
|
logger.info(f"SSE client connected, subscribed to {redis_channel}")
|
|
except Exception as e:
|
|
logger.warning(f"Redis unavailable, running in heartbeat-only mode: {e}")
|
|
redis_client = None
|
|
pubsub = None
|
|
|
|
# Heartbeat tracking
|
|
last_heartbeat = asyncio.get_event_loop().time()
|
|
heartbeat_interval = 15 # Safari-friendly: shorter interval
|
|
|
|
while True:
|
|
# Check if client disconnected
|
|
if await request.is_disconnected():
|
|
logger.info("SSE client disconnected")
|
|
break
|
|
|
|
# Try to get message from Redis (if available)
|
|
if pubsub:
|
|
try:
|
|
message = await asyncio.wait_for(
|
|
pubsub.get_message(ignore_subscribe_messages=True),
|
|
timeout=0.1
|
|
)
|
|
|
|
if message and message["type"] == "message":
|
|
data = message["data"]
|
|
logger.debug(f"Sending SSE message: {data[:100]}...")
|
|
|
|
# Update in-memory cache with latest state
|
|
try:
|
|
state_data = json.loads(data)
|
|
if state_data.get("type") == "state" and state_data.get("device_id"):
|
|
device_states[state_data["device_id"]] = state_data.get("payload", {})
|
|
except Exception as e:
|
|
logger.warning(f"Failed to parse state data for cache: {e}")
|
|
|
|
yield f"event: message\ndata: {data}\n\n"
|
|
last_heartbeat = asyncio.get_event_loop().time()
|
|
continue # Skip sleep, check for more messages immediately
|
|
|
|
except asyncio.TimeoutError:
|
|
pass # No message, continue to heartbeat check
|
|
except Exception as e:
|
|
logger.error(f"Redis error: {e}")
|
|
# Continue with heartbeats even if Redis fails
|
|
|
|
# Sleep briefly to avoid busy loop
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Send heartbeat if interval elapsed
|
|
current_time = asyncio.get_event_loop().time()
|
|
if current_time - last_heartbeat >= heartbeat_interval:
|
|
# Comment-style ping (Safari-compatible, no event type)
|
|
yield ": ping\n\n"
|
|
last_heartbeat = current_time
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("SSE connection cancelled by client")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"SSE error: {e}")
|
|
raise
|
|
finally:
|
|
# Cleanup Redis connection
|
|
if pubsub:
|
|
try:
|
|
await pubsub.unsubscribe(redis_channel)
|
|
await pubsub.aclose()
|
|
except Exception as e:
|
|
logger.error(f"Error closing pubsub: {e}")
|
|
|
|
if redis_client:
|
|
try:
|
|
await redis_client.aclose()
|
|
except Exception as e:
|
|
logger.error(f"Error closing redis: {e}")
|
|
|
|
logger.info("SSE connection closed")
|
|
|
|
|
|
@app.get("/realtime")
|
|
async def realtime_events(request: Request) -> StreamingResponse:
|
|
"""Server-Sent Events endpoint for real-time updates.
|
|
|
|
Safari-compatible SSE implementation:
|
|
- Immediate retry hint (2.5s reconnect delay)
|
|
- Heartbeat every 15s using comment syntax ": ping"
|
|
- Proper Cache-Control headers
|
|
- No buffering (nginx compatibility)
|
|
- Graceful Redis fallback (heartbeat-only mode)
|
|
|
|
Args:
|
|
request: FastAPI request object
|
|
|
|
Returns:
|
|
StreamingResponse: SSE stream with Redis messages and heartbeats
|
|
"""
|
|
return StreamingResponse(
|
|
event_generator(request),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache, no-transform",
|
|
"Connection": "keep-alive",
|
|
"X-Accel-Buffering": "no", # Disable nginx buffering
|
|
}
|
|
)
|
|
|
|
|
|
def main() -> None:
|
|
"""Run the API application with uvicorn."""
|
|
import uvicorn
|
|
|
|
uvicorn.run(
|
|
"apps.api.main:app",
|
|
host="0.0.0.0",
|
|
port=8001,
|
|
reload=True
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|