refactored
This commit is contained in:
@@ -326,46 +326,18 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
last_activity = asyncio.get_event_loop().time()
|
||||
topic = str(message.topic)
|
||||
payload_str = message.payload.decode()
|
||||
logger.debug(f"MQTT message received on {topic}: {payload_str}")
|
||||
|
||||
# Determine if message is from a MAX! device (requires plain text handling)
|
||||
is_max_device = False
|
||||
max_device_id = None
|
||||
max_device_type = None
|
||||
|
||||
# Check if topic matches any MAX! device state topic
|
||||
for device_id, device in devices.items():
|
||||
if device.get("technology") == "max" and topic == device["topics"]["state"]:
|
||||
is_max_device = True
|
||||
max_device_id = device_id
|
||||
max_device_type = device["type"]
|
||||
break
|
||||
|
||||
# Check for Shelly relay (also sends plain text)
|
||||
is_shelly_relay = False
|
||||
shelly_device_id = None
|
||||
shelly_device_type = None
|
||||
for device_id, device in devices.items():
|
||||
if device.get("technology") == "shelly" and device.get("type") == "relay":
|
||||
if topic == device["topics"]["state"]:
|
||||
is_shelly_relay = True
|
||||
shelly_device_id = device_id
|
||||
shelly_device_type = device["type"]
|
||||
break
|
||||
|
||||
# Parse payload based on device technology
|
||||
if is_max_device or is_shelly_relay:
|
||||
# MAX! and Shelly send plain text, not JSON
|
||||
payload = payload_str.strip()
|
||||
else:
|
||||
# All other technologies use JSON
|
||||
# Check if this is an abstract SET message
|
||||
if topic.startswith("home/") and topic.endswith("/set"):
|
||||
|
||||
# abstract messages should have json payload
|
||||
try:
|
||||
payload = json.loads(payload_str)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Invalid JSON on {topic}: {payload_str}")
|
||||
continue
|
||||
|
||||
# Check if this is an abstract SET message
|
||||
if topic.startswith("home/") and topic.endswith("/set"):
|
||||
|
||||
# Extract device_type and device_id from topic
|
||||
parts = topic.split("/")
|
||||
if len(parts) == 4: # home/<type>/<id>/set
|
||||
@@ -382,32 +354,15 @@ async def mqtt_worker(config: dict[str, Any], redis_client: aioredis.Redis) -> N
|
||||
|
||||
# Check if this is a vendor STATE message
|
||||
else:
|
||||
# For MAX! devices, we already identified them above
|
||||
if is_max_device:
|
||||
device = devices[max_device_id]
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_vendor_state(
|
||||
client, redis_client, max_device_id, max_device_type,
|
||||
device_technology, payload, redis_channel
|
||||
)
|
||||
# For Shelly relay devices, we already identified them above
|
||||
elif is_shelly_relay:
|
||||
device = devices[shelly_device_id]
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_vendor_state(
|
||||
client, redis_client, shelly_device_id, shelly_device_type,
|
||||
device_technology, payload, redis_channel
|
||||
)
|
||||
else:
|
||||
# Find device by vendor state topic for other technologies
|
||||
for device_id, device in devices.items():
|
||||
if topic == device["topics"]["state"]:
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_vendor_state(
|
||||
client, redis_client, device_id, device["type"],
|
||||
device_technology, payload, redis_channel
|
||||
)
|
||||
break
|
||||
# Find device by vendor state topic for other technologies
|
||||
for device_id, device in devices.items():
|
||||
if topic == device["topics"]["state"]:
|
||||
device_technology = device.get("technology", "unknown")
|
||||
await handle_vendor_state(
|
||||
client, redis_client, device_id, device["type"],
|
||||
device_technology, payload_str, redis_channel
|
||||
)
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("MQTT worker cancelled")
|
||||
|
||||
Reference in New Issue
Block a user