initial, step 2 already
This commit is contained in:
287
apps/abstraction/main.py
Normal file
287
apps/abstraction/main.py
Normal file
@@ -0,0 +1,287 @@
|
||||
"""Abstraction main entry point."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import redis.asyncio as aioredis
|
||||
import yaml
|
||||
from aiomqtt import Client
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def load_config(config_path: Path) -> dict[str, Any]:
|
||||
"""Load configuration from YAML file.
|
||||
|
||||
Args:
|
||||
config_path: Path to the configuration file
|
||||
|
||||
Returns:
|
||||
dict: Configuration dictionary
|
||||
"""
|
||||
if not config_path.exists():
|
||||
logger.warning(f"Config file not found: {config_path}, using defaults")
|
||||
return {
|
||||
"mqtt": {
|
||||
"broker": "172.16.2.16",
|
||||
"port": 1883,
|
||||
"client_id": "home-automation-abstraction",
|
||||
"keepalive": 60
|
||||
},
|
||||
"devices": []
|
||||
}
|
||||
|
||||
with open(config_path, "r") as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
logger.info(f"Loaded configuration from {config_path}")
|
||||
return config
|
||||
|
||||
|
||||
def validate_devices(devices: list[dict[str, Any]]) -> None:
|
||||
"""Validate device configuration.
|
||||
|
||||
Args:
|
||||
devices: List of device configurations
|
||||
|
||||
Raises:
|
||||
ValueError: If device configuration is invalid
|
||||
"""
|
||||
for device in devices:
|
||||
if "id" not in device:
|
||||
raise ValueError(f"Device missing 'id': {device}")
|
||||
if "type" not in device:
|
||||
raise ValueError(f"Device {device['id']} missing 'type'")
|
||||
if "topics" not in device:
|
||||
raise ValueError(f"Device {device['id']} missing 'topics'")
|
||||
if "set" not in device["topics"] or "state" not in device["topics"]:
|
||||
raise ValueError(f"Device {device['id']} missing 'topics.set' or 'topics.state'")
|
||||
logger.info(f"Validated {len(devices)} device(s)")
|
||||
|
||||
|
||||
async def get_redis_client(redis_url: str, max_retries: int = 5) -> aioredis.Redis:
|
||||
"""Connect to Redis with exponential backoff.
|
||||
|
||||
Args:
|
||||
redis_url: Redis connection URL
|
||||
max_retries: Maximum number of connection attempts
|
||||
|
||||
Returns:
|
||||
Redis client instance
|
||||
"""
|
||||
retry_delay = 1
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
redis_client = await aioredis.from_url(redis_url, decode_responses=True)
|
||||
await redis_client.ping()
|
||||
logger.info(f"Connected to Redis: {redis_url}")
|
||||
return redis_client
|
||||
except Exception as e:
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(f"Redis connection failed (attempt {attempt + 1}/{max_retries}): {e}")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay = min(retry_delay * 2, 30) # Exponential backoff, max 30s
|
||||
else:
|
||||
logger.error(f"Failed to connect to Redis after {max_retries} attempts")
|
||||
raise
|
||||
|
||||
|
||||
async def handle_abstract_set(
|
||||
mqtt_client: Client,
|
||||
device_id: str,
|
||||
device_type: str,
|
||||
vendor_topic: str,
|
||||
payload: dict[str, Any]
|
||||
) -> None:
|
||||
"""Handle abstract SET message and publish to vendor topic.
|
||||
|
||||
Args:
|
||||
mqtt_client: MQTT client instance
|
||||
device_id: Device identifier
|
||||
device_type: Device type (e.g., 'light')
|
||||
vendor_topic: Vendor-specific SET topic
|
||||
payload: Message payload
|
||||
"""
|
||||
# Extract actual payload (remove type wrapper if present)
|
||||
vendor_payload = payload.get("payload", payload)
|
||||
vendor_message = json.dumps(vendor_payload)
|
||||
|
||||
logger.info(f"→ vendor SET {device_id}: {vendor_topic} ← {vendor_message}")
|
||||
await mqtt_client.publish(vendor_topic, vendor_message, qos=1)
|
||||
|
||||
|
||||
async def handle_vendor_state(
|
||||
mqtt_client: Client,
|
||||
redis_client: aioredis.Redis,
|
||||
device_id: str,
|
||||
device_type: str,
|
||||
payload: dict[str, Any],
|
||||
redis_channel: str = "ui:updates"
|
||||
) -> None:
|
||||
"""Handle vendor STATE message and publish to abstract topic + Redis.
|
||||
|
||||
Args:
|
||||
mqtt_client: MQTT client instance
|
||||
redis_client: Redis client instance
|
||||
device_id: Device identifier
|
||||
device_type: Device type (e.g., 'light')
|
||||
payload: State payload
|
||||
redis_channel: Redis channel for UI updates
|
||||
"""
|
||||
# Publish to abstract state topic (retained)
|
||||
abstract_topic = f"home/{device_type}/{device_id}/state"
|
||||
abstract_message = json.dumps(payload)
|
||||
|
||||
logger.info(f"← abstract STATE {device_id}: {abstract_topic} → {abstract_message}")
|
||||
await mqtt_client.publish(abstract_topic, abstract_message, qos=1, retain=True)
|
||||
|
||||
# Publish to Redis for UI updates
|
||||
ui_update = {
|
||||
"type": "state",
|
||||
"device_id": device_id,
|
||||
"payload": payload
|
||||
}
|
||||
redis_message = json.dumps(ui_update)
|
||||
|
||||
logger.info(f"← Redis PUBLISH {redis_channel} → {redis_message}")
|
||||
await redis_client.publish(redis_channel, redis_message)
|
||||
|
||||
|
||||
async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> None:
|
||||
"""MQTT worker that handles device communication.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary containing MQTT settings
|
||||
redis_client: Redis client for UI updates
|
||||
"""
|
||||
mqtt_config = config.get("mqtt", {})
|
||||
broker = mqtt_config.get("broker", "172.16.2.16")
|
||||
port = mqtt_config.get("port", 1883)
|
||||
client_id = mqtt_config.get("client_id", "home-automation-abstraction")
|
||||
keepalive = mqtt_config.get("keepalive", 60)
|
||||
|
||||
redis_config = config.get("redis", {})
|
||||
redis_channel = redis_config.get("channel", "ui:updates")
|
||||
|
||||
devices = {d["id"]: d for d in config.get("devices", [])}
|
||||
|
||||
retry_delay = 1
|
||||
max_retry_delay = 60
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info(f"Connecting to MQTT broker: {broker}:{port}")
|
||||
|
||||
async with Client(
|
||||
hostname=broker,
|
||||
port=port,
|
||||
identifier=client_id,
|
||||
keepalive=keepalive
|
||||
) as client:
|
||||
logger.info(f"Connected to MQTT broker as {client_id}")
|
||||
|
||||
# Subscribe to abstract SET topics for all devices
|
||||
for device in devices.values():
|
||||
abstract_set_topic = f"home/{device['type']}/{device['id']}/set"
|
||||
await client.subscribe(abstract_set_topic)
|
||||
logger.info(f"Subscribed to abstract SET: {abstract_set_topic}")
|
||||
|
||||
# Subscribe to vendor STATE topics
|
||||
vendor_state_topic = device["topics"]["state"]
|
||||
await client.subscribe(vendor_state_topic)
|
||||
logger.info(f"Subscribed to vendor STATE: {vendor_state_topic}")
|
||||
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 1
|
||||
|
||||
# Process messages
|
||||
async for message in client.messages:
|
||||
topic = str(message.topic)
|
||||
payload_str = message.payload.decode()
|
||||
|
||||
try:
|
||||
payload = json.loads(payload_str)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Invalid JSON on {topic}: {payload_str}")
|
||||
continue
|
||||
|
||||
# Check if this is an abstract SET message
|
||||
if topic.startswith("home/") and topic.endswith("/set"):
|
||||
# Extract device_type and device_id from topic
|
||||
parts = topic.split("/")
|
||||
if len(parts) == 4: # home/<type>/<id>/set
|
||||
device_type = parts[1]
|
||||
device_id = parts[2]
|
||||
|
||||
if device_id in devices:
|
||||
device = devices[device_id]
|
||||
vendor_topic = device["topics"]["set"]
|
||||
await handle_abstract_set(
|
||||
client, device_id, device_type, vendor_topic, payload
|
||||
)
|
||||
|
||||
# Check if this is a vendor STATE message
|
||||
else:
|
||||
# Find device by vendor state topic
|
||||
for device_id, device in devices.items():
|
||||
if topic == device["topics"]["state"]:
|
||||
await handle_vendor_state(
|
||||
client, redis_client, device_id, device["type"], payload, redis_channel
|
||||
)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"MQTT error: {e}")
|
||||
logger.info(f"Reconnecting in {retry_delay}s...")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay = min(retry_delay * 2, max_retry_delay)
|
||||
|
||||
|
||||
async def async_main() -> None:
|
||||
"""Async main function for the abstraction worker."""
|
||||
# Determine config path
|
||||
config_path = Path(__file__).parent.parent.parent / "config" / "devices.yaml"
|
||||
|
||||
# Load configuration
|
||||
config = load_config(config_path)
|
||||
|
||||
# Validate devices
|
||||
devices = config.get("devices") or []
|
||||
validate_devices(devices)
|
||||
logger.info(f"Loaded {len(devices)} device(s) from configuration")
|
||||
|
||||
# Get Redis URL from config or environment variable or use default
|
||||
redis_config = config.get("redis", {})
|
||||
redis_url = redis_config.get("url") or os.environ.get("REDIS_URL", "redis://localhost:6379/0")
|
||||
|
||||
# Connect to Redis with retry
|
||||
redis_client = await get_redis_client(redis_url)
|
||||
|
||||
logger.info("Abstraction worker started")
|
||||
|
||||
# Start MQTT worker
|
||||
await mqtt_worker(config, redis_client)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Run the abstraction application."""
|
||||
try:
|
||||
asyncio.run(async_main())
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Abstraction worker stopped by user")
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error: {e}")
|
||||
raise
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user