This commit is contained in:
2026-01-13 12:59:25 +01:00
3 changed files with 109 additions and 16 deletions

62
apps/abstraction/vendors/test.py vendored Normal file
View File

@@ -0,0 +1,62 @@
"""test vendor transformations."""
import json
import logging
from typing import Any
logger = logging.getLogger(__name__)
def transform_contact_sensor_to_vendor(payload: dict[str, Any]) -> str:
"""Transform abstract contact sensor payload to MAX! format.
Contact sensors are read-only.
"""
logger.warning("Contact sensors are read-only - SET commands should not be used")
return json.dumps(payload)
def transform_contact_sensor_to_abstract(payload: str) -> dict[str, Any]:
try:
contact_value = payload.strip().lower()
return {
"contact": "open" if (contact_value == "open") else "closed"
}
except (ValueError, TypeError) as e:
logger.error(f"contact sensor failed to parse: {payload}, error: {e}")
return {"contact": "closed"}
def transform_thermostat_to_vendor(payload: dict[str, Any]) -> str:
if "target" not in payload:
logger.warning(f"thermostat payload missing 'target': {payload}")
return "21"
target_temp = payload["target"]
if isinstance(target_temp, (int, float)):
int_temp = int(round(target_temp))
return str(int_temp)
logger.warning(f"invalid target temperature type: {type(target_temp)}")
return "21"
def transform_thermostat_to_abstract(payload: str) -> dict[str, Any]:
target_temp = float(payload.strip())
return {
"target": target_temp,
"mode": "heat"
}
# Registry of handlers for this vendor
HANDLERS = {
("contact_sensor", "to_vendor"): transform_contact_sensor_to_vendor,
("contact_sensor", "to_abstract"): transform_contact_sensor_to_abstract,
("contact", "to_vendor"): transform_contact_sensor_to_vendor,
("contact", "to_abstract"): transform_contact_sensor_to_abstract,
("thermostat", "to_vendor"): transform_thermostat_to_vendor,
("thermostat", "to_abstract"): transform_thermostat_to_abstract,
}