"""Hottis PV Modbus vendor transformations.""" import json import logging from typing import Any logger = logging.getLogger(__name__) def transform_relay_to_vendor(payload: dict[str, Any]) -> str: """Transform abstract relay payload to Hottis Modbus format. Hottis Modbus expects plain text 'on' or 'off'. Example: - Abstract: {'power': 'on'} - Hottis Modbus: 'on' """ power = payload.get("power", "off") return power def transform_relay_to_abstract(payload: str) -> dict[str, Any]: """Transform Hottis Modbus relay payload to abstract format. Hottis Modbus sends JSON like: {"status": "Ok", "timestamp": "...", "state": false, "cnt": 528} Transformations: - state: true -> power: 'on' - state: false -> power: 'off' """ data = json.loads(payload) state = data.get("state", False) power = "on" if bool(state) else "off" return {"power": power} def transform_three_phase_powermeter_to_vendor(payload: dict[str, Any]) -> str: """Transform abstract three_phase_powermeter payload to hottis_pv_modbus format.""" vendor_payload = { "energy": payload.get("energy", 0.0), "total_power": payload.get("total_power", 0.0), "phase1_power": payload.get("phase1_power", 0.0), "phase2_power": payload.get("phase2_power", 0.0), "phase3_power": payload.get("phase3_power", 0.0), "phase1_voltage": payload.get("phase1_voltage", 0.0), "phase2_voltage": payload.get("phase2_voltage", 0.0), "phase3_voltage": payload.get("phase3_voltage", 0.0), "phase1_current": payload.get("phase1_current", 0.0), "phase2_current": payload.get("phase2_current", 0.0), "phase3_current": payload.get("phase3_current", 0.0), } return json.dumps(vendor_payload) def transform_three_phase_powermeter_to_abstract(payload: str) -> dict[str, Any]: """Transform hottis_pv_modbus three_phase_powermeter payload to abstract format. Transformations: - totalImportEnergy -> energy - powerL1/powerL2/powerL3 -> phase1_power/phase2_power/phase3_power - voltageL1/voltageL2/voltageL3 -> phase1_voltage/phase2_voltage/phase3_voltage - currentL1/currentL2/currentL3 -> phase1_current/phase2_current/phase3_current - Sum of powerL1..3 -> total_power """ data = json.loads(payload) def _get_float(key: str, default: float = 0.0) -> float: return float(data.get(key, default)) phase1_power = _get_float("powerL1") phase2_power = _get_float("powerL2") phase3_power = _get_float("powerL3") phase1_voltage = _get_float("voltageL1") phase2_voltage = _get_float("voltageL2") phase3_voltage = _get_float("voltageL3") phase1_current = _get_float("currentL1") phase2_current = _get_float("currentL2") phase3_current = _get_float("currentL3") energy = _get_float("totalImportEnergy") return { "energy": energy, "total_power": phase1_power + phase2_power + phase3_power, "phase1_power": phase1_power, "phase2_power": phase2_power, "phase3_power": phase3_power, "phase1_voltage": phase1_voltage, "phase2_voltage": phase2_voltage, "phase3_voltage": phase3_voltage, "phase1_current": phase1_current, "phase2_current": phase2_current, "phase3_current": phase3_current, } # Registry of handlers for this vendor HANDLERS = { ("relay", "to_vendor"): transform_relay_to_vendor, ("relay", "to_abstract"): transform_relay_to_abstract, ("three_phase_powermeter", "to_vendor"): transform_three_phase_powermeter_to_vendor, ("three_phase_powermeter", "to_abstract"): transform_three_phase_powermeter_to_abstract, }