From 17e5b6b012f2790c7bb9be63351605c25760842e Mon Sep 17 00:00:00 2001 From: Wolfgang Ludger Hottgenroth Date: Mon, 23 Aug 2021 16:24:40 +0200 Subject: [PATCH] input working --- src/AbstractMqttHandler.py | 50 ++++++++++++++++++++++++++++++++++++++ src/ModbusHandler.py | 21 ++++++++++------ src/MqttEventPublisher.py | 41 +++++++++++++++++++++++++++++++ src/MqttPeriodPublisher.py | 32 ++++++++++++++++++++++++ src/ProcessImage.py | 3 --- src/digitaltwin1.py | 37 +++++++++++++++++++++++----- 6 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 src/AbstractMqttHandler.py create mode 100644 src/MqttEventPublisher.py create mode 100644 src/MqttPeriodPublisher.py diff --git a/src/AbstractMqttHandler.py b/src/AbstractMqttHandler.py new file mode 100644 index 0000000..4f3a6e0 --- /dev/null +++ b/src/AbstractMqttHandler.py @@ -0,0 +1,50 @@ +import paho.mqtt.client as mqtt +import threading +from loguru import logger + + +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + + +class AbstractMqttPublisher(threading.Thread): + def __init__(self, config, processImage): + super().__init__() + + self.config = config["mqtt"] + self.processImage = processImage + + self.client = mqtt.Client(userdata=self) + + def run(self): + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + + if ("login" in self.config) and ("password" in self.config): + self.client.username_pw_set(self.config["login"], self.config["password"]) + + self.client.connect(self.config["broker"]) + self.client.loop_start() + logger.info("mqtt loop started") + + self.localLoop() + + def localLoop(self): + raise NotImplementedError() + + def onConnect(self): + logger.info("mqtt connected") + + def onDisconnect(self, rc): + logger.warning("mqtt disconnect, rc: {}".format(rc)) + + def onMessage(self, topic, payload): + logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload))) + diff --git a/src/ModbusHandler.py b/src/ModbusHandler.py index 86cba00..692a29d 100644 --- a/src/ModbusHandler.py +++ b/src/ModbusHandler.py @@ -2,15 +2,20 @@ from pymodbus.client.sync import ModbusTcpClient as ModbusClient from pymodbus.exceptions import ModbusIOException from time import sleep import threading +from loguru import logger -MODBUS_CLIENT = '172.16.2.157' -MODBUS_REFRESH_PERIOD = 0.25 +def modbusStart(config, processImage): + modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage]) + modbusThread.start() +def modbusHandler(config, processImage): + modbusClient = config["modbus"]["client"] + modbusRefreshPeriod = config["modbus"]["scanrate"] -def modbusHandler(processImage): - client = ModbusClient(MODBUS_CLIENT) + + client = ModbusClient(modbusClient) try: client.connect() @@ -23,7 +28,7 @@ def modbusHandler(processImage): raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers))) (analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers - print(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}") + logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}") processImage.init(digitalOutputBits, digitalInputBits, analogInputBits) @@ -63,12 +68,12 @@ def modbusHandler(processImage): raise Exception(reg) except Exception as e: - print("Exception in inner modbus handler loop: {}".format(e)) + logger.error("Exception in inner modbus handler loop: {}".format(e)) client.close() finally: - sleep(MODBUS_REFRESH_PERIOD) + sleep(modbusRefreshPeriod) except Exception as e: - print("Exception in modbus handler: {}".format(e)) + logger.error("Exception in modbus handler: {}".format(e)) finally: client.close() diff --git a/src/MqttEventPublisher.py b/src/MqttEventPublisher.py new file mode 100644 index 0000000..84300f9 --- /dev/null +++ b/src/MqttEventPublisher.py @@ -0,0 +1,41 @@ +import paho.mqtt.client as mqtt +import threading +from loguru import logger +from AbstractMqttHandler import AbstractMqttPublisher + + +def mqttEventPublisherStart(config, processImage): + mqttEventPublisherThread = MqttEventPublisher(config, processImage) + mqttEventPublisherThread.start() + + +class MqttEventPublisher(AbstractMqttPublisher): + def __init__(self, config, processImage): + super().__init__(config, processImage) + + def localLoop(self): + while True: + with self.processImage: + self.processImage.wait() + + discreteInputChangeset = self.processImage.getChangedDiscreteInputs() + analogInputChangeset = self.processImage.getChangedAnalogsInputs() + + for discreteInputChangeItem in discreteInputChangeset: + logger.debug("Discrete input {} changed from {} to {}" + .format(discreteInputChangeItem[0], + discreteInputChangeItem[1][1], + discreteInputChangeItem[1][0])) + self.client.publish("{}/{}".format(self.config["digitalInputTopicPrefix"], str(discreteInputChangeItem[0])), + str(discreteInputChangeItem[1][0])) + + for analogInputChangeItem in analogInputChangeset: + logger.debug("Analog input {} changed from {} to {}" + .format(analogInputChangeItem[0], + analogInputChangeItem[1][1], + analogInputChangeItem[1][0])) + + self.client.publish("{}/{}".format(self.config["analogInputEventTopicPrefix"], str(analogInputChangeItem[0])), + str(analogInputChangeItem[1][0])) + + diff --git a/src/MqttPeriodPublisher.py b/src/MqttPeriodPublisher.py new file mode 100644 index 0000000..1c2546f --- /dev/null +++ b/src/MqttPeriodPublisher.py @@ -0,0 +1,32 @@ +import paho.mqtt.client as mqtt +import threading +from loguru import logger +from AbstractMqttHandler import AbstractMqttPublisher +from time import sleep + +def mqttPeriodPublisherStart(config, processImage): + mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage) + mqttPeriodPublisherThread.start() + + +class MqttPeriodPublisher(AbstractMqttPublisher): + def __init__(self, config, processImage): + super().__init__(config, processImage) + + def localLoop(self): + while True: + with self.processImage: + if not self.processImage.isInitialized(): + continue + analogInputs = self.processImage.getAnalogsInputs() + + for analogInputItem in enumerate(analogInputs): + logger.debug("Analog input {} is {}" + .format(analogInputItem[0], + analogInputItem[1])) + + self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])), + str(analogInputItem[1])) + + sleep(self.config["analogInputPublishPeriod"]) + diff --git a/src/ProcessImage.py b/src/ProcessImage.py index fa6a83a..a083565 100644 --- a/src/ProcessImage.py +++ b/src/ProcessImage.py @@ -47,7 +47,6 @@ class ProcessImage(Condition): def getDiscreteInputs(self): if not self.initialized: raise NotInitializedException - self.shadowDiscreteInputs = self.discreteInputs return self.discreteInputs def setCoils(self, coils): @@ -65,7 +64,6 @@ class ProcessImage(Condition): def getCoils(self): if not self.initialized: raise NotInitializedException - self.shadowCoils = self.coils return self.coils def setAnalogsInputs(self, analogInputs): @@ -83,7 +81,6 @@ class ProcessImage(Condition): def getAnalogsInputs(self): if not self.initialized: raise NotInitializedException - self.shadowAnalogInputs = self.analogInputs return self.analogInputs diff --git a/src/digitaltwin1.py b/src/digitaltwin1.py index 3ba0b71..0a1843b 100644 --- a/src/digitaltwin1.py +++ b/src/digitaltwin1.py @@ -1,14 +1,39 @@ from ProcessImage import ProcessImage -from ModbusHandler import modbusHandler -from DummyPublisher import dummyPublisher as publisher +from ModbusHandler import modbusStart +from MqttEventPublisher import mqttEventPublisherStart +from MqttPeriodPublisher import mqttPeriodPublisherStart import threading +from loguru import logger +config = { + "modbus": { + "client": "172.16.2.157", + "scanrate": 0.25 + }, + "mqtt": { + "broker": "172.16.2.16", + "digitalOutputTopicPrefix": "dt1/coil", + "digitalInputTopicPrefix": "dt1/di", + "analogInputEventTopicPrefix": "dt1/ai/event", + "analogInputPeriodicTopicPrefix": "dt1/ai/periodic", + "analogInputPublishPeriod": 10.0 + } +} + + +logger.info("DigitalTwin1 starting") + processImage = ProcessImage() -modbusThread = threading.Thread(target=modbusHandler, args=[processImage]) -publisherThread = threading.Thread(target=publisher, args=[processImage]) +modbusStart(config, processImage) +logger.info("Modbus handler running") -modbusThread.start() -publisherThread.start() +mqttEventPublisherStart(config, processImage) +logger.info("MQTT event publisher running") + +mqttPeriodPublisherStart(config, processImage) +logger.info("MQTT period publisher running") + +logger.info("DigitalTwin1 running")