From 78946ac4c70ea3373713b84c243499cc031956c3 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 3 Dec 2025 12:19:51 +0100 Subject: [PATCH] changes working so far --- config/config.yaml | 40 +++++----- src/pv_controller/FromDevices.py | 45 +++++++++++ src/pv_controller/MeterPublish.py | 63 --------------- src/pv_controller/ModbusBase.py | 80 ++++++++++--------- src/pv_controller/MqttBase.py | 17 +--- .../{RelaisSubscribe.py => ToDevices.py} | 2 +- src/pv_controller/config.py | 2 +- src/pv_controller/pvc.py | 31 ++++--- .../pv_controller/requirements.txt | 0 9 files changed, 124 insertions(+), 156 deletions(-) create mode 100644 src/pv_controller/FromDevices.py delete mode 100644 src/pv_controller/MeterPublish.py rename src/pv_controller/{RelaisSubscribe.py => ToDevices.py} (94%) rename requirements.txt => src/pv_controller/requirements.txt (100%) diff --git a/config/config.yaml b/config/config.yaml index d643cf6..9980cea 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,10 +1,10 @@ mqtt: - broker: ${MQTT__BROKER} - port: ${MQTT__PORT} + broker: 172.16.2.16 + port: 1883 + publish_period: 15 modbus: - gateway: ${MODBUS__GATEWAY} - + gateway: 172.16.2.42 # REGISTERS = [ # { "slave":2, "addr":0x0048, "type":"input", "attr": "importEnergyActive", "name":"Import active energy", "unit":"kWh", "adaptor": floatAdaptor }, @@ -26,7 +26,6 @@ modbus: output: - name: pv_meter publish_topic: IoT/PV/Values - publish_period: 15 slave_id: 2 registers: - address: 0x0048 @@ -34,95 +33,94 @@ output: name: Import active energy unit: kWh register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x004c attribute: importEnergyReactive name: Import reactive energy unit: kVAh register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x004a attribute: exportEnergyActive name: Export active energy unit: kWh register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x004e attribute: exportEnergyReactive name: Export reactive energy unit: kVAh register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x0012 attribute: powerApparent name: Apparent Power unit: W register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x000c attribute: powerActive name: Active Power unit: W register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x0018 attribute: powerReactive name: Reactive Power unit: W register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x0058 attribute: powerDemandPositive name: PositivePowerDemand unit: W register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x005c attribute: powerDemandReverse name: ReversePowerDemand unit: W register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x001e attribute: factor name: Factor unit: "-" register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x0024 attribute: angle name: Angle unit: degree register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x0000 attribute: voltage name: Voltage unit: V register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - address: 0x0006 attribute: current name: Current unit: A register_type: input - data_type: float + data_type: float32 adaptor: floatAdaptor - name: pv_control publish_topic: IoT/PV/Control - publish_period: 15 slave_id: 1 registers: - address: 0x0001 @@ -130,5 +128,5 @@ output: name: State unit: "-" register_type: holding - data_type: int - adaptor: onOffAdaptor + data_type: int32 + adaptor: onOffAdaptor \ No newline at end of file diff --git a/src/pv_controller/FromDevices.py b/src/pv_controller/FromDevices.py new file mode 100644 index 0000000..d1acb5a --- /dev/null +++ b/src/pv_controller/FromDevices.py @@ -0,0 +1,45 @@ +from threading import Event +from loguru import logger +from MqttBase import AbstractMqttPublisher +import json +import datetime + + +def floatAdaptor(i): + return float(f"{i:0.2f}") if i else 0.0 + +def onOffAdaptor(i): + return i[0] if i else '-1' + + + +class FromDevices(AbstractMqttPublisher): + def __init__(self, config, modbusHandler): + super().__init__(config) + self.modbusHandler = modbusHandler + + def localLoop(self): + cnt = 0 + while not self.killBill: + cnt += 1 + payload = {} + + try: + payload['status'] = "Error" + payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow()) + for device in self.config.output: + logger.debug(f"{device.name=} {device.publish_topic=}") + for registers in device.registers: + logger.debug(f" {registers.name=} {registers.address=} {registers.register_type=}") + rawValue = self.modbusHandler.readRegister(registers.register_type, device.slave_id, registers.address, registers.data_type) + logger.debug(f" {rawValue=}") + payload['status'] = "Ok" + except Exception as e: + logger.error(f"Caught exception: {str(e)}") + +# payload['cnt'] = cnt +# payloadStr = json.dumps(payload) +# self.client.publish(topic, payloadStr) +# logger.info(f"mqtt message sent: {topic} -> {payloadStr}") + + self.killEvent.wait(timeout=float(self.config.mqtt.publish_period)) diff --git a/src/pv_controller/MeterPublish.py b/src/pv_controller/MeterPublish.py deleted file mode 100644 index edd631a..0000000 --- a/src/pv_controller/MeterPublish.py +++ /dev/null @@ -1,63 +0,0 @@ -from threading import Event -from loguru import logger -from MqttBase import AbstractMqttPublisher -import json -import datetime - - -def floatAdaptor(i): - return float(f"{i:0.2f}") if i else 0.0 - -def onOffAdaptor(i): - return i[0] if i else '-1' - - -REGISTERS = [ - { "slave":2, "addr":0x0048, "type":"input", "attr": "importEnergyActive", "name":"Import active energy", "unit":"kWh", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x004c, "type":"input", "attr": "importEnergyReactive", "name":"Import reactive energy", "unit":"kVAh", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x004a, "type":"input", "attr": "exportEnergyActive", "name":"Export active energy", "unit":"kWh", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x004e, "type":"input", "attr": "exportEnergyReactive", "name":"Export reactive energy", "unit":"kVAh", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x0012, "type":"input", "attr": "powerApparent", "name":"Apparent Power", "unit":"W", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x000c, "type":"input", "attr": "powerActive", "name":"Active Power", "unit":"W", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x0018, "type":"input", "attr": "powerReactive", "name":"Reactive Power", "unit":"W", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x0058, "type":"input", "attr": "powerDemandPositive", "name":"PositivePowerDemand", "unit":"W", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x005c, "type":"input", "attr": "powerDemandReverse", "name":"ReversePowerDemand", "unit":"W", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x001e, "type":"input", "attr": "factor", "name":"Factor", "unit":"-", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x0024, "type":"input", "attr": "angle", "name":"Angle", "unit":"degree", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x0000, "type":"input", "attr": "voltage", "name":"Voltage", "unit":"V", "adaptor": floatAdaptor }, - { "slave":2, "addr":0x0006, "type":"input", "attr": "current", "name":"Current", "unit":"A", "adaptor": floatAdaptor }, - { "slave":1, "addr":0x0001, "type":"holding", "attr": "state", "name":"State", "unit":"-", "adaptor": onOffAdaptor }, -] - - -class MeterPublish(AbstractMqttPublisher): - def __init__(self, config, modbusHandler): - super().__init__(config) - self.modbusHandler = modbusHandler - self.registers = REGISTERS - - def localLoop(self): - cnt = 0 - while not self.killBill: - cnt += 1 - topic = self.config["meterPublishTopic"] - payload = str(cnt) - - try: - payload = { r['attr']: r['adaptor'](None) for r in self.registers } - payload['status'] = "Error" - payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow()) - for reg in self.registers: - v = self.modbusHandler.readRegister(reg['type'], reg['slave'], reg['addr']) - logger.debug(f"{reg['name']}: {v} {reg['unit']}") - payload[reg['attr']] = reg['adaptor'](v) - payload['status'] = "Ok" - except Exception as e: - logger.error(f"Caught exception: {str(e)}") - - payload['cnt'] = cnt - payloadStr = json.dumps(payload) - self.client.publish(topic, payloadStr) - logger.info(f"mqtt message sent: {topic} -> {payloadStr}") - - self.killEvent.wait(timeout=float(self.config["meterPublishPeriod"])) diff --git a/src/pv_controller/ModbusBase.py b/src/pv_controller/ModbusBase.py index c06ae2a..93e22ce 100644 --- a/src/pv_controller/ModbusBase.py +++ b/src/pv_controller/ModbusBase.py @@ -1,8 +1,10 @@ +from enum import IntEnum +import pymodbus from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusIOException -from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse -from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder -from pymodbus.constants import Endian +# from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse +# from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder +# from pymodbus.constants import Endian from loguru import logger import sys @@ -19,48 +21,48 @@ class LocalModbusException(Exception): def __str__(self): return f"LocalModbusException: Msg:{self.msg}, Cause:{self.cause}" + +READ_REGISTER_FUNCTIONS = { + 'coils': ModbusTcpClient.read_coils, + 'discrete': ModbusTcpClient.read_discrete_inputs, + 'holding': ModbusTcpClient.read_holding_registers, + 'input': ModbusTcpClient.read_input_registers +} + +DATA_TYPES = { + 'int16': ModbusTcpClient.DATATYPE.INT16, + 'int32': ModbusTcpClient.DATATYPE.INT32, + 'float32': ModbusTcpClient.DATATYPE.FLOAT32, + 'float64': ModbusTcpClient.DATATYPE.FLOAT64 +} + + + + class ModbusHandler: def __init__(self, config): - self.config = config['modbus'] - self.client = ModbusTcpClient(self.config['gateway']) + self.config = config.modbus + self.client = ModbusTcpClient(self.config.gateway) self.client.connect() - - def readRegister(self, typ, slave, addr): - if typ == 'input': - return self.readInputRegister(slave, addr) - elif typ == 'holding': - return self.readHoldingRegister(slave, addr) - else: - raise LocalModbusException('unsupported read type') + def readRegister(self, typ, slave, addr, data_type): + try: + readFunc = READ_REGISTER_FUNCTIONS[typ] + dataType = DATA_TYPES[data_type] + count = dataType.value[1] - def readInputRegister(self, slave, addr): - res = self.client.read_input_registers(addr, 2, slave=slave) - if (isinstance(res, ReadInputRegistersResponse)): - v = BinaryPayloadDecoder.fromRegisters(res.registers, byteorder=Endian.BIG, wordorder=Endian.BIG).decode_32bit_float() - return v - elif (isinstance(res, LocalModbusException)): - msg = f"Error: {type(res)}, Content: {res}" - logger.warning(msg) - raise LocalModbusException(msg=msg, cause=res) - else: - msg = f"Unknown type: {type(res)}, Content: {res}" - logger.warning(msg) - raise LocalModbusException(msg=msg) + res = readFunc(self.client, addr, count=count, device_id=slave) + if (isinstance(res, pymodbus.pdu.register_message.ReadHoldingRegistersResponse) or + isinstance(res, pymodbus.pdu.register_message.ReadInputRegistersResponse) or + isinstance(res, pymodbus.pdu.bits_message.ReadCoilsResponse) or + isinstance(res, pymodbus.pdu.bits_message.ReadDiscreteInputsResponse)): + v = self.client.convert_from_registers(res.registers, data_type=dataType) + return v + else: + raise LocalModbusException(f"Read register failed: slave={slave}, addr={addr}, type={typ}, data_type={data_type}, response={res}") + except Exception as e: + raise LocalModbusException(f"Exception during read register: slave={slave}, addr={addr}, type={typ}, data_type={data_type}", cause=e) - def readHoldingRegister(self, slave, addr): - res = self.client.read_holding_registers(addr, 2, slave=slave) - if (isinstance(res, ReadHoldingRegistersResponse)): - v = res.registers - return v - elif (isinstance(res, LocalModbusException)): - msg = f"Error: {type(res)}, Content: {res}" - logger.warning(msg) - raise LocalModbusException(msg=msg, cause=res) - else: - msg = f"Unknown type: {type(res)}, Content: {res}" - logger.warning(msg) - raise LocalModbusException(msg=msg) def writeCoil(self, slave, addr, value): res = self.client.write_coil(addr, value, slave=slave) diff --git a/src/pv_controller/MqttBase.py b/src/pv_controller/MqttBase.py index 6033846..9028a19 100644 --- a/src/pv_controller/MqttBase.py +++ b/src/pv_controller/MqttBase.py @@ -19,7 +19,7 @@ class AbstractMqttPublisher(threading.Thread): def __init__(self, config): super().__init__() - self.config = config["mqtt"] + self.config = config self.client = mqtt.Client(userdata=self) @@ -32,20 +32,7 @@ class AbstractMqttPublisher(threading.Thread): self.client.on_connect = mqttOnConnectCallback self.client.on_disconnect = mqttOnDisconnectCallback - if ("login" in self.config) and ("password" in self.config): - self.client.username_pw_set(self.config["login"], self.config["password"]) - - if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config): - self.client.tls_set( - ca_certs=self.config["ca"], - certfile=self.config["cert"], - keyfile=self.config["key"], - cert_reqs=ssl.CERT_REQUIRED, - tls_version=ssl.PROTOCOL_TLSv1_2, - ciphers=None # this does not mean "no cipher" but it means "default ciphers" - ) - - self.client.connect(self.config["broker"], int(self.config["port"])) + self.client.connect(self.config.mqtt.broker, int(self.config.mqtt.port)) self.client.loop_start() logger.info("mqtt loop started") diff --git a/src/pv_controller/RelaisSubscribe.py b/src/pv_controller/ToDevices.py similarity index 94% rename from src/pv_controller/RelaisSubscribe.py rename to src/pv_controller/ToDevices.py index 69acf6d..980cd8c 100644 --- a/src/pv_controller/RelaisSubscribe.py +++ b/src/pv_controller/ToDevices.py @@ -3,7 +3,7 @@ from loguru import logger from time import sleep -class RelaisSubscribe(AbstractMqttPublisher): +class ToDevices(AbstractMqttPublisher): def __init__(self, config, modbusHandler): super().__init__(config) self.modbusHandler = modbusHandler diff --git a/src/pv_controller/config.py b/src/pv_controller/config.py index 5dc4f9c..225616b 100644 --- a/src/pv_controller/config.py +++ b/src/pv_controller/config.py @@ -22,7 +22,6 @@ class OutputConfig(BaseModel): """Output Configuration for Modbus Devices""" name: str publish_topic: str - publish_period: int slave_id: int registers: List[RegisterConfig] @@ -31,6 +30,7 @@ class MqttConfig(BaseModel): """MQTT Configuration""" broker: str port: int + publish_period: int class ModbusConfig(BaseModel): diff --git a/src/pv_controller/pvc.py b/src/pv_controller/pvc.py index fb6b41d..2454832 100644 --- a/src/pv_controller/pvc.py +++ b/src/pv_controller/pvc.py @@ -1,5 +1,5 @@ -from MeterPublish import MeterPublish -from RelaisSubscribe import RelaisSubscribe +from FromDevices import FromDevices +from ToDevices import ToDevices from ModbusBase import ModbusHandler from loguru import logger from config import Config @@ -22,17 +22,17 @@ def exceptHook(args): logger.info("pv controller starting") -config = Config() +config = Config.load_from_file() modbusHandler = ModbusHandler(config) -relaisSubscribeThread = RelaisSubscribe(config, modbusHandler) -relaisSubscribeThread.start() -logger.info("relaisSubscribe started") +# toDevicesThread = ToDevices(config, modbusHandler) +# toDevicesThread.start() +# logger.info("toDevices started") -meterPublishThread = MeterPublish(config, modbusHandler) -meterPublishThread.start() -logger.info("meterPublishThread started") +fromDevicesThread = FromDevices(config, modbusHandler) +fromDevicesThread.start() +logger.info("fromDevices started") threading.excepthook = exceptHook logger.info("Threading excepthook set") @@ -43,13 +43,12 @@ logger.info("pv controller is running") deathBell.wait() logger.error("pv controller is dying") -relaisSubscribeThread.stop() -meterPublishThread.stop() +# toDevicesThread.stop() +fromDevicesThread.stop() -relaisSubscribeThread.join() -logger.error("relaisSubscribe joined") - -meterPublishThread.join() -logger.error("meterPublishThread joined") +# toDevicesThread.join() +# logger.error("toDevices joined") +fromDevicesThread.join() +logger.error("fromDevices joined") logger.error("pv controller is terminated") diff --git a/requirements.txt b/src/pv_controller/requirements.txt similarity index 100% rename from requirements.txt rename to src/pv_controller/requirements.txt