refactored 5
This commit is contained in:
@@ -296,74 +296,73 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
) as client:
|
||||
logger.info(f"Connected to MQTT broker as {unique_client_id}")
|
||||
|
||||
async with client.messages as messages:
|
||||
# Subscribe to topics for all devices
|
||||
for device in devices.values():
|
||||
device_id = device['device_id']
|
||||
device_type = device['type']
|
||||
# Subscribe to topics for all devices
|
||||
for device in devices.values():
|
||||
device_id = device['device_id']
|
||||
device_type = device['type']
|
||||
|
||||
# Subscribe to abstract SET topic only if device has a SET topic (not read-only)
|
||||
if "set" in device["topics"]:
|
||||
abstract_set_topic = f"home/{device_type}/{device_id}/set"
|
||||
await client.subscribe(abstract_set_topic)
|
||||
logger.info(f"Subscribed to abstract SET: {abstract_set_topic}")
|
||||
else:
|
||||
logger.info(f"Skipping SET subscription for read-only device: {device_id}")
|
||||
|
||||
# Subscribe to vendor STATE topics (all devices have state)
|
||||
vendor_state_topic = device["topics"]["state"]
|
||||
await client.subscribe(vendor_state_topic)
|
||||
logger.info(f"Subscribed to vendor STATE: {vendor_state_topic}")
|
||||
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 1
|
||||
|
||||
# Track last activity for connection health
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
connection_timeout = keepalive * 2 # 2x keepalive as timeout
|
||||
|
||||
# Process messages
|
||||
async for message in client.messages:
|
||||
try:
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
topic = str(message.topic)
|
||||
payload_str = message.payload.decode()
|
||||
retain = getattr(message, 'retain', None)
|
||||
logger.debug(f"MQTT message received on ({retain=}) {topic}: {payload_str}")
|
||||
|
||||
# Subscribe to abstract SET topic only if device has a SET topic (not read-only)
|
||||
if "set" in device["topics"]:
|
||||
abstract_set_topic = f"home/{device_type}/{device_id}/set"
|
||||
await client.subscribe(abstract_set_topic)
|
||||
logger.info(f"Subscribed to abstract SET: {abstract_set_topic}")
|
||||
# Check if this is an abstract SET message
|
||||
if topic.startswith("home/") and topic.endswith("/set"):
|
||||
|
||||
payload = json.loads(payload_str)
|
||||
|
||||
# Extract device_type and device_id from topic
|
||||
parts = topic.split("/")
|
||||
if len(parts) == 4: # home/<type>/<id>/set
|
||||
device_type = parts[1]
|
||||
device_id = parts[2]
|
||||
|
||||
if device_id in devices:
|
||||
device = devices[device_id]
|
||||
vendor_topic = device["topics"]["set"]
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_abstract_set(
|
||||
client, device_id, device_type, device_technology, vendor_topic, payload
|
||||
)
|
||||
|
||||
# Check if this is a vendor STATE message
|
||||
else:
|
||||
logger.info(f"Skipping SET subscription for read-only device: {device_id}")
|
||||
# Find device by vendor state topic for other technologies
|
||||
for device_id, device in devices.items():
|
||||
if topic == device["topics"]["state"]:
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_vendor_state(
|
||||
client, redis_client, device_id, device["type"],
|
||||
device_technology, payload_str, redis_channel
|
||||
)
|
||||
break
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Failed to decode JSON payload on topic {topic}: {payload_str}")
|
||||
|
||||
# Subscribe to vendor STATE topics (all devices have state)
|
||||
vendor_state_topic = device["topics"]["state"]
|
||||
await client.subscribe(vendor_state_topic)
|
||||
logger.info(f"Subscribed to vendor STATE: {vendor_state_topic}")
|
||||
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 1
|
||||
|
||||
# Track last activity for connection health
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
connection_timeout = keepalive * 2 # 2x keepalive as timeout
|
||||
|
||||
# Process messages
|
||||
async for message in messages:
|
||||
try:
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
topic = str(message.topic)
|
||||
payload_str = message.payload.decode()
|
||||
retain = getattr(message, 'retain', None)
|
||||
logger.debug(f"MQTT message received on ({retain=}) {topic}: {payload_str}")
|
||||
|
||||
# Check if this is an abstract SET message
|
||||
if topic.startswith("home/") and topic.endswith("/set"):
|
||||
|
||||
payload = json.loads(payload_str)
|
||||
|
||||
# Extract device_type and device_id from topic
|
||||
parts = topic.split("/")
|
||||
if len(parts) == 4: # home/<type>/<id>/set
|
||||
device_type = parts[1]
|
||||
device_id = parts[2]
|
||||
|
||||
if device_id in devices:
|
||||
device = devices[device_id]
|
||||
vendor_topic = device["topics"]["set"]
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_abstract_set(
|
||||
client, device_id, device_type, device_technology, vendor_topic, payload
|
||||
)
|
||||
|
||||
# Check if this is a vendor STATE message
|
||||
else:
|
||||
# Find device by vendor state topic for other technologies
|
||||
for device_id, device in devices.items():
|
||||
if topic == device["topics"]["state"]:
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_vendor_state(
|
||||
client, redis_client, device_id, device["type"],
|
||||
device_technology, payload_str, redis_channel
|
||||
)
|
||||
break
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Failed to decode JSON payload on topic {topic}: {payload_str}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("MQTT worker cancelled")
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user