Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
3a1841a8a9
|
|||
|
9629850ebb
|
|||
|
000d32b78f
|
|||
|
24b2f70caf
|
@@ -4,620 +4,48 @@ This module implements a registry-pattern for vendor-specific transformations:
|
||||
- Each (device_type, technology, direction) tuple maps to a specific handler function
|
||||
- Handlers transform payloads between abstract and vendor-specific formats
|
||||
- Unknown combinations fall back to pass-through (no transformation)
|
||||
|
||||
Vendor-specific implementations are in the vendors/ subdirectory.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import Any, Callable
|
||||
|
||||
from apps.abstraction.vendors import (
|
||||
simulator,
|
||||
zigbee2mqtt,
|
||||
max,
|
||||
shelly,
|
||||
tasmota,
|
||||
hottis_pv_modbus,
|
||||
hottis_wago_modbus,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: simulator technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_light_simulator_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract light payload to simulator format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def _transform_light_simulator_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform simulator light payload to abstract format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
|
||||
payload = json.loads(payload)
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
def _transform_thermostat_simulator_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract thermostat payload to simulator format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def _transform_thermostat_simulator_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform simulator thermostat payload to abstract format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
|
||||
payload = json.loads(payload)
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: zigbee2mqtt technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_light_zigbee2mqtt_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract light payload to zigbee2mqtt format.
|
||||
|
||||
Transformations:
|
||||
- power: 'on'/'off' -> state: 'ON'/'OFF'
|
||||
- brightness: 0-100 -> brightness: 0-254
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on', 'brightness': 100}
|
||||
- zigbee2mqtt: {'state': 'ON', 'brightness': 254}
|
||||
"""
|
||||
vendor_payload = payload.copy()
|
||||
|
||||
# Transform power -> state with uppercase values
|
||||
if "power" in vendor_payload:
|
||||
power_value = vendor_payload.pop("power")
|
||||
vendor_payload["state"] = power_value.upper() if isinstance(power_value, str) else power_value
|
||||
|
||||
# Transform brightness: 0-100 (%) -> 0-254 (zigbee2mqtt range)
|
||||
if "brightness" in vendor_payload:
|
||||
abstract_brightness = vendor_payload["brightness"]
|
||||
if isinstance(abstract_brightness, (int, float)):
|
||||
# Convert percentage (0-100) to zigbee2mqtt range (0-254)
|
||||
vendor_payload["brightness"] = round(abstract_brightness * 254 / 100)
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def _transform_light_zigbee2mqtt_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt light payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- state: 'ON'/'OFF' -> power: 'on'/'off'
|
||||
- brightness: 0-254 -> brightness: 0-100
|
||||
|
||||
Example:
|
||||
- zigbee2mqtt: {'state': 'ON', 'brightness': 254}
|
||||
- Abstract: {'power': 'on', 'brightness': 100}
|
||||
"""
|
||||
abstract_payload = json.loads(payload)
|
||||
|
||||
# Transform state -> power with lowercase values
|
||||
if "state" in abstract_payload:
|
||||
state_value = abstract_payload.pop("state")
|
||||
abstract_payload["power"] = state_value.lower() if isinstance(state_value, str) else state_value
|
||||
|
||||
# Transform brightness: 0-254 (zigbee2mqtt range) -> 0-100 (%)
|
||||
if "brightness" in abstract_payload:
|
||||
vendor_brightness = abstract_payload["brightness"]
|
||||
if isinstance(vendor_brightness, (int, float)):
|
||||
# Convert zigbee2mqtt range (0-254) to percentage (0-100)
|
||||
abstract_payload["brightness"] = round(vendor_brightness * 100 / 254)
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
def _transform_thermostat_zigbee2mqtt_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract thermostat payload to zigbee2mqtt format.
|
||||
|
||||
Transformations:
|
||||
- target -> current_heating_setpoint (as string)
|
||||
- mode is ignored (zigbee2mqtt thermostats use system_mode in state only)
|
||||
|
||||
Example:
|
||||
- Abstract: {'target': 22.0}
|
||||
- zigbee2mqtt: {'current_heating_setpoint': '22.0'}
|
||||
"""
|
||||
vendor_payload = {}
|
||||
|
||||
if "target" in payload:
|
||||
# zigbee2mqtt expects current_heating_setpoint as string
|
||||
vendor_payload["current_heating_setpoint"] = str(payload["target"])
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def _transform_thermostat_zigbee2mqtt_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt thermostat payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- current_heating_setpoint -> target (as float)
|
||||
- local_temperature -> current (as float)
|
||||
- system_mode -> mode
|
||||
|
||||
Example:
|
||||
- zigbee2mqtt: {'current_heating_setpoint': 15, 'local_temperature': 23, 'system_mode': 'heat'}
|
||||
- Abstract: {'target': 15.0, 'current': 23.0, 'mode': 'heat'}
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
abstract_payload = {}
|
||||
|
||||
# Extract target temperature
|
||||
if "current_heating_setpoint" in payload:
|
||||
setpoint = payload["current_heating_setpoint"]
|
||||
abstract_payload["target"] = float(setpoint)
|
||||
|
||||
# Extract current temperature
|
||||
if "local_temperature" in payload:
|
||||
current = payload["local_temperature"]
|
||||
abstract_payload["current"] = float(current)
|
||||
|
||||
# Extract mode
|
||||
if "system_mode" in payload:
|
||||
abstract_payload["mode"] = payload["system_mode"]
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: contact_sensor - zigbee2mqtt technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_contact_sensor_zigbee2mqtt_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract contact sensor payload to zigbee2mqtt format.
|
||||
|
||||
Contact sensors are read-only, so this should not be called for SET commands.
|
||||
Returns payload as-is for compatibility.
|
||||
"""
|
||||
logger.warning("Contact sensors are read-only - SET commands should not be used")
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def _transform_contact_sensor_zigbee2mqtt_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt contact sensor payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- contact: bool -> "open" | "closed"
|
||||
- zigbee2mqtt semantics: False = OPEN, True = CLOSED (inverted!)
|
||||
- battery: pass through (already 0-100)
|
||||
- linkquality: pass through
|
||||
- device_temperature: pass through (if present)
|
||||
- voltage: pass through (if present)
|
||||
|
||||
Example:
|
||||
- zigbee2mqtt: {"contact": false, "battery": 100, "linkquality": 87}
|
||||
- Abstract: {"contact": "open", "battery": 100, "linkquality": 87}
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
abstract_payload = {}
|
||||
|
||||
# Transform contact state (inverted logic!)
|
||||
if "contact" in payload:
|
||||
contact_bool = payload["contact"]
|
||||
# zigbee2mqtt: False = OPEN, True = CLOSED
|
||||
abstract_payload["contact"] = "closed" if contact_bool else "open"
|
||||
|
||||
# Pass through optional fields
|
||||
if "battery" in payload:
|
||||
abstract_payload["battery"] = payload["battery"]
|
||||
|
||||
if "linkquality" in payload:
|
||||
abstract_payload["linkquality"] = payload["linkquality"]
|
||||
|
||||
if "device_temperature" in payload:
|
||||
abstract_payload["device_temperature"] = payload["device_temperature"]
|
||||
|
||||
if "voltage" in payload:
|
||||
abstract_payload["voltage"] = payload["voltage"]
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: contact_sensor - max technology (Homegear MAX!)
|
||||
# ============================================================================
|
||||
|
||||
def _transform_contact_sensor_max_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract contact sensor payload to MAX! format.
|
||||
|
||||
Contact sensors are read-only, so this should not be called for SET commands.
|
||||
Returns payload as-is for compatibility.
|
||||
"""
|
||||
logger.warning("Contact sensors are read-only - SET commands should not be used")
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def _transform_contact_sensor_max_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform MAX! (Homegear) contact sensor payload to abstract format.
|
||||
|
||||
MAX! sends "true"/"false" (string or bool) on STATE topic.
|
||||
|
||||
Transformations:
|
||||
- "true" or True -> "open" (window/door open)
|
||||
- "false" or False -> "closed" (window/door closed)
|
||||
|
||||
Example:
|
||||
- MAX!: "true" or True
|
||||
- Abstract: {"contact": "open"}
|
||||
"""
|
||||
try:
|
||||
contact_value = payload.strip().lower() == "true"
|
||||
|
||||
# MAX! semantics: True = OPEN, False = CLOSED
|
||||
return {
|
||||
"contact": "open" if contact_value else "closed"
|
||||
}
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.error(f"MAX! contact sensor failed to parse: {payload}, error: {e}")
|
||||
return {
|
||||
"contact": "closed" # Default to closed on error
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: temp_humidity_sensor - zigbee2mqtt technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_temp_humidity_sensor_zigbee2mqtt_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract temp/humidity sensor payload to zigbee2mqtt format.
|
||||
|
||||
Temp/humidity sensors are read-only, so this should not be called for SET commands.
|
||||
Returns payload as-is for compatibility.
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def _transform_temp_humidity_sensor_zigbee2mqtt_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt temp/humidity sensor payload to abstract format.
|
||||
|
||||
Passthrough - zigbee2mqtt provides temperature, humidity, battery, linkquality directly.
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
return payload
|
||||
|
||||
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: relay - zigbee2mqtt technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_relay_zigbee2mqtt_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to zigbee2mqtt format.
|
||||
|
||||
Relay only has power on/off, same transformation as light.
|
||||
- power: 'on'/'off' -> state: 'ON'/'OFF'
|
||||
"""
|
||||
vendor_payload = payload.copy()
|
||||
|
||||
if "power" in vendor_payload:
|
||||
power_value = vendor_payload.pop("power")
|
||||
vendor_payload["state"] = power_value.upper() if isinstance(power_value, str) else power_value
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def _transform_relay_zigbee2mqtt_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt relay payload to abstract format.
|
||||
|
||||
Relay only has power on/off, same transformation as light.
|
||||
- state: 'ON'/'OFF' -> power: 'on'/'off'
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
abstract_payload = payload.copy()
|
||||
|
||||
if "state" in abstract_payload:
|
||||
state_value = abstract_payload.pop("state")
|
||||
abstract_payload["power"] = state_value.lower() if isinstance(state_value, str) else state_value
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: relay - shelly technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_relay_shelly_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to Shelly format.
|
||||
|
||||
Shelly expects plain text 'on' or 'off' (not JSON).
|
||||
- power: 'on'/'off' -> 'on'/'off' (plain string)
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on'}
|
||||
- Shelly: 'on'
|
||||
"""
|
||||
power = payload.get("power", "off")
|
||||
return power
|
||||
|
||||
|
||||
def _transform_relay_shelly_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform Shelly relay payload to abstract format.
|
||||
|
||||
Shelly sends plain text 'on' or 'off' (not JSON).
|
||||
- 'on'/'off' -> power: 'on'/'off'
|
||||
|
||||
Example:
|
||||
- Shelly: 'on'
|
||||
- Abstract: {'power': 'on'}
|
||||
"""
|
||||
return {"power": payload.strip()}
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: relay - tasmota technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_relay_tasmota_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to Tasmota format.
|
||||
|
||||
Tasmota expects plain text 'on' or 'off' (not JSON).
|
||||
- power: 'on'/'off' -> 'on'/'off' (plain string)
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on'}
|
||||
- Tasmota: 'on'
|
||||
"""
|
||||
power = payload.get("power", "off")
|
||||
return power
|
||||
|
||||
|
||||
def _transform_relay_tasmota_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform Tasmota relay payload to abstract format.
|
||||
|
||||
Tasmota sends plain text 'on' or 'off' (not JSON).
|
||||
- 'on'/'off' -> power: 'on'/'off'
|
||||
|
||||
Example:
|
||||
- Tasmota: 'ON'
|
||||
- Abstract: {'power': 'on'}
|
||||
"""
|
||||
return {"power": payload.strip().lower()}
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: relay - hottis_pv_modbus technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_relay_hottis_pv_modbus_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to Hottis Modbus format.
|
||||
|
||||
Hottis Modbus expects plain text 'on' or 'off' (not JSON).
|
||||
- power: 'on'/'off' -> 'on'/'off' (plain string)
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on'}
|
||||
- Hottis Modbus: 'on'
|
||||
"""
|
||||
power = payload.get("power", "off")
|
||||
return power
|
||||
|
||||
|
||||
def _transform_relay_hottis_pv_modbus_to_abstract(payload: str) -> dict[str, Any]:
|
||||
def _transform_relay_hottis_pv_modbus_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform Hottis Modbus relay payload to abstract format.
|
||||
|
||||
Hottis Modbus sends JSON like:
|
||||
{"status": "Ok", "timestamp": "...", "state": false, "cnt": 528}
|
||||
|
||||
We only care about the 'state' field:
|
||||
- state: true -> power: 'on'
|
||||
- state: false -> power: 'off'
|
||||
"""
|
||||
data = json.loads(payload)
|
||||
|
||||
state = data.get("state", False)
|
||||
power = "on" if bool(state) else "off"
|
||||
|
||||
|
||||
return {"power": payload.strip()}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: three_phase_powermeter - hottis_pv_modbus technology
|
||||
# ============================================================================
|
||||
|
||||
def _transform_three_phase_powermeter_hottis_pv_modbus_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract three_phase_powermeter payload to hottis_pv_modbus format.
|
||||
energy: float = Field(..., description="Total energy in kWh")
|
||||
total_power: float = Field(..., description="Total power in W")
|
||||
phase1_power: float = Field(..., description="Power for phase 1 in W")
|
||||
phase2_power: float = Field(..., description="Power for phase 2 in W")
|
||||
phase3_power: float = Field(..., description="Power for phase 3 in W")
|
||||
phase1_voltage: float = Field(..., description="Voltage for phase 1 in V")
|
||||
phase2_voltage: float = Field(..., description="Voltage for phase 2 in V")
|
||||
phase3_voltage: float = Field(..., description="Voltage for phase 3 in V")
|
||||
phase1_current: float = Field(..., description="Current for phase 1 in A")
|
||||
phase2_current: float = Field(..., description="Current for phase 2 in A")
|
||||
phase3_current: float = Field(..., description="Current for phase 3 in A")
|
||||
|
||||
|
||||
"""
|
||||
|
||||
vendor_payload = {
|
||||
"energy": payload.get("energy", 0.0),
|
||||
"total_power": payload.get("total_power", 0.0),
|
||||
"phase1_power": payload.get("phase1_power", 0.0),
|
||||
"phase2_power": payload.get("phase2_power", 0.0),
|
||||
"phase3_power": payload.get("phase3_power", 0.0),
|
||||
"phase1_voltage": payload.get("phase1_voltage", 0.0),
|
||||
"phase2_voltage": payload.get("phase2_voltage", 0.0),
|
||||
"phase3_voltage": payload.get("phase3_voltage", 0.0),
|
||||
"phase1_current": payload.get("phase1_current", 0.0),
|
||||
"phase2_current": payload.get("phase2_current", 0.0),
|
||||
"phase3_current": payload.get("phase3_current", 0.0),
|
||||
}
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def _transform_three_phase_powermeter_hottis_pv_modbus_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform hottis_pv_modbus three_phase_powermeter payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- Map vendor field names to abstract field names
|
||||
- totalImportEnergy -> energy
|
||||
- powerL1/powerL2/powerL3 -> phase1_power/phase2_power/phase3_power
|
||||
- voltageL1/voltageL2/voltageL3 -> phase1_voltage/phase2_voltage/phase3_voltage
|
||||
- currentL1/currentL2/currentL3 -> phase1_current/phase2_current/phase3_current
|
||||
- Sum of powerL1..3 -> total_power
|
||||
"""
|
||||
data = json.loads(payload)
|
||||
|
||||
# Helper to read numeric values uniformly as float
|
||||
def _get_float(key: str, default: float = 0.0) -> float:
|
||||
return float(data.get(key, default))
|
||||
|
||||
# Read all numeric values via helper for consistent error handling
|
||||
phase1_power = _get_float("powerL1")
|
||||
phase2_power = _get_float("powerL2")
|
||||
phase3_power = _get_float("powerL3")
|
||||
|
||||
phase1_voltage = _get_float("voltageL1")
|
||||
phase2_voltage = _get_float("voltageL2")
|
||||
phase3_voltage = _get_float("voltageL3")
|
||||
|
||||
phase1_current = _get_float("currentL1")
|
||||
phase2_current = _get_float("currentL2")
|
||||
phase3_current = _get_float("currentL3")
|
||||
|
||||
energy = _get_float("totalImportEnergy")
|
||||
|
||||
abstract_payload = {
|
||||
"energy": energy,
|
||||
"total_power": phase1_power + phase2_power + phase3_power,
|
||||
"phase1_power": phase1_power,
|
||||
"phase2_power": phase2_power,
|
||||
"phase3_power": phase3_power,
|
||||
"phase1_voltage": phase1_voltage,
|
||||
"phase2_voltage": phase2_voltage,
|
||||
"phase3_voltage": phase3_voltage,
|
||||
"phase1_current": phase1_current,
|
||||
"phase2_current": phase2_current,
|
||||
"phase3_current": phase3_current,
|
||||
}
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# HANDLER FUNCTIONS: max technology (Homegear MAX!)
|
||||
# ============================================================================
|
||||
|
||||
def _transform_thermostat_max_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract thermostat payload to MAX! (Homegear) format.
|
||||
|
||||
MAX! expects only the integer temperature value (no JSON).
|
||||
|
||||
Transformations:
|
||||
- Extract 'target' temperature from payload
|
||||
- Convert float to integer (MAX! only accepts integers)
|
||||
- Return as plain string value
|
||||
|
||||
Example:
|
||||
- Abstract: {'mode': 'heat', 'target': 22.5}
|
||||
- MAX!: "22"
|
||||
|
||||
Note: MAX! ignores mode - it's always in heating mode
|
||||
"""
|
||||
if "target" not in payload:
|
||||
logger.warning(f"MAX! thermostat payload missing 'target': {payload}")
|
||||
return "21" # Default fallback
|
||||
|
||||
target_temp = payload["target"]
|
||||
|
||||
# Convert to integer (MAX! protocol requirement)
|
||||
if isinstance(target_temp, (int, float)):
|
||||
int_temp = int(round(target_temp))
|
||||
return str(int_temp)
|
||||
|
||||
logger.warning(f"MAX! invalid target temperature type: {type(target_temp)}, value: {target_temp}")
|
||||
return "21"
|
||||
|
||||
|
||||
def _transform_thermostat_max_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform MAX! (Homegear) thermostat payload to abstract format.
|
||||
|
||||
MAX! sends only the integer temperature value (no JSON).
|
||||
|
||||
Transformations:
|
||||
- Parse plain string/int value
|
||||
- Convert to float for abstract protocol
|
||||
- Wrap in abstract payload structure with mode='heat'
|
||||
|
||||
Example:
|
||||
- MAX!: "22" or 22
|
||||
- Abstract: {'target': 22.0, 'mode': 'heat'}
|
||||
|
||||
Note: MAX! doesn't send current temperature via SET_TEMPERATURE topic
|
||||
"""
|
||||
|
||||
# Handle both string and numeric input
|
||||
target_temp = float(payload.strip())
|
||||
|
||||
return {
|
||||
"target": target_temp,
|
||||
"mode": "heat" # MAX! is always in heating mode
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# REGISTRY: Maps (device_type, technology, direction) -> handler function
|
||||
# ============================================================================
|
||||
|
||||
TransformHandler = Callable[[dict[str, Any]], dict[str, Any]]
|
||||
TransformHandler = Callable[[Any], Any]
|
||||
|
||||
TRANSFORM_HANDLERS: dict[tuple[str, str, str], TransformHandler] = {
|
||||
# Light transformations
|
||||
("light", "simulator", "to_vendor"): _transform_light_simulator_to_vendor,
|
||||
("light", "simulator", "to_abstract"): _transform_light_simulator_to_abstract,
|
||||
("light", "zigbee2mqtt", "to_vendor"): _transform_light_zigbee2mqtt_to_vendor,
|
||||
("light", "zigbee2mqtt", "to_abstract"): _transform_light_zigbee2mqtt_to_abstract,
|
||||
# Build registry from vendor modules
|
||||
TRANSFORM_HANDLERS: dict[tuple[str, str, str], TransformHandler] = {}
|
||||
|
||||
# Thermostat transformations
|
||||
("thermostat", "simulator", "to_vendor"): _transform_thermostat_simulator_to_vendor,
|
||||
("thermostat", "simulator", "to_abstract"): _transform_thermostat_simulator_to_abstract,
|
||||
("thermostat", "zigbee2mqtt", "to_vendor"): _transform_thermostat_zigbee2mqtt_to_vendor,
|
||||
("thermostat", "zigbee2mqtt", "to_abstract"): _transform_thermostat_zigbee2mqtt_to_abstract,
|
||||
("thermostat", "max", "to_vendor"): _transform_thermostat_max_to_vendor,
|
||||
("thermostat", "max", "to_abstract"): _transform_thermostat_max_to_abstract,
|
||||
|
||||
# Contact sensor transformations (support both 'contact' and 'contact_sensor' types)
|
||||
("contact_sensor", "zigbee2mqtt", "to_vendor"): _transform_contact_sensor_zigbee2mqtt_to_vendor,
|
||||
("contact_sensor", "zigbee2mqtt", "to_abstract"): _transform_contact_sensor_zigbee2mqtt_to_abstract,
|
||||
("contact_sensor", "max", "to_vendor"): _transform_contact_sensor_max_to_vendor,
|
||||
("contact_sensor", "max", "to_abstract"): _transform_contact_sensor_max_to_abstract,
|
||||
("contact", "zigbee2mqtt", "to_vendor"): _transform_contact_sensor_zigbee2mqtt_to_vendor,
|
||||
("contact", "zigbee2mqtt", "to_abstract"): _transform_contact_sensor_zigbee2mqtt_to_abstract,
|
||||
("contact", "max", "to_vendor"): _transform_contact_sensor_max_to_vendor,
|
||||
("contact", "max", "to_abstract"): _transform_contact_sensor_max_to_abstract,
|
||||
|
||||
# Temperature & humidity sensor transformations (support both type aliases)
|
||||
("temp_humidity_sensor", "zigbee2mqtt", "to_vendor"): _transform_temp_humidity_sensor_zigbee2mqtt_to_vendor,
|
||||
("temp_humidity_sensor", "zigbee2mqtt", "to_abstract"): _transform_temp_humidity_sensor_zigbee2mqtt_to_abstract,
|
||||
("temp_humidity", "zigbee2mqtt", "to_vendor"): _transform_temp_humidity_sensor_zigbee2mqtt_to_vendor,
|
||||
("temp_humidity", "zigbee2mqtt", "to_abstract"): _transform_temp_humidity_sensor_zigbee2mqtt_to_abstract,
|
||||
|
||||
# Relay transformations
|
||||
("relay", "zigbee2mqtt", "to_vendor"): _transform_relay_zigbee2mqtt_to_vendor,
|
||||
("relay", "zigbee2mqtt", "to_abstract"): _transform_relay_zigbee2mqtt_to_abstract,
|
||||
("relay", "shelly", "to_vendor"): _transform_relay_shelly_to_vendor,
|
||||
("relay", "shelly", "to_abstract"): _transform_relay_shelly_to_abstract,
|
||||
("relay", "hottis_pv_modbus", "to_vendor"): _transform_relay_hottis_pv_modbus_to_vendor,
|
||||
("relay", "hottis_pv_modbus", "to_abstract"): _transform_relay_hottis_pv_modbus_to_abstract,
|
||||
("relay", "tasmota", "to_vendor"): _transform_relay_tasmota_to_vendor,
|
||||
("relay", "tasmota", "to_abstract"): _transform_relay_tasmota_to_abstract,
|
||||
|
||||
# Three-Phase Powermeter transformations
|
||||
("three_phase_powermeter", "hottis_pv_modbus", "to_vendor"): _transform_three_phase_powermeter_hottis_pv_modbus_to_vendor,
|
||||
("three_phase_powermeter", "hottis_pv_modbus", "to_abstract"): _transform_three_phase_powermeter_hottis_pv_modbus_to_abstract,
|
||||
}
|
||||
# Register handlers from each vendor module
|
||||
for vendor_name, vendor_module in [
|
||||
("simulator", simulator),
|
||||
("zigbee2mqtt", zigbee2mqtt),
|
||||
("max", max),
|
||||
("shelly", shelly),
|
||||
("tasmota", tasmota),
|
||||
("hottis_pv_modbus", hottis_pv_modbus),
|
||||
("hottis_wago_modbus", hottis_wago_modbus),
|
||||
]:
|
||||
for (device_type, direction), handler in vendor_module.HANDLERS.items():
|
||||
key = (device_type, vendor_name, direction)
|
||||
TRANSFORM_HANDLERS[key] = handler
|
||||
|
||||
|
||||
def _get_transform_handler(
|
||||
@@ -656,7 +84,7 @@ def transform_abstract_to_vendor(
|
||||
device_type: str,
|
||||
device_technology: str,
|
||||
abstract_payload: dict[str, Any]
|
||||
) -> dict[str, Any]:
|
||||
) -> str:
|
||||
"""Transform abstract payload to vendor-specific format.
|
||||
|
||||
Args:
|
||||
@@ -665,7 +93,7 @@ def transform_abstract_to_vendor(
|
||||
abstract_payload: Payload in abstract home protocol format
|
||||
|
||||
Returns:
|
||||
Payload in vendor-specific format
|
||||
Payload in vendor-specific format (as string)
|
||||
"""
|
||||
logger.debug(
|
||||
f"transform_abstract_to_vendor IN: type={device_type}, tech={device_technology}, "
|
||||
@@ -692,7 +120,7 @@ def transform_vendor_to_abstract(
|
||||
Args:
|
||||
device_type: Type of device (e.g., "light", "thermostat")
|
||||
device_technology: Technology/vendor (e.g., "simulator", "zigbee2mqtt")
|
||||
vendor_payload: Payload in vendor-specific format
|
||||
vendor_payload: Payload in vendor-specific format (as string)
|
||||
|
||||
Returns:
|
||||
Payload in abstract home protocol format
|
||||
|
||||
1
apps/abstraction/vendors/__init__.py
vendored
Normal file
1
apps/abstraction/vendors/__init__.py
vendored
Normal file
@@ -0,0 +1 @@
|
||||
"""Vendor-specific transformation modules."""
|
||||
107
apps/abstraction/vendors/hottis_pv_modbus.py
vendored
Normal file
107
apps/abstraction/vendors/hottis_pv_modbus.py
vendored
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Hottis PV Modbus vendor transformations."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def transform_relay_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to Hottis Modbus format.
|
||||
|
||||
Hottis Modbus expects plain text 'on' or 'off'.
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on'}
|
||||
- Hottis Modbus: 'on'
|
||||
"""
|
||||
power = payload.get("power", "off")
|
||||
return power
|
||||
|
||||
|
||||
def transform_relay_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform Hottis Modbus relay payload to abstract format.
|
||||
|
||||
Hottis Modbus sends JSON like:
|
||||
{"status": "Ok", "timestamp": "...", "state": false, "cnt": 528}
|
||||
|
||||
Transformations:
|
||||
- state: true -> power: 'on'
|
||||
- state: false -> power: 'off'
|
||||
"""
|
||||
data = json.loads(payload)
|
||||
state = data.get("state", False)
|
||||
power = "on" if bool(state) else "off"
|
||||
return {"power": power}
|
||||
|
||||
|
||||
def transform_three_phase_powermeter_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract three_phase_powermeter payload to hottis_pv_modbus format."""
|
||||
vendor_payload = {
|
||||
"energy": payload.get("energy", 0.0),
|
||||
"total_power": payload.get("total_power", 0.0),
|
||||
"phase1_power": payload.get("phase1_power", 0.0),
|
||||
"phase2_power": payload.get("phase2_power", 0.0),
|
||||
"phase3_power": payload.get("phase3_power", 0.0),
|
||||
"phase1_voltage": payload.get("phase1_voltage", 0.0),
|
||||
"phase2_voltage": payload.get("phase2_voltage", 0.0),
|
||||
"phase3_voltage": payload.get("phase3_voltage", 0.0),
|
||||
"phase1_current": payload.get("phase1_current", 0.0),
|
||||
"phase2_current": payload.get("phase2_current", 0.0),
|
||||
"phase3_current": payload.get("phase3_current", 0.0),
|
||||
}
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def transform_three_phase_powermeter_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform hottis_pv_modbus three_phase_powermeter payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- totalImportEnergy -> energy
|
||||
- powerL1/powerL2/powerL3 -> phase1_power/phase2_power/phase3_power
|
||||
- voltageL1/voltageL2/voltageL3 -> phase1_voltage/phase2_voltage/phase3_voltage
|
||||
- currentL1/currentL2/currentL3 -> phase1_current/phase2_current/phase3_current
|
||||
- Sum of powerL1..3 -> total_power
|
||||
"""
|
||||
data = json.loads(payload)
|
||||
|
||||
def _get_float(key: str, default: float = 0.0) -> float:
|
||||
return float(data.get(key, default))
|
||||
|
||||
phase1_power = _get_float("powerL1")
|
||||
phase2_power = _get_float("powerL2")
|
||||
phase3_power = _get_float("powerL3")
|
||||
|
||||
phase1_voltage = _get_float("voltageL1")
|
||||
phase2_voltage = _get_float("voltageL2")
|
||||
phase3_voltage = _get_float("voltageL3")
|
||||
|
||||
phase1_current = _get_float("currentL1")
|
||||
phase2_current = _get_float("currentL2")
|
||||
phase3_current = _get_float("currentL3")
|
||||
|
||||
energy = _get_float("totalImportEnergy")
|
||||
|
||||
return {
|
||||
"energy": energy,
|
||||
"total_power": phase1_power + phase2_power + phase3_power,
|
||||
"phase1_power": phase1_power,
|
||||
"phase2_power": phase2_power,
|
||||
"phase3_power": phase3_power,
|
||||
"phase1_voltage": phase1_voltage,
|
||||
"phase2_voltage": phase2_voltage,
|
||||
"phase3_voltage": phase3_voltage,
|
||||
"phase1_current": phase1_current,
|
||||
"phase2_current": phase2_current,
|
||||
"phase3_current": phase3_current,
|
||||
}
|
||||
|
||||
|
||||
# Registry of handlers for this vendor
|
||||
HANDLERS = {
|
||||
("relay", "to_vendor"): transform_relay_to_vendor,
|
||||
("relay", "to_abstract"): transform_relay_to_abstract,
|
||||
("three_phase_powermeter", "to_vendor"): transform_three_phase_powermeter_to_vendor,
|
||||
("three_phase_powermeter", "to_abstract"): transform_three_phase_powermeter_to_abstract,
|
||||
}
|
||||
95
apps/abstraction/vendors/max.py
vendored
Normal file
95
apps/abstraction/vendors/max.py
vendored
Normal file
@@ -0,0 +1,95 @@
|
||||
"""MAX! (Homegear) vendor transformations."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def transform_contact_sensor_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract contact sensor payload to MAX! format.
|
||||
|
||||
Contact sensors are read-only.
|
||||
"""
|
||||
logger.warning("Contact sensors are read-only - SET commands should not be used")
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def transform_contact_sensor_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform MAX! contact sensor payload to abstract format.
|
||||
|
||||
MAX! sends "true"/"false" (string or bool) on STATE topic.
|
||||
|
||||
Transformations:
|
||||
- "true" or True -> "open" (window/door open)
|
||||
- "false" or False -> "closed" (window/door closed)
|
||||
|
||||
Example:
|
||||
- MAX!: "true"
|
||||
- Abstract: {"contact": "open"}
|
||||
"""
|
||||
try:
|
||||
contact_value = payload.strip().lower() == "true"
|
||||
return {
|
||||
"contact": "open" if contact_value else "closed"
|
||||
}
|
||||
except (ValueError, TypeError) as e:
|
||||
logger.error(f"MAX! contact sensor failed to parse: {payload}, error: {e}")
|
||||
return {"contact": "closed"}
|
||||
|
||||
|
||||
def transform_thermostat_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract thermostat payload to MAX! format.
|
||||
|
||||
MAX! expects only the integer temperature value (no JSON).
|
||||
|
||||
Transformations:
|
||||
- Extract 'target' temperature from payload
|
||||
- Convert float to integer
|
||||
- Return as plain string value
|
||||
|
||||
Example:
|
||||
- Abstract: {'target': 22.5}
|
||||
- MAX!: "22"
|
||||
"""
|
||||
if "target" not in payload:
|
||||
logger.warning(f"MAX! thermostat payload missing 'target': {payload}")
|
||||
return "21"
|
||||
|
||||
target_temp = payload["target"]
|
||||
|
||||
if isinstance(target_temp, (int, float)):
|
||||
int_temp = int(round(target_temp))
|
||||
return str(int_temp)
|
||||
|
||||
logger.warning(f"MAX! invalid target temperature type: {type(target_temp)}")
|
||||
return "21"
|
||||
|
||||
|
||||
def transform_thermostat_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform MAX! thermostat payload to abstract format.
|
||||
|
||||
MAX! sends only the integer temperature value (no JSON).
|
||||
|
||||
Example:
|
||||
- MAX!: "22"
|
||||
- Abstract: {'target': 22.0, 'mode': 'heat'}
|
||||
"""
|
||||
target_temp = float(payload.strip())
|
||||
|
||||
return {
|
||||
"target": target_temp,
|
||||
"mode": "heat"
|
||||
}
|
||||
|
||||
|
||||
# Registry of handlers for this vendor
|
||||
HANDLERS = {
|
||||
("contact_sensor", "to_vendor"): transform_contact_sensor_to_vendor,
|
||||
("contact_sensor", "to_abstract"): transform_contact_sensor_to_abstract,
|
||||
("contact", "to_vendor"): transform_contact_sensor_to_vendor,
|
||||
("contact", "to_abstract"): transform_contact_sensor_to_abstract,
|
||||
("thermostat", "to_vendor"): transform_thermostat_to_vendor,
|
||||
("thermostat", "to_abstract"): transform_thermostat_to_abstract,
|
||||
}
|
||||
38
apps/abstraction/vendors/shelly.py
vendored
Normal file
38
apps/abstraction/vendors/shelly.py
vendored
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Shelly vendor transformations."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def transform_relay_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to Shelly format.
|
||||
|
||||
Shelly expects plain text 'on' or 'off' (not JSON).
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on'}
|
||||
- Shelly: 'on'
|
||||
"""
|
||||
power = payload.get("power", "off")
|
||||
return power
|
||||
|
||||
|
||||
def transform_relay_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform Shelly relay payload to abstract format.
|
||||
|
||||
Shelly sends plain text 'on' or 'off'.
|
||||
|
||||
Example:
|
||||
- Shelly: 'on'
|
||||
- Abstract: {'power': 'on'}
|
||||
"""
|
||||
return {"power": payload.strip()}
|
||||
|
||||
|
||||
# Registry of handlers for this vendor
|
||||
HANDLERS = {
|
||||
("relay", "to_vendor"): transform_relay_to_vendor,
|
||||
("relay", "to_abstract"): transform_relay_to_abstract,
|
||||
}
|
||||
50
apps/abstraction/vendors/simulator.py
vendored
Normal file
50
apps/abstraction/vendors/simulator.py
vendored
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Simulator vendor transformations."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def transform_light_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract light payload to simulator format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def transform_light_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform simulator light payload to abstract format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
return payload
|
||||
|
||||
|
||||
def transform_thermostat_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract thermostat payload to simulator format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def transform_thermostat_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform simulator thermostat payload to abstract format.
|
||||
|
||||
Simulator uses same format as abstract protocol (no transformation needed).
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
return payload
|
||||
|
||||
|
||||
# Registry of handlers for this vendor
|
||||
HANDLERS = {
|
||||
("light", "to_vendor"): transform_light_to_vendor,
|
||||
("light", "to_abstract"): transform_light_to_abstract,
|
||||
("thermostat", "to_vendor"): transform_thermostat_to_vendor,
|
||||
("thermostat", "to_abstract"): transform_thermostat_to_abstract,
|
||||
}
|
||||
38
apps/abstraction/vendors/tasmota.py
vendored
Normal file
38
apps/abstraction/vendors/tasmota.py
vendored
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Tasmota vendor transformations."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def transform_relay_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to Tasmota format.
|
||||
|
||||
Tasmota expects plain text 'on' or 'off' (not JSON).
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on'}
|
||||
- Tasmota: 'on'
|
||||
"""
|
||||
power = payload.get("power", "off")
|
||||
return power
|
||||
|
||||
|
||||
def transform_relay_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform Tasmota relay payload to abstract format.
|
||||
|
||||
Tasmota sends plain text 'ON' or 'OFF'.
|
||||
|
||||
Example:
|
||||
- Tasmota: 'ON'
|
||||
- Abstract: {'power': 'on'}
|
||||
"""
|
||||
return {"power": payload.strip().lower()}
|
||||
|
||||
|
||||
# Registry of handlers for this vendor
|
||||
HANDLERS = {
|
||||
("relay", "to_vendor"): transform_relay_to_vendor,
|
||||
("relay", "to_abstract"): transform_relay_to_abstract,
|
||||
}
|
||||
209
apps/abstraction/vendors/zigbee2mqtt.py
vendored
Normal file
209
apps/abstraction/vendors/zigbee2mqtt.py
vendored
Normal file
@@ -0,0 +1,209 @@
|
||||
"""Zigbee2MQTT vendor transformations."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def transform_light_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract light payload to zigbee2mqtt format.
|
||||
|
||||
Transformations:
|
||||
- power: 'on'/'off' -> state: 'ON'/'OFF'
|
||||
- brightness: 0-100 -> brightness: 0-254
|
||||
|
||||
Example:
|
||||
- Abstract: {'power': 'on', 'brightness': 100}
|
||||
- zigbee2mqtt: {'state': 'ON', 'brightness': 254}
|
||||
"""
|
||||
vendor_payload = payload.copy()
|
||||
|
||||
# Transform power -> state with uppercase values
|
||||
if "power" in vendor_payload:
|
||||
power_value = vendor_payload.pop("power")
|
||||
vendor_payload["state"] = power_value.upper() if isinstance(power_value, str) else power_value
|
||||
|
||||
# Transform brightness: 0-100 (%) -> 0-254 (zigbee2mqtt range)
|
||||
if "brightness" in vendor_payload:
|
||||
abstract_brightness = vendor_payload["brightness"]
|
||||
if isinstance(abstract_brightness, (int, float)):
|
||||
vendor_payload["brightness"] = round(abstract_brightness * 254 / 100)
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def transform_light_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt light payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- state: 'ON'/'OFF' -> power: 'on'/'off'
|
||||
- brightness: 0-254 -> brightness: 0-100
|
||||
|
||||
Example:
|
||||
- zigbee2mqtt: {'state': 'ON', 'brightness': 254}
|
||||
- Abstract: {'power': 'on', 'brightness': 100}
|
||||
"""
|
||||
abstract_payload = json.loads(payload)
|
||||
|
||||
# Transform state -> power with lowercase values
|
||||
if "state" in abstract_payload:
|
||||
state_value = abstract_payload.pop("state")
|
||||
abstract_payload["power"] = state_value.lower() if isinstance(state_value, str) else state_value
|
||||
|
||||
# Transform brightness: 0-254 (zigbee2mqtt range) -> 0-100 (%)
|
||||
if "brightness" in abstract_payload:
|
||||
vendor_brightness = abstract_payload["brightness"]
|
||||
if isinstance(vendor_brightness, (int, float)):
|
||||
abstract_payload["brightness"] = round(vendor_brightness * 100 / 254)
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
def transform_thermostat_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract thermostat payload to zigbee2mqtt format.
|
||||
|
||||
Transformations:
|
||||
- target -> current_heating_setpoint (as string)
|
||||
- mode is ignored (zigbee2mqtt thermostats use system_mode in state only)
|
||||
|
||||
Example:
|
||||
- Abstract: {'target': 22.0}
|
||||
- zigbee2mqtt: {'current_heating_setpoint': '22.0'}
|
||||
"""
|
||||
vendor_payload = {}
|
||||
|
||||
if "target" in payload:
|
||||
vendor_payload["current_heating_setpoint"] = str(payload["target"])
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def transform_thermostat_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt thermostat payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- current_heating_setpoint -> target (as float)
|
||||
- local_temperature -> current (as float)
|
||||
- system_mode -> mode
|
||||
|
||||
Example:
|
||||
- zigbee2mqtt: {'current_heating_setpoint': 15, 'local_temperature': 23, 'system_mode': 'heat'}
|
||||
- Abstract: {'target': 15.0, 'current': 23.0, 'mode': 'heat'}
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
abstract_payload = {}
|
||||
|
||||
if "current_heating_setpoint" in payload:
|
||||
setpoint = payload["current_heating_setpoint"]
|
||||
abstract_payload["target"] = float(setpoint)
|
||||
|
||||
if "local_temperature" in payload:
|
||||
current = payload["local_temperature"]
|
||||
abstract_payload["current"] = float(current)
|
||||
|
||||
if "system_mode" in payload:
|
||||
abstract_payload["mode"] = payload["system_mode"]
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
def transform_contact_sensor_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract contact sensor payload to zigbee2mqtt format.
|
||||
|
||||
Contact sensors are read-only, so this should not be called for SET commands.
|
||||
"""
|
||||
logger.warning("Contact sensors are read-only - SET commands should not be used")
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def transform_contact_sensor_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt contact sensor payload to abstract format.
|
||||
|
||||
Transformations:
|
||||
- contact: bool -> "open" | "closed"
|
||||
- zigbee2mqtt semantics: False = OPEN, True = CLOSED (inverted!)
|
||||
|
||||
Example:
|
||||
- zigbee2mqtt: {"contact": false, "battery": 100}
|
||||
- Abstract: {"contact": "open", "battery": 100}
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
abstract_payload = {}
|
||||
|
||||
if "contact" in payload:
|
||||
contact_bool = payload["contact"]
|
||||
abstract_payload["contact"] = "closed" if contact_bool else "open"
|
||||
|
||||
# Pass through optional fields
|
||||
for field in ["battery", "linkquality", "device_temperature", "voltage"]:
|
||||
if field in payload:
|
||||
abstract_payload[field] = payload[field]
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
def transform_temp_humidity_sensor_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract temp/humidity sensor payload to zigbee2mqtt format.
|
||||
|
||||
Temp/humidity sensors are read-only.
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
def transform_temp_humidity_sensor_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt temp/humidity sensor payload to abstract format.
|
||||
|
||||
Passthrough - zigbee2mqtt provides temperature, humidity, battery, linkquality directly.
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
return payload
|
||||
|
||||
|
||||
def transform_relay_to_vendor(payload: dict[str, Any]) -> str:
|
||||
"""Transform abstract relay payload to zigbee2mqtt format.
|
||||
|
||||
- power: 'on'/'off' -> state: 'ON'/'OFF'
|
||||
"""
|
||||
vendor_payload = payload.copy()
|
||||
|
||||
if "power" in vendor_payload:
|
||||
power_value = vendor_payload.pop("power")
|
||||
vendor_payload["state"] = power_value.upper() if isinstance(power_value, str) else power_value
|
||||
|
||||
return json.dumps(vendor_payload)
|
||||
|
||||
|
||||
def transform_relay_to_abstract(payload: str) -> dict[str, Any]:
|
||||
"""Transform zigbee2mqtt relay payload to abstract format.
|
||||
|
||||
- state: 'ON'/'OFF' -> power: 'on'/'off'
|
||||
"""
|
||||
payload = json.loads(payload)
|
||||
abstract_payload = payload.copy()
|
||||
|
||||
if "state" in abstract_payload:
|
||||
state_value = abstract_payload.pop("state")
|
||||
abstract_payload["power"] = state_value.lower() if isinstance(state_value, str) else state_value
|
||||
|
||||
return abstract_payload
|
||||
|
||||
|
||||
# Registry of handlers for this vendor
|
||||
HANDLERS = {
|
||||
("light", "to_vendor"): transform_light_to_vendor,
|
||||
("light", "to_abstract"): transform_light_to_abstract,
|
||||
("thermostat", "to_vendor"): transform_thermostat_to_vendor,
|
||||
("thermostat", "to_abstract"): transform_thermostat_to_abstract,
|
||||
("contact_sensor", "to_vendor"): transform_contact_sensor_to_vendor,
|
||||
("contact_sensor", "to_abstract"): transform_contact_sensor_to_abstract,
|
||||
("contact", "to_vendor"): transform_contact_sensor_to_vendor,
|
||||
("contact", "to_abstract"): transform_contact_sensor_to_abstract,
|
||||
("temp_humidity_sensor", "to_vendor"): transform_temp_humidity_sensor_to_vendor,
|
||||
("temp_humidity_sensor", "to_abstract"): transform_temp_humidity_sensor_to_abstract,
|
||||
("temp_humidity", "to_vendor"): transform_temp_humidity_sensor_to_vendor,
|
||||
("temp_humidity", "to_abstract"): transform_temp_humidity_sensor_to_abstract,
|
||||
("relay", "to_vendor"): transform_relay_to_vendor,
|
||||
("relay", "to_abstract"): transform_relay_to_abstract,
|
||||
}
|
||||
@@ -176,15 +176,20 @@ async def mqtt_worker(shutdown_event: asyncio.Event) -> None:
|
||||
# Publish startup message
|
||||
await publish_example(client)
|
||||
|
||||
# Message loop
|
||||
# Message loop with timeout to allow shutdown check
|
||||
async for message in client.messages:
|
||||
if shutdown_event.is_set():
|
||||
logger.info("Shutdown event detected, breaking message loop")
|
||||
break
|
||||
try:
|
||||
await handle_message(message, client)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in message handler: {e}", exc_info=True)
|
||||
|
||||
# If we exit the loop due to shutdown, break the reconnect loop too
|
||||
if shutdown_event.is_set():
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("MQTT worker cancelled")
|
||||
break
|
||||
@@ -217,8 +222,17 @@ async def main() -> None:
|
||||
# Wait for shutdown signal
|
||||
await shutdown_event.wait()
|
||||
|
||||
# Wait for worker to finish
|
||||
await worker_task
|
||||
# Give worker a moment to finish gracefully
|
||||
logger.info("Waiting for MQTT worker to finish...")
|
||||
try:
|
||||
await asyncio.wait_for(worker_task, timeout=5.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("MQTT worker did not finish in time, cancelling...")
|
||||
worker_task.cancel()
|
||||
try:
|
||||
await worker_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
logger.info("Pulsegen application stopped")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user