should work so far

This commit is contained in:
2025-12-05 11:54:53 +01:00
parent 6faed5441c
commit 3f13a5adfa
7 changed files with 169 additions and 46 deletions

View File

@@ -1,7 +1,10 @@
global:
scan_interval: 1
log_level: INFO
mqtt:
broker: 172.16.2.16
port: 1883
publish_period: 15
modbus:
gateway: 172.16.2.42
@@ -23,8 +26,23 @@ modbus:
# { "slave":1, "addr":0x0001, "type":"holding", "attr": "state", "name":"State", "unit":"-", "adaptor": onOffAdaptor },
# ]
input:
- name: pv_control
subscribe_topic: IoT/PV/Control
slave_id: 1
address: 0
register_type: coil
- name: car_control
subscribe_topic: IoT/Car/Control
slave_id: 5
address: 0
register_type: coil
output:
- name: pv_meter
enabled: true
scan_rate: 15
publish_topic: IoT/PV/Values
slave_id: 2
registers:
@@ -120,7 +138,8 @@ output:
data_type: float32
adaptor: floatAdaptor
- name: pv_control
publish_topic: IoT/PV/Control
publish_topic: IoT/PV/Control/State
scan_rate: 1
slave_id: 1
registers:
- address: 0x0001
@@ -131,7 +150,9 @@ output:
data_type: int32
adaptor: onOffAdaptor
- name: car_control
publish_topic: IoT/Car/Control
enabled: true
publish_topic: IoT/Car/Control/State
scan_rate: 1
slave_id: 5
registers:
- address: 0x0001
@@ -144,26 +165,83 @@ output:
- name: car_meter
enabled: true
publish_topic: IoT/Car/Values
scan_rate: 15
slave_id: 6
registers:
- address: 14
- address: 0
attribute: voltageL1
name: Voltage L1
unit: V
register_type: holding
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 16
- address: 2
attribute: voltageL2
name: Voltage L2
unit: V
register_type: holding
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 18
- address: 4
attribute: voltageL3
name: Voltage L3
unit: V
register_type: holding
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 6
attribute: currentL1
name: Current L1
unit: A
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 8
attribute: currentL2
name: Current L2
unit: A
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 10
attribute: currentL3
name: Current L3
unit: A
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 12
attribute: powerL1
name: Power L1
unit: W
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 14
attribute: powerL2
name: Power L2
unit: W
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 16
attribute: powerL3
name: Power L3
unit: W
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 0x0048
attribute: totalImportEnergy
name: Total Import Energy
unit: kWh
register_type: input
data_type: float32
adaptor: floatAdaptor
- address: 0x004a
attribute: totalExportEnergy
name: Total Export Energy
unit: kWh
register_type: input
data_type: float32
adaptor: floatAdaptor

View File

@@ -9,7 +9,7 @@ def floatAdaptor(i):
return float(f"{i:0.2f}") if i else 0.0
def onOffAdaptor(i):
return i[0] if i else '-1'
return bool(i)
@@ -22,27 +22,39 @@ class FromDevices(AbstractMqttPublisher):
cnt = 0
while not self.killBill:
cnt += 1
payload = {}
try:
payload['status'] = "Error"
payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow())
for device in self.config.output:
for device in self.config.output:
try:
payload = {}
payload['status'] = "Error"
payload['timestamp'] = datetime.datetime.isoformat(datetime.datetime.utcnow())
logger.debug(f"{device.name=} {device.publish_topic=}")
if not device.enabled:
logger.debug(f" device disabled, skipping")
continue
if cnt % device.scan_rate != 0:
logger.debug(f" not scan_rate yet, skipping")
continue
for registers in device.registers:
logger.debug(f" {registers.name=} {registers.address=} {registers.register_type=}")
rawValue = self.modbusHandler.readRegister(registers.register_type, device.slave_id, registers.address, registers.data_type)
logger.debug(f" {rawValue=}")
payload['status'] = "Ok"
except Exception as e:
logger.error(f"Caught exception: {str(e)}")
if registers.adaptor == "floatAdaptor":
value = floatAdaptor(rawValue)
elif registers.adaptor == "onOffAdaptor":
value = onOffAdaptor(rawValue)
else:
value = rawValue
logger.debug(f" {value=}")
payload[registers.attribute] = value
# payload['cnt'] = cnt
# payloadStr = json.dumps(payload)
# self.client.publish(topic, payloadStr)
# logger.info(f"mqtt message sent: {topic} -> {payloadStr}")
payload['status'] = "Ok"
except Exception as e:
logger.error(f"Caught exception: {str(e)}")
self.killEvent.wait(timeout=float(self.config.mqtt.publish_period))
payload['cnt'] = cnt
payloadStr = json.dumps(payload)
self.client.publish(device.publish_topic, payloadStr)
logger.debug(f"mqtt message sent: {device.publish_topic} -> {payloadStr}")
self.killEvent.wait(timeout=float(self.config.global_.scan_interval))

View File

@@ -31,7 +31,9 @@ READ_REGISTER_FUNCTIONS = {
DATA_TYPES = {
'int16': ModbusTcpClient.DATATYPE.INT16,
'uint16': ModbusTcpClient.DATATYPE.UINT16,
'int32': ModbusTcpClient.DATATYPE.INT32,
'uint32': ModbusTcpClient.DATATYPE.UINT32,
'float32': ModbusTcpClient.DATATYPE.FLOAT32,
'float64': ModbusTcpClient.DATATYPE.FLOAT64
}
@@ -51,7 +53,7 @@ class ModbusHandler:
dataType = DATA_TYPES[data_type]
count = dataType.value[1]
logger.debug(f"{self.client=}, {addr=}, {count=}, {slave=}")
logger.debug(f"{addr=}, {count=}, {slave=}")
res = readFunc(self.client, addr, count=count, device_id=slave)
if (isinstance(res, pymodbus.pdu.register_message.ReadHoldingRegistersResponse) or
isinstance(res, pymodbus.pdu.register_message.ReadInputRegistersResponse) or
@@ -66,7 +68,7 @@ class ModbusHandler:
def writeCoil(self, slave, addr, value):
res = self.client.write_coil(addr, value, slave=slave)
res = self.client.write_coil(addr, value, device_id=slave)
logger.debug(f"write coil result {res}")
return value

View File

@@ -1,7 +1,7 @@
import paho.mqtt.client as mqtt
from loguru import logger
import uuid
import threading
import ssl
@@ -21,7 +21,9 @@ class AbstractMqttPublisher(threading.Thread):
self.config = config
self.client = mqtt.Client(userdata=self)
client_id = f"pv-controller-{uuid.uuid4()}"
logger.info(f"mqtt client id: {client_id}")
self.client = mqtt.Client(client_id=client_id, userdata=self)
# consider this flag in the localLoop
self.killBill = False

View File

@@ -13,17 +13,26 @@ class ToDevices(AbstractMqttPublisher):
sleep(60.0)
def onMessage(self, topic, payload):
logger.info("mqtt message received: {} -> {}".format(topic, str(payload)))
if payload == b'On':
self.modbusHandler.writeCoil(1, 0, 1)
elif payload == b'Off':
self.modbusHandler.writeCoil(1, 0, 0)
else:
logger.warning(f"Illegal command {payload} received")
try:
logger.debug("mqtt message received: {} -> {}".format(topic, str(payload)))
for device in self.config.input:
if topic != device.subscribe_topic:
continue
logger.debug(f"{topic=} matches {device.subscribe_topic=}, processing")
if not device.enabled:
logger.debug(f" device disabled, skipping")
continue
if device.register_type != 'coil':
raise Exception(f"Unsupported register type {device.register_type} for input device {device.name}")
value = payload == b'On'
self.modbusHandler.writeCoil(device.slave_id, device.address, value)
except Exception as e:
logger.error(f"Caught exception in onMessage: {str(e)}")
def onConnect(self):
logger.info("mqtt connected")
self.client.subscribe("{}".format(self.config["relaisSubscribeTopic"]))
for device in self.config.input:
self.client.subscribe(device.subscribe_topic)
logger.info(f"subscribed to topic: {device.subscribe_topic}")
logger.info("subscribed")

View File

@@ -22,16 +22,25 @@ class OutputConfig(BaseModel):
"""Output Configuration for Modbus Devices"""
name: str
enabled: bool = Field(default=True)
scan_rate: Optional[int] = Field(default=60)
publish_topic: str
slave_id: int
registers: List[RegisterConfig]
class InputConfig(BaseModel):
"""Input Configuration for Modbus Devices (MQTT -> Modbus)"""
name: str
enabled: bool = Field(default=True)
subscribe_topic: str
slave_id: int
address: int
register_type: str
class MqttConfig(BaseModel):
"""MQTT Configuration"""
broker: str
port: int
publish_period: int
class ModbusConfig(BaseModel):
@@ -39,10 +48,18 @@ class ModbusConfig(BaseModel):
gateway: str
class GlobalConfig(BaseModel):
"""Global settings"""
scan_interval: int
log_level: str
class Config(BaseModel):
"""Main Configuration"""
global_: GlobalConfig = Field(alias="global")
mqtt: MqttConfig
modbus: ModbusConfig
input: List[InputConfig]
output: List[OutputConfig]
@classmethod

View File

@@ -5,11 +5,9 @@ from loguru import logger
from config import Config
import logging
import threading
import sys
l = logging.getLogger()
for h in l.handlers:
l.removeHandler(h)
deathBell = threading.Event()
@@ -24,11 +22,16 @@ logger.info("pv controller starting")
config = Config.load_from_file()
# configure loguru: only log INFO and above
logger.remove()
logger.add(sys.stdout, level=config.global_.log_level)
modbusHandler = ModbusHandler(config)
# toDevicesThread = ToDevices(config, modbusHandler)
# toDevicesThread.start()
# logger.info("toDevices started")
toDevicesThread = ToDevices(config, modbusHandler)
toDevicesThread.start()
logger.info("toDevices started")
fromDevicesThread = FromDevices(config, modbusHandler)
fromDevicesThread.start()
@@ -43,11 +46,11 @@ logger.info("pv controller is running")
deathBell.wait()
logger.error("pv controller is dying")
# toDevicesThread.stop()
toDevicesThread.stop()
fromDevicesThread.stop()
# toDevicesThread.join()
# logger.error("toDevices joined")
toDevicesThread.join()
logger.error("toDevices joined")
fromDevicesThread.join()
logger.error("fromDevices joined")