thermostat working
This commit is contained in:
@@ -10,6 +10,8 @@ from typing import Any
|
||||
|
||||
import redis.asyncio as aioredis
|
||||
import yaml
|
||||
import socket
|
||||
import uuid
|
||||
from aiomqtt import Client
|
||||
from pydantic import ValidationError
|
||||
|
||||
@@ -228,6 +230,9 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
broker = mqtt_config.get("broker", "172.16.2.16")
|
||||
port = mqtt_config.get("port", 1883)
|
||||
client_id = mqtt_config.get("client_id", "home-automation-abstraction")
|
||||
# Append a short suffix (ENV override possible) so multiple processes don't collide
|
||||
client_suffix = os.environ.get("MQTT_CLIENT_ID_SUFFIX") or uuid.uuid4().hex[:6]
|
||||
unique_client_id = f"{client_id}-{client_suffix}"
|
||||
keepalive = mqtt_config.get("keepalive", 60)
|
||||
|
||||
redis_config = config.get("redis", {})
|
||||
@@ -245,8 +250,9 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
async with Client(
|
||||
hostname=broker,
|
||||
port=port,
|
||||
identifier=client_id,
|
||||
keepalive=keepalive
|
||||
identifier=unique_client_id,
|
||||
keepalive=keepalive,
|
||||
timeout=10.0 # Add explicit timeout for operations
|
||||
) as client:
|
||||
logger.info(f"Connected to MQTT broker as {client_id}")
|
||||
|
||||
@@ -264,8 +270,13 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 1
|
||||
|
||||
# Track last activity for connection health
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
connection_timeout = keepalive * 2 # 2x keepalive as timeout
|
||||
|
||||
# Process messages
|
||||
async for message in client.messages:
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
topic = str(message.topic)
|
||||
payload_str = message.payload.decode()
|
||||
|
||||
@@ -300,8 +311,13 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
)
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("MQTT worker cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
import traceback
|
||||
logger.error(f"MQTT error: {e}")
|
||||
logger.debug(f"Traceback: {traceback.format_exc()}")
|
||||
logger.info(f"Reconnecting in {retry_delay}s...")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay = min(retry_delay * 2, max_retry_delay)
|
||||
|
||||
Reference in New Issue
Block a user