From 882571425b53dfab07d312d6580022aa0c1b1d5a Mon Sep 17 00:00:00 2001 From: Wolfgang Ludger Hottgenroth Date: Mon, 23 Aug 2021 17:45:59 +0200 Subject: [PATCH] output working --- src/ModbusHandler.py | 1 + src/MqttCoilSubscriber.py | 45 +++++++++++++++++++++++++++++++++++++++ src/ProcessImage.py | 31 +++++++++++++++++---------- src/digitaltwin1.py | 4 ++++ 4 files changed, 70 insertions(+), 11 deletions(-) create mode 100644 src/MqttCoilSubscriber.py diff --git a/src/ModbusHandler.py b/src/ModbusHandler.py index 692a29d..22e3e3a 100644 --- a/src/ModbusHandler.py +++ b/src/ModbusHandler.py @@ -63,6 +63,7 @@ def modbusHandler(config, processImage): coils = processImage.getCoils() if coils: + logger.debug("write coils to device: {}".format(coils)) reg = client.write_coils(0, coils) if isinstance(reg, ModbusIOException): raise Exception(reg) diff --git a/src/MqttCoilSubscriber.py b/src/MqttCoilSubscriber.py new file mode 100644 index 0000000..c6a528f --- /dev/null +++ b/src/MqttCoilSubscriber.py @@ -0,0 +1,45 @@ +import paho.mqtt.client as mqtt +import threading +from loguru import logger +from AbstractMqttHandler import AbstractMqttPublisher +from time import sleep + +def mqttCoilSubscriberStart(config, processImage): + mqttCoilSubscriberThread = MqttCoilSubscriber(config, processImage) + mqttCoilSubscriberThread.start() + + +class MqttCoilSubscriber(AbstractMqttPublisher): + def __init__(self, config, processImage): + super().__init__(config, processImage) + + def localLoop(self): + while True: + sleep(self.config["analogInputPublishPeriod"]) + + def onMessage(self, topic, payload): + logger.warning("mqtt message received: {} -> {}".format(topic, str(payload))) + coilNum = topic.split('/')[-1] + try: + coilNum = int(coilNum) + if (coilNum < 0) or (coilNum >= self.processImage.numCoils): + raise ValueError("coilNum too large or negative") + + payload = payload.decode("ascii") + if payload.lower() in [ "on", "true", "1" ]: + state = True + elif payload.lower() in [ "off", "false", "0" ]: + state = False + else: + raise ValueError("Invalid payload") + + with self.processImage: + self.processImage.setCoil(coilNum, state) + logger.debug("requested to set coil {} to {}".format(coilNum, state)) + except ValueError as e: + logger.warning("invavlid topic ({}) or payload ({}) in message, error {}".format(topic, str(payload), e)) + + def onConnect(self): + logger.info("mqtt connected") + self.client.subscribe("{}/+".format(self.config["digitalOutputTopicPrefix"])) + logger.info("subscribed to coil topic") diff --git a/src/ProcessImage.py b/src/ProcessImage.py index a083565..81d8d44 100644 --- a/src/ProcessImage.py +++ b/src/ProcessImage.py @@ -1,5 +1,5 @@ from threading import Condition - +from loguru import logger def zippingFilter(a, b): return [ x for x in enumerate(zip(a, b)) if x[1][0] != x[1][1] ] @@ -12,12 +12,15 @@ class ProcessImage(Condition): self.initialized = False def init(self, numCoils, numDiscreteInputs, numAnalogInputs): + self.numCoils = numCoils self.coils = [] self.shadowCoils = [ None ] * numCoils + self.numDiscreteInputs = numDiscreteInputs self.discreteInputs = [] self.shadowDiscreteInputs = [ None ] * numDiscreteInputs + self.numAnalogInputs = numAnalogInputs self.analogInputs = [] self.shadowAnalogInputs = [ None ] * (numAnalogInputs // 8) @@ -41,29 +44,35 @@ class ProcessImage(Condition): if not self.initialized: raise NotInitializedException changedDiscreteInputs = zippingFilter(self.discreteInputs, self.shadowDiscreteInputs) - self.shadowDiscreteInputs = self.discreteInputs + self.shadowDiscreteInputs = self.discreteInputs[:] return changedDiscreteInputs - def getDiscreteInputs(self): - if not self.initialized: - raise NotInitializedException - return self.discreteInputs +# def getDiscreteInputs(self): +# if not self.initialized: +# raise NotInitializedException +# return self.discreteInputs def setCoils(self, coils): if not self.initialized: raise NotInitializedException self.coils = coils - def getChangedCoils(self): + def setCoil(self, coilNum, value): if not self.initialized: raise NotInitializedException - changedCoils = zippingFilter(self.coils, self.shadowCoils) - self.shadowCoils = self.coils - return changedCoils + self.coils[coilNum] = value + +# def getChangedCoils(self): +# if not self.initialized: +# raise NotInitializedException +# changedCoils = zippingFilter(self.coils, self.shadowCoils) +# self.shadowCoils = self.coils +# return changedCoils def getCoils(self): if not self.initialized: raise NotInitializedException + self.shadowCoils = self.coils[:] return self.coils def setAnalogsInputs(self, analogInputs): @@ -75,7 +84,7 @@ class ProcessImage(Condition): if not self.initialized: raise NotInitializedException changedAnalogInputs = zippingFilter(self.analogInputs, self.shadowAnalogInputs) - self.shadowAnalogInputs = self.analogInputs + self.shadowAnalogInputs = self.analogInputs[:] return changedAnalogInputs def getAnalogsInputs(self): diff --git a/src/digitaltwin1.py b/src/digitaltwin1.py index 0a1843b..1de62ea 100644 --- a/src/digitaltwin1.py +++ b/src/digitaltwin1.py @@ -2,6 +2,7 @@ from ProcessImage import ProcessImage from ModbusHandler import modbusStart from MqttEventPublisher import mqttEventPublisherStart from MqttPeriodPublisher import mqttPeriodPublisherStart +from MqttCoilSubscriber import mqttCoilSubscriberStart import threading from loguru import logger @@ -35,5 +36,8 @@ logger.info("MQTT event publisher running") mqttPeriodPublisherStart(config, processImage) logger.info("MQTT period publisher running") +mqttCoilSubscriberStart(config, processImage) +logger.info("MQTT coil subscriber running") + logger.info("DigitalTwin1 running")