diff --git a/config/config.yaml b/config/config.yaml index 6a840d2..4a2577f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,7 +1,10 @@ +global: + scan_interval: 1 + log_level: INFO + mqtt: broker: 172.16.2.16 port: 1883 - publish_period: 15 modbus: gateway: 172.16.2.42 @@ -23,8 +26,23 @@ modbus: # { "slave":1, "addr":0x0001, "type":"holding", "attr": "state", "name":"State", "unit":"-", "adaptor": onOffAdaptor }, # ] + +input: + - name: pv_control + subscribe_topic: IoT/PV/Control + slave_id: 1 + address: 0 + register_type: coil + - name: car_control + subscribe_topic: IoT/Car/Control + slave_id: 5 + address: 0 + register_type: coil + output: - name: pv_meter + enabled: true + scan_rate: 15 publish_topic: IoT/PV/Values slave_id: 2 registers: @@ -120,7 +138,8 @@ output: data_type: float32 adaptor: floatAdaptor - name: pv_control - publish_topic: IoT/PV/Control + publish_topic: IoT/PV/Control/State + scan_rate: 1 slave_id: 1 registers: - address: 0x0001 @@ -131,7 +150,9 @@ output: data_type: int32 adaptor: onOffAdaptor - name: car_control - publish_topic: IoT/Car/Control + enabled: true + publish_topic: IoT/Car/Control/State + scan_rate: 1 slave_id: 5 registers: - address: 0x0001 @@ -144,26 +165,83 @@ output: - name: car_meter enabled: true publish_topic: IoT/Car/Values + scan_rate: 15 slave_id: 6 registers: - - address: 14 + - address: 0 attribute: voltageL1 name: Voltage L1 unit: V - register_type: holding + register_type: input data_type: float32 adaptor: floatAdaptor - - address: 16 + - address: 2 attribute: voltageL2 name: Voltage L2 unit: V - register_type: holding + register_type: input data_type: float32 adaptor: floatAdaptor - - address: 18 + - address: 4 attribute: voltageL3 name: Voltage L3 unit: V - register_type: holding + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 6 + attribute: currentL1 + name: Current L1 + unit: A + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 8 + attribute: currentL2 + name: Current L2 + unit: A + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 10 + attribute: currentL3 + name: Current L3 + unit: A + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 12 + attribute: powerL1 + name: Power L1 + unit: W + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 14 + attribute: powerL2 + name: Power L2 + unit: W + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 16 + attribute: powerL3 + name: Power L3 + unit: W + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 0x0048 + attribute: totalImportEnergy + name: Total Import Energy + unit: kWh + register_type: input + data_type: float32 + adaptor: floatAdaptor + - address: 0x004a + attribute: totalExportEnergy + name: Total Export Energy + unit: kWh + register_type: input data_type: float32 adaptor: floatAdaptor diff --git a/src/pv_controller/FromDevices.py b/src/pv_controller/FromDevices.py index ef28a75..be8fc83 100644 --- a/src/pv_controller/FromDevices.py +++ b/src/pv_controller/FromDevices.py @@ -9,7 +9,7 @@ def floatAdaptor(i): return float(f"{i:0.2f}") if i else 0.0 def onOffAdaptor(i): - return i[0] if i else '-1' + return bool(i) @@ -22,27 +22,39 @@ class FromDevices(AbstractMqttPublisher): cnt = 0 while not self.killBill: cnt += 1 - payload = {} - try: - payload['status'] = "Error" - payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow()) - for device in self.config.output: + for device in self.config.output: + try: + payload = {} + payload['status'] = "Error" + payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow()) logger.debug(f"{device.name=} {device.publish_topic=}") if not device.enabled: logger.debug(f" device disabled, skipping") continue + if cnt % device.scan_rate != 0: + logger.debug(f" not scan_rate yet, skipping") + continue for registers in device.registers: logger.debug(f" {registers.name=} {registers.address=} {registers.register_type=}") rawValue = self.modbusHandler.readRegister(registers.register_type, device.slave_id, registers.address, registers.data_type) logger.debug(f" {rawValue=}") - payload['status'] = "Ok" - except Exception as e: - logger.error(f"Caught exception: {str(e)}") + if registers.adaptor == "floatAdaptor": + value = floatAdaptor(rawValue) + elif registers.adaptor == "onOffAdaptor": + value = onOffAdaptor(rawValue) + else: + value = rawValue + logger.debug(f" {value=}") + payload[registers.attribute] = value -# payload['cnt'] = cnt -# payloadStr = json.dumps(payload) -# self.client.publish(topic, payloadStr) -# logger.info(f"mqtt message sent: {topic} -> {payloadStr}") + payload['status'] = "Ok" + except Exception as e: + logger.error(f"Caught exception: {str(e)}") - self.killEvent.wait(timeout=float(self.config.mqtt.publish_period)) + payload['cnt'] = cnt + payloadStr = json.dumps(payload) + self.client.publish(device.publish_topic, payloadStr) + logger.debug(f"mqtt message sent: {device.publish_topic} -> {payloadStr}") + + self.killEvent.wait(timeout=float(self.config.global_.scan_interval)) diff --git a/src/pv_controller/ModbusBase.py b/src/pv_controller/ModbusBase.py index 9c2fdf5..9eee37d 100644 --- a/src/pv_controller/ModbusBase.py +++ b/src/pv_controller/ModbusBase.py @@ -31,7 +31,9 @@ READ_REGISTER_FUNCTIONS = { DATA_TYPES = { 'int16': ModbusTcpClient.DATATYPE.INT16, + 'uint16': ModbusTcpClient.DATATYPE.UINT16, 'int32': ModbusTcpClient.DATATYPE.INT32, + 'uint32': ModbusTcpClient.DATATYPE.UINT32, 'float32': ModbusTcpClient.DATATYPE.FLOAT32, 'float64': ModbusTcpClient.DATATYPE.FLOAT64 } @@ -51,7 +53,7 @@ class ModbusHandler: dataType = DATA_TYPES[data_type] count = dataType.value[1] - logger.debug(f"{self.client=}, {addr=}, {count=}, {slave=}") + logger.debug(f"{addr=}, {count=}, {slave=}") res = readFunc(self.client, addr, count=count, device_id=slave) if (isinstance(res, pymodbus.pdu.register_message.ReadHoldingRegistersResponse) or isinstance(res, pymodbus.pdu.register_message.ReadInputRegistersResponse) or @@ -66,7 +68,7 @@ class ModbusHandler: def writeCoil(self, slave, addr, value): - res = self.client.write_coil(addr, value, slave=slave) + res = self.client.write_coil(addr, value, device_id=slave) logger.debug(f"write coil result {res}") return value diff --git a/src/pv_controller/MqttBase.py b/src/pv_controller/MqttBase.py index 9028a19..ba11816 100644 --- a/src/pv_controller/MqttBase.py +++ b/src/pv_controller/MqttBase.py @@ -1,7 +1,7 @@ import paho.mqtt.client as mqtt from loguru import logger +import uuid import threading -import ssl @@ -21,7 +21,9 @@ class AbstractMqttPublisher(threading.Thread): self.config = config - self.client = mqtt.Client(userdata=self) + client_id = f"pv-controller-{uuid.uuid4()}" + logger.info(f"mqtt client id: {client_id}") + self.client = mqtt.Client(client_id=client_id, userdata=self) # consider this flag in the localLoop self.killBill = False diff --git a/src/pv_controller/ToDevices.py b/src/pv_controller/ToDevices.py index 980cd8c..55f221c 100644 --- a/src/pv_controller/ToDevices.py +++ b/src/pv_controller/ToDevices.py @@ -13,17 +13,26 @@ class ToDevices(AbstractMqttPublisher): sleep(60.0) def onMessage(self, topic, payload): - logger.info("mqtt message received: {} -> {}".format(topic, str(payload))) - if payload == b'On': - self.modbusHandler.writeCoil(1, 0, 1) - elif payload == b'Off': - self.modbusHandler.writeCoil(1, 0, 0) - else: - logger.warning(f"Illegal command {payload} received") - + try: + logger.debug("mqtt message received: {} -> {}".format(topic, str(payload))) + for device in self.config.input: + if topic != device.subscribe_topic: + continue + logger.debug(f"{topic=} matches {device.subscribe_topic=}, processing") + if not device.enabled: + logger.debug(f" device disabled, skipping") + continue + if device.register_type != 'coil': + raise Exception(f"Unsupported register type {device.register_type} for input device {device.name}") + value = payload == b'On' + self.modbusHandler.writeCoil(device.slave_id, device.address, value) + except Exception as e: + logger.error(f"Caught exception in onMessage: {str(e)}") def onConnect(self): logger.info("mqtt connected") - self.client.subscribe("{}".format(self.config["relaisSubscribeTopic"])) + for device in self.config.input: + self.client.subscribe(device.subscribe_topic) + logger.info(f"subscribed to topic: {device.subscribe_topic}") logger.info("subscribed") diff --git a/src/pv_controller/config.py b/src/pv_controller/config.py index 36d4e53..ff9d0da 100644 --- a/src/pv_controller/config.py +++ b/src/pv_controller/config.py @@ -22,16 +22,25 @@ class OutputConfig(BaseModel): """Output Configuration for Modbus Devices""" name: str enabled: bool = Field(default=True) + scan_rate: Optional[int] = Field(default=60) publish_topic: str slave_id: int registers: List[RegisterConfig] +class InputConfig(BaseModel): + """Input Configuration for Modbus Devices (MQTT -> Modbus)""" + name: str + enabled: bool = Field(default=True) + subscribe_topic: str + slave_id: int + address: int + register_type: str + class MqttConfig(BaseModel): """MQTT Configuration""" broker: str port: int - publish_period: int class ModbusConfig(BaseModel): @@ -39,10 +48,18 @@ class ModbusConfig(BaseModel): gateway: str +class GlobalConfig(BaseModel): + """Global settings""" + scan_interval: int + log_level: str + + class Config(BaseModel): """Main Configuration""" + global_: GlobalConfig = Field(alias="global") mqtt: MqttConfig modbus: ModbusConfig + input: List[InputConfig] output: List[OutputConfig] @classmethod diff --git a/src/pv_controller/pvc.py b/src/pv_controller/pvc.py index 2454832..dd55480 100644 --- a/src/pv_controller/pvc.py +++ b/src/pv_controller/pvc.py @@ -5,11 +5,9 @@ from loguru import logger from config import Config import logging import threading +import sys -l = logging.getLogger() -for h in l.handlers: - l.removeHandler(h) deathBell = threading.Event() @@ -24,11 +22,16 @@ logger.info("pv controller starting") config = Config.load_from_file() +# configure loguru: only log INFO and above +logger.remove() +logger.add(sys.stdout, level=config.global_.log_level) + + modbusHandler = ModbusHandler(config) -# toDevicesThread = ToDevices(config, modbusHandler) -# toDevicesThread.start() -# logger.info("toDevices started") +toDevicesThread = ToDevices(config, modbusHandler) +toDevicesThread.start() +logger.info("toDevices started") fromDevicesThread = FromDevices(config, modbusHandler) fromDevicesThread.start() @@ -43,11 +46,11 @@ logger.info("pv controller is running") deathBell.wait() logger.error("pv controller is dying") -# toDevicesThread.stop() +toDevicesThread.stop() fromDevicesThread.stop() -# toDevicesThread.join() -# logger.error("toDevices joined") +toDevicesThread.join() +logger.error("toDevices joined") fromDevicesThread.join() logger.error("fromDevices joined")