changes working so far

This commit is contained in:
2025-12-03 12:19:51 +01:00
parent 0f0da63a8f
commit 78946ac4c7
9 changed files with 124 additions and 156 deletions

View File

@@ -1,10 +1,10 @@
mqtt: mqtt:
broker: ${MQTT__BROKER} broker: 172.16.2.16
port: ${MQTT__PORT} port: 1883
publish_period: 15
modbus: modbus:
gateway: ${MODBUS__GATEWAY} gateway: 172.16.2.42
# REGISTERS = [ # REGISTERS = [
# { "slave":2, "addr":0x0048, "type":"input", "attr": "importEnergyActive", "name":"Import active energy", "unit":"kWh", "adaptor": floatAdaptor }, # { "slave":2, "addr":0x0048, "type":"input", "attr": "importEnergyActive", "name":"Import active energy", "unit":"kWh", "adaptor": floatAdaptor },
@@ -26,7 +26,6 @@ modbus:
output: output:
- name: pv_meter - name: pv_meter
publish_topic: IoT/PV/Values publish_topic: IoT/PV/Values
publish_period: 15
slave_id: 2 slave_id: 2
registers: registers:
- address: 0x0048 - address: 0x0048
@@ -34,95 +33,94 @@ output:
name: Import active energy name: Import active energy
unit: kWh unit: kWh
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x004c - address: 0x004c
attribute: importEnergyReactive attribute: importEnergyReactive
name: Import reactive energy name: Import reactive energy
unit: kVAh unit: kVAh
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x004a - address: 0x004a
attribute: exportEnergyActive attribute: exportEnergyActive
name: Export active energy name: Export active energy
unit: kWh unit: kWh
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x004e - address: 0x004e
attribute: exportEnergyReactive attribute: exportEnergyReactive
name: Export reactive energy name: Export reactive energy
unit: kVAh unit: kVAh
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x0012 - address: 0x0012
attribute: powerApparent attribute: powerApparent
name: Apparent Power name: Apparent Power
unit: W unit: W
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x000c - address: 0x000c
attribute: powerActive attribute: powerActive
name: Active Power name: Active Power
unit: W unit: W
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x0018 - address: 0x0018
attribute: powerReactive attribute: powerReactive
name: Reactive Power name: Reactive Power
unit: W unit: W
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x0058 - address: 0x0058
attribute: powerDemandPositive attribute: powerDemandPositive
name: PositivePowerDemand name: PositivePowerDemand
unit: W unit: W
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x005c - address: 0x005c
attribute: powerDemandReverse attribute: powerDemandReverse
name: ReversePowerDemand name: ReversePowerDemand
unit: W unit: W
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x001e - address: 0x001e
attribute: factor attribute: factor
name: Factor name: Factor
unit: "-" unit: "-"
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x0024 - address: 0x0024
attribute: angle attribute: angle
name: Angle name: Angle
unit: degree unit: degree
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x0000 - address: 0x0000
attribute: voltage attribute: voltage
name: Voltage name: Voltage
unit: V unit: V
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- address: 0x0006 - address: 0x0006
attribute: current attribute: current
name: Current name: Current
unit: A unit: A
register_type: input register_type: input
data_type: float data_type: float32
adaptor: floatAdaptor adaptor: floatAdaptor
- name: pv_control - name: pv_control
publish_topic: IoT/PV/Control publish_topic: IoT/PV/Control
publish_period: 15
slave_id: 1 slave_id: 1
registers: registers:
- address: 0x0001 - address: 0x0001
@@ -130,5 +128,5 @@ output:
name: State name: State
unit: "-" unit: "-"
register_type: holding register_type: holding
data_type: int data_type: int32
adaptor: onOffAdaptor adaptor: onOffAdaptor

View File

@@ -0,0 +1,45 @@
from threading import Event
from loguru import logger
from MqttBase import AbstractMqttPublisher
import json
import datetime
def floatAdaptor(i):
return float(f"{i:0.2f}") if i else 0.0
def onOffAdaptor(i):
return i[0] if i else '-1'
class FromDevices(AbstractMqttPublisher):
def __init__(self, config, modbusHandler):
super().__init__(config)
self.modbusHandler = modbusHandler
def localLoop(self):
cnt = 0
while not self.killBill:
cnt += 1
payload = {}
try:
payload['status'] = "Error"
payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow())
for device in self.config.output:
logger.debug(f"{device.name=} {device.publish_topic=}")
for registers in device.registers:
logger.debug(f" {registers.name=} {registers.address=} {registers.register_type=}")
rawValue = self.modbusHandler.readRegister(registers.register_type, device.slave_id, registers.address, registers.data_type)
logger.debug(f" {rawValue=}")
payload['status'] = "Ok"
except Exception as e:
logger.error(f"Caught exception: {str(e)}")
# payload['cnt'] = cnt
# payloadStr = json.dumps(payload)
# self.client.publish(topic, payloadStr)
# logger.info(f"mqtt message sent: {topic} -> {payloadStr}")
self.killEvent.wait(timeout=float(self.config.mqtt.publish_period))

View File

@@ -1,63 +0,0 @@
from threading import Event
from loguru import logger
from MqttBase import AbstractMqttPublisher
import json
import datetime
def floatAdaptor(i):
return float(f"{i:0.2f}") if i else 0.0
def onOffAdaptor(i):
return i[0] if i else '-1'
REGISTERS = [
{ "slave":2, "addr":0x0048, "type":"input", "attr": "importEnergyActive", "name":"Import active energy", "unit":"kWh", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x004c, "type":"input", "attr": "importEnergyReactive", "name":"Import reactive energy", "unit":"kVAh", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x004a, "type":"input", "attr": "exportEnergyActive", "name":"Export active energy", "unit":"kWh", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x004e, "type":"input", "attr": "exportEnergyReactive", "name":"Export reactive energy", "unit":"kVAh", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x0012, "type":"input", "attr": "powerApparent", "name":"Apparent Power", "unit":"W", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x000c, "type":"input", "attr": "powerActive", "name":"Active Power", "unit":"W", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x0018, "type":"input", "attr": "powerReactive", "name":"Reactive Power", "unit":"W", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x0058, "type":"input", "attr": "powerDemandPositive", "name":"PositivePowerDemand", "unit":"W", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x005c, "type":"input", "attr": "powerDemandReverse", "name":"ReversePowerDemand", "unit":"W", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x001e, "type":"input", "attr": "factor", "name":"Factor", "unit":"-", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x0024, "type":"input", "attr": "angle", "name":"Angle", "unit":"degree", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x0000, "type":"input", "attr": "voltage", "name":"Voltage", "unit":"V", "adaptor": floatAdaptor },
{ "slave":2, "addr":0x0006, "type":"input", "attr": "current", "name":"Current", "unit":"A", "adaptor": floatAdaptor },
{ "slave":1, "addr":0x0001, "type":"holding", "attr": "state", "name":"State", "unit":"-", "adaptor": onOffAdaptor },
]
class MeterPublish(AbstractMqttPublisher):
def __init__(self, config, modbusHandler):
super().__init__(config)
self.modbusHandler = modbusHandler
self.registers = REGISTERS
def localLoop(self):
cnt = 0
while not self.killBill:
cnt += 1
topic = self.config["meterPublishTopic"]
payload = str(cnt)
try:
payload = { r['attr']: r['adaptor'](None) for r in self.registers }
payload['status'] = "Error"
payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow())
for reg in self.registers:
v = self.modbusHandler.readRegister(reg['type'], reg['slave'], reg['addr'])
logger.debug(f"{reg['name']}: {v} {reg['unit']}")
payload[reg['attr']] = reg['adaptor'](v)
payload['status'] = "Ok"
except Exception as e:
logger.error(f"Caught exception: {str(e)}")
payload['cnt'] = cnt
payloadStr = json.dumps(payload)
self.client.publish(topic, payloadStr)
logger.info(f"mqtt message sent: {topic} -> {payloadStr}")
self.killEvent.wait(timeout=float(self.config["meterPublishPeriod"]))

View File

@@ -1,8 +1,10 @@
from enum import IntEnum
import pymodbus
from pymodbus.client import ModbusTcpClient from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException from pymodbus.exceptions import ModbusIOException
from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse # from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder # from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
from pymodbus.constants import Endian # from pymodbus.constants import Endian
from loguru import logger from loguru import logger
import sys import sys
@@ -19,48 +21,48 @@ class LocalModbusException(Exception):
def __str__(self): def __str__(self):
return f"LocalModbusException: Msg:{self.msg}, Cause:{self.cause}" return f"LocalModbusException: Msg:{self.msg}, Cause:{self.cause}"
READ_REGISTER_FUNCTIONS = {
'coils': ModbusTcpClient.read_coils,
'discrete': ModbusTcpClient.read_discrete_inputs,
'holding': ModbusTcpClient.read_holding_registers,
'input': ModbusTcpClient.read_input_registers
}
DATA_TYPES = {
'int16': ModbusTcpClient.DATATYPE.INT16,
'int32': ModbusTcpClient.DATATYPE.INT32,
'float32': ModbusTcpClient.DATATYPE.FLOAT32,
'float64': ModbusTcpClient.DATATYPE.FLOAT64
}
class ModbusHandler: class ModbusHandler:
def __init__(self, config): def __init__(self, config):
self.config = config['modbus'] self.config = config.modbus
self.client = ModbusTcpClient(self.config['gateway']) self.client = ModbusTcpClient(self.config.gateway)
self.client.connect() self.client.connect()
def readRegister(self, typ, slave, addr): def readRegister(self, typ, slave, addr, data_type):
if typ == 'input': try:
return self.readInputRegister(slave, addr) readFunc = READ_REGISTER_FUNCTIONS[typ]
elif typ == 'holding': dataType = DATA_TYPES[data_type]
return self.readHoldingRegister(slave, addr) count = dataType.value[1]
else:
raise LocalModbusException('unsupported read type')
def readInputRegister(self, slave, addr): res = readFunc(self.client, addr, count=count, device_id=slave)
res = self.client.read_input_registers(addr, 2, slave=slave) if (isinstance(res, pymodbus.pdu.register_message.ReadHoldingRegistersResponse) or
if (isinstance(res, ReadInputRegistersResponse)): isinstance(res, pymodbus.pdu.register_message.ReadInputRegistersResponse) or
v = BinaryPayloadDecoder.fromRegisters(res.registers, byteorder=Endian.BIG, wordorder=Endian.BIG).decode_32bit_float() isinstance(res, pymodbus.pdu.bits_message.ReadCoilsResponse) or
return v isinstance(res, pymodbus.pdu.bits_message.ReadDiscreteInputsResponse)):
elif (isinstance(res, LocalModbusException)): v = self.client.convert_from_registers(res.registers, data_type=dataType)
msg = f"Error: {type(res)}, Content: {res}" return v
logger.warning(msg) else:
raise LocalModbusException(msg=msg, cause=res) raise LocalModbusException(f"Read register failed: slave={slave}, addr={addr}, type={typ}, data_type={data_type}, response={res}")
else: except Exception as e:
msg = f"Unknown type: {type(res)}, Content: {res}" raise LocalModbusException(f"Exception during read register: slave={slave}, addr={addr}, type={typ}, data_type={data_type}", cause=e)
logger.warning(msg)
raise LocalModbusException(msg=msg)
def readHoldingRegister(self, slave, addr):
res = self.client.read_holding_registers(addr, 2, slave=slave)
if (isinstance(res, ReadHoldingRegistersResponse)):
v = res.registers
return v
elif (isinstance(res, LocalModbusException)):
msg = f"Error: {type(res)}, Content: {res}"
logger.warning(msg)
raise LocalModbusException(msg=msg, cause=res)
else:
msg = f"Unknown type: {type(res)}, Content: {res}"
logger.warning(msg)
raise LocalModbusException(msg=msg)
def writeCoil(self, slave, addr, value): def writeCoil(self, slave, addr, value):
res = self.client.write_coil(addr, value, slave=slave) res = self.client.write_coil(addr, value, slave=slave)

View File

@@ -19,7 +19,7 @@ class AbstractMqttPublisher(threading.Thread):
def __init__(self, config): def __init__(self, config):
super().__init__() super().__init__()
self.config = config["mqtt"] self.config = config
self.client = mqtt.Client(userdata=self) self.client = mqtt.Client(userdata=self)
@@ -32,20 +32,7 @@ class AbstractMqttPublisher(threading.Thread):
self.client.on_connect = mqttOnConnectCallback self.client.on_connect = mqttOnConnectCallback
self.client.on_disconnect = mqttOnDisconnectCallback self.client.on_disconnect = mqttOnDisconnectCallback
if ("login" in self.config) and ("password" in self.config): self.client.connect(self.config.mqtt.broker, int(self.config.mqtt.port))
self.client.username_pw_set(self.config["login"], self.config["password"])
if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config):
self.client.tls_set(
ca_certs=self.config["ca"],
certfile=self.config["cert"],
keyfile=self.config["key"],
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None # this does not mean "no cipher" but it means "default ciphers"
)
self.client.connect(self.config["broker"], int(self.config["port"]))
self.client.loop_start() self.client.loop_start()
logger.info("mqtt loop started") logger.info("mqtt loop started")

View File

@@ -3,7 +3,7 @@ from loguru import logger
from time import sleep from time import sleep
class RelaisSubscribe(AbstractMqttPublisher): class ToDevices(AbstractMqttPublisher):
def __init__(self, config, modbusHandler): def __init__(self, config, modbusHandler):
super().__init__(config) super().__init__(config)
self.modbusHandler = modbusHandler self.modbusHandler = modbusHandler

View File

@@ -22,7 +22,6 @@ class OutputConfig(BaseModel):
"""Output Configuration for Modbus Devices""" """Output Configuration for Modbus Devices"""
name: str name: str
publish_topic: str publish_topic: str
publish_period: int
slave_id: int slave_id: int
registers: List[RegisterConfig] registers: List[RegisterConfig]
@@ -31,6 +30,7 @@ class MqttConfig(BaseModel):
"""MQTT Configuration""" """MQTT Configuration"""
broker: str broker: str
port: int port: int
publish_period: int
class ModbusConfig(BaseModel): class ModbusConfig(BaseModel):

View File

@@ -1,5 +1,5 @@
from MeterPublish import MeterPublish from FromDevices import FromDevices
from RelaisSubscribe import RelaisSubscribe from ToDevices import ToDevices
from ModbusBase import ModbusHandler from ModbusBase import ModbusHandler
from loguru import logger from loguru import logger
from config import Config from config import Config
@@ -22,17 +22,17 @@ def exceptHook(args):
logger.info("pv controller starting") logger.info("pv controller starting")
config = Config() config = Config.load_from_file()
modbusHandler = ModbusHandler(config) modbusHandler = ModbusHandler(config)
relaisSubscribeThread = RelaisSubscribe(config, modbusHandler) # toDevicesThread = ToDevices(config, modbusHandler)
relaisSubscribeThread.start() # toDevicesThread.start()
logger.info("relaisSubscribe started") # logger.info("toDevices started")
meterPublishThread = MeterPublish(config, modbusHandler) fromDevicesThread = FromDevices(config, modbusHandler)
meterPublishThread.start() fromDevicesThread.start()
logger.info("meterPublishThread started") logger.info("fromDevices started")
threading.excepthook = exceptHook threading.excepthook = exceptHook
logger.info("Threading excepthook set") logger.info("Threading excepthook set")
@@ -43,13 +43,12 @@ logger.info("pv controller is running")
deathBell.wait() deathBell.wait()
logger.error("pv controller is dying") logger.error("pv controller is dying")
relaisSubscribeThread.stop() # toDevicesThread.stop()
meterPublishThread.stop() fromDevicesThread.stop()
relaisSubscribeThread.join() # toDevicesThread.join()
logger.error("relaisSubscribe joined") # logger.error("toDevices joined")
fromDevicesThread.join()
meterPublishThread.join() logger.error("fromDevices joined")
logger.error("meterPublishThread joined")
logger.error("pv controller is terminated") logger.error("pv controller is terminated")