diff --git a/apps/abstraction/main.py b/apps/abstraction/main.py index 48e69c2..d28a6d4 100644 --- a/apps/abstraction/main.py +++ b/apps/abstraction/main.py @@ -296,74 +296,73 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N ) as client: logger.info(f"Connected to MQTT broker as {unique_client_id}") - async with client.messages as messages: - # Subscribe to topics for all devices - for device in devices.values(): - device_id = device['device_id'] - device_type = device['type'] + # Subscribe to topics for all devices + for device in devices.values(): + device_id = device['device_id'] + device_type = device['type'] + + # Subscribe to abstract SET topic only if device has a SET topic (not read-only) + if "set" in device["topics"]: + abstract_set_topic = f"home/{device_type}/{device_id}/set" + await client.subscribe(abstract_set_topic) + logger.info(f"Subscribed to abstract SET: {abstract_set_topic}") + else: + logger.info(f"Skipping SET subscription for read-only device: {device_id}") + + # Subscribe to vendor STATE topics (all devices have state) + vendor_state_topic = device["topics"]["state"] + await client.subscribe(vendor_state_topic) + logger.info(f"Subscribed to vendor STATE: {vendor_state_topic}") + + # Reset retry delay on successful connection + retry_delay = 1 + + # Track last activity for connection health + last_activity = asyncio.get_event_loop().time() + connection_timeout = keepalive * 2 # 2x keepalive as timeout + + # Process messages + async for message in client.messages: + try: + last_activity = asyncio.get_event_loop().time() + topic = str(message.topic) + payload_str = message.payload.decode() + retain = getattr(message, 'retain', None) + logger.debug(f"MQTT message received on ({retain=}) {topic}: {payload_str}") - # Subscribe to abstract SET topic only if device has a SET topic (not read-only) - if "set" in device["topics"]: - abstract_set_topic = f"home/{device_type}/{device_id}/set" - await client.subscribe(abstract_set_topic) - logger.info(f"Subscribed to abstract SET: {abstract_set_topic}") + # Check if this is an abstract SET message + if topic.startswith("home/") and topic.endswith("/set"): + + payload = json.loads(payload_str) + + # Extract device_type and device_id from topic + parts = topic.split("/") + if len(parts) == 4: # home///set + device_type = parts[1] + device_id = parts[2] + + if device_id in devices: + device = devices[device_id] + vendor_topic = device["topics"]["set"] + device_technology = device.get("technology", "unknown") + await handle_abstract_set( + client, device_id, device_type, device_technology, vendor_topic, payload + ) + + # Check if this is a vendor STATE message else: - logger.info(f"Skipping SET subscription for read-only device: {device_id}") + # Find device by vendor state topic for other technologies + for device_id, device in devices.items(): + if topic == device["topics"]["state"]: + device_technology = device.get("technology", "unknown") + await handle_vendor_state( + client, redis_client, device_id, device["type"], + device_technology, payload_str, redis_channel + ) + break + except json.JSONDecodeError: + logger.error(f"Failed to decode JSON payload on topic {topic}: {payload_str}") - # Subscribe to vendor STATE topics (all devices have state) - vendor_state_topic = device["topics"]["state"] - await client.subscribe(vendor_state_topic) - logger.info(f"Subscribed to vendor STATE: {vendor_state_topic}") - - # Reset retry delay on successful connection - retry_delay = 1 - - # Track last activity for connection health - last_activity = asyncio.get_event_loop().time() - connection_timeout = keepalive * 2 # 2x keepalive as timeout - - # Process messages - async for message in messages: - try: - last_activity = asyncio.get_event_loop().time() - topic = str(message.topic) - payload_str = message.payload.decode() - retain = getattr(message, 'retain', None) - logger.debug(f"MQTT message received on ({retain=}) {topic}: {payload_str}") - - # Check if this is an abstract SET message - if topic.startswith("home/") and topic.endswith("/set"): - - payload = json.loads(payload_str) - - # Extract device_type and device_id from topic - parts = topic.split("/") - if len(parts) == 4: # home///set - device_type = parts[1] - device_id = parts[2] - - if device_id in devices: - device = devices[device_id] - vendor_topic = device["topics"]["set"] - device_technology = device.get("technology", "unknown") - await handle_abstract_set( - client, device_id, device_type, device_technology, vendor_topic, payload - ) - - # Check if this is a vendor STATE message - else: - # Find device by vendor state topic for other technologies - for device_id, device in devices.items(): - if topic == device["topics"]["state"]: - device_technology = device.get("technology", "unknown") - await handle_vendor_state( - client, redis_client, device_id, device["type"], - device_technology, payload_str, redis_channel - ) - break - except json.JSONDecodeError: - logger.error(f"Failed to decode JSON payload on topic {topic}: {payload_str}") - except asyncio.CancelledError: logger.info("MQTT worker cancelled") raise