From 5cabef91ea01c7a8ce34a5c32a9fbde703552330 Mon Sep 17 00:00:00 2001 From: Wolfgang Ludger Hottgenroth Date: Wed, 25 Aug 2021 17:36:30 +0200 Subject: [PATCH] termination mechanism integrated --- src/AbstractMqttHandler.py | 16 +++++ src/ModbusHandler.py | 131 +++++++++++++++++++------------------ src/MqttCoilSubscriber.py | 2 +- src/MqttEventPublisher.py | 4 +- src/MqttPeriodPublisher.py | 8 +-- src/digitaltwin1.py | 52 ++++++++++++--- 6 files changed, 136 insertions(+), 77 deletions(-) diff --git a/src/AbstractMqttHandler.py b/src/AbstractMqttHandler.py index 4f3a6e0..9666ac8 100644 --- a/src/AbstractMqttHandler.py +++ b/src/AbstractMqttHandler.py @@ -22,6 +22,10 @@ class AbstractMqttPublisher(threading.Thread): self.client = mqtt.Client(userdata=self) + # consider this flag in the localLoop + self.killBill = False + self.killEvent = threading.Event() + def run(self): self.client.on_message = mqttOnMessageCallback self.client.on_connect = mqttOnConnectCallback @@ -39,6 +43,18 @@ class AbstractMqttPublisher(threading.Thread): def localLoop(self): raise NotImplementedError() + def stop(self): + self.client.loop_stop() + logger.info("mqtt loop stopped") + + self.killBill = True + logger.info("kill flag set") + + self.killEvent.set() + with self.processImage: + self.processImage.notify() + logger.info("kill events triggered") + def onConnect(self): logger.info("mqtt connected") diff --git a/src/ModbusHandler.py b/src/ModbusHandler.py index 3bca256..07ca4c4 100644 --- a/src/ModbusHandler.py +++ b/src/ModbusHandler.py @@ -5,79 +5,86 @@ import threading from loguru import logger -def modbusStart(config, processImage): - modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage]) - modbusThread.start() + +class ModbusHandler(threading.Thread): + def __init__(self, config, processImage): + super().__init__() + + self.config = config["modbus"] + self.processImage = processImage + self.killBill = False + + def stop(self): + self.killBill = True + + def run(self): + modbusClient = self.config["client"] + modbusRefreshPeriod = float(self.config["scanrate"]) + + client = ModbusClient(modbusClient) -def modbusHandler(config, processImage): - modbusClient = config["modbus"]["client"] - modbusRefreshPeriod = float(config["modbus"]["scanrate"]) + try: + client.connect() + res = client.read_holding_registers(0x1010, 4) + if isinstance(res, ModbusIOException): + raise Exception(self.processImage) - client = ModbusClient(modbusClient) + if len(res.registers) != 4: + raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers))) - try: - client.connect() + (analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers + logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}") - res = client.read_holding_registers(0x1010, 4) - if isinstance(res, ModbusIOException): - raise Exception(processImage) + self.processImage.init(digitalOutputBits, digitalInputBits, analogInputBits) - if len(res.registers) != 4: - raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers))) + reg = client.read_coils(0, digitalOutputBits) + if isinstance(reg, ModbusIOException): + raise Exception(reg) + with self.processImage: + self.processImage.setCoils(reg.bits) - (analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers - logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}") + while not self.killBill: + try: + if not client.is_socket_open(): + client.connect() - processImage.init(digitalOutputBits, digitalInputBits, analogInputBits) - - reg = client.read_coils(0, digitalOutputBits) - if isinstance(reg, ModbusIOException): - raise Exception(reg) - with processImage: - processImage.setCoils(reg.bits) - - while True: - try: - if not client.is_socket_open(): - client.connect() - - reg = client.read_input_registers(0, analogInputBits // 8) - if isinstance(reg, ModbusIOException): - raise Exception(reg) - analogInputs = reg.registers - - reg = client.read_discrete_inputs(0, digitalInputBits) - if isinstance(reg, ModbusIOException): - raise Exception(reg) - discreteInputs = reg.bits - - coils = None - with processImage: - processImage.setAnalogsInputs(analogInputs) - processImage.setDiscreteInputs(discreteInputs) - if processImage.hasPendingInputChanges(): - processImage.notify() - if processImage.hasPendingOutputChanges(): - coils = processImage.getCoils() - - if coils: - logger.debug("write coils to device: {}".format(coils)) - reg = client.write_coils(0, coils) + reg = client.read_input_registers(0, analogInputBits // 8) if isinstance(reg, ModbusIOException): raise Exception(reg) + analogInputs = reg.registers - except Exception as e: - logger.error("Exception in inner modbus handler loop: {}".format(e)) - client.close() - finally: - #sleep(modbusRefreshPeriod) - if (processImage.coilEvent.wait(timeout=modbusRefreshPeriod)): - processImage.coilEvent.clear() + reg = client.read_discrete_inputs(0, digitalInputBits) + if isinstance(reg, ModbusIOException): + raise Exception(reg) + discreteInputs = reg.bits + + coils = None + with self.processImage: + self.processImage.setAnalogsInputs(analogInputs) + self.processImage.setDiscreteInputs(discreteInputs) + if self.processImage.hasPendingInputChanges(): + self.processImage.notify() + if self.processImage.hasPendingOutputChanges(): + coils = self.processImage.getCoils() + + if coils: + logger.debug("write coils to device: {}".format(coils)) + reg = client.write_coils(0, coils) + if isinstance(reg, ModbusIOException): + raise Exception(reg) + + except Exception as e: + logger.error("Exception in inner modbus handler loop: {}".format(e)) + client.close() + finally: + #sleep(modbusRefreshPeriod) + if (self.processImage.coilEvent.wait(timeout=modbusRefreshPeriod)): + self.processImage.coilEvent.clear() - except Exception as e: - logger.error("Exception in modbus handler: {}".format(e)) - finally: - client.close() + except Exception as e: + logger.error("Exception in modbus handler: {}".format(e)) + finally: + client.close() diff --git a/src/MqttCoilSubscriber.py b/src/MqttCoilSubscriber.py index bdd4a0d..db7e49b 100644 --- a/src/MqttCoilSubscriber.py +++ b/src/MqttCoilSubscriber.py @@ -14,7 +14,7 @@ class MqttCoilSubscriber(AbstractMqttPublisher): super().__init__(config, processImage) def localLoop(self): - while True: + while not self.killBill: sleep(float(self.config["analogInputPublishPeriod"])) def onMessage(self, topic, payload): diff --git a/src/MqttEventPublisher.py b/src/MqttEventPublisher.py index e583b84..05dc0d9 100644 --- a/src/MqttEventPublisher.py +++ b/src/MqttEventPublisher.py @@ -15,9 +15,11 @@ class MqttEventPublisher(AbstractMqttPublisher): self.disableAnalogInputEventPublishing = self.config["disableAnalogInputEventPublishing"].lower() in [ "true", "on" ] def localLoop(self): - while True: + while not self.killBill: with self.processImage: self.processImage.wait() + if self.killBill: + continue discreteInputChangeset = self.processImage.getChangedDiscreteInputs() if not self.disableAnalogInputEventPublishing: diff --git a/src/MqttPeriodPublisher.py b/src/MqttPeriodPublisher.py index bd7d0e2..9d1a380 100644 --- a/src/MqttPeriodPublisher.py +++ b/src/MqttPeriodPublisher.py @@ -1,8 +1,6 @@ -import paho.mqtt.client as mqtt -import threading +from threading import Event from loguru import logger from AbstractMqttHandler import AbstractMqttPublisher -from time import sleep def mqttPeriodPublisherStart(config, processImage): mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage) @@ -14,7 +12,7 @@ class MqttPeriodPublisher(AbstractMqttPublisher): super().__init__(config, processImage) def localLoop(self): - while True: + while not self.killBill: with self.processImage: if not self.processImage.isInitialized(): continue @@ -28,5 +26,5 @@ class MqttPeriodPublisher(AbstractMqttPublisher): self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])), str(analogInputItem[1])) - sleep(float(self.config["analogInputPublishPeriod"])) + self.killEvent.wait(timeout=float(self.config["analogInputPublishPeriod"])) diff --git a/src/digitaltwin1.py b/src/digitaltwin1.py index b8e779f..d384d65 100644 --- a/src/digitaltwin1.py +++ b/src/digitaltwin1.py @@ -1,14 +1,23 @@ from loguru import logger import argparse import configparser +import threading from ProcessImage import ProcessImage -from ModbusHandler import modbusStart -from MqttEventPublisher import mqttEventPublisherStart -from MqttPeriodPublisher import mqttPeriodPublisherStart -from MqttCoilSubscriber import mqttCoilSubscriberStart +from ModbusHandler import ModbusHandler +from MqttEventPublisher import MqttEventPublisher, mqttEventPublisherStart +from MqttPeriodPublisher import MqttPeriodPublisher +from MqttCoilSubscriber import MqttCoilSubscriber +deathBell = threading.Event() + +def exceptHook(args): + global deathBell + logger.error("Exception in thread caught: {}".format(args)) + deathBell.set() + logger.error("rang the death bell") + logger.info("DigitalTwin1 starting") @@ -27,17 +36,44 @@ config.read(args.config) processImage = ProcessImage() -modbusStart(config, processImage) +modbusHandlerThread = ModbusHandler(config, processImage) +modbusHandlerThread.start() logger.info("Modbus handler running") -mqttEventPublisherStart(config, processImage) +mqttEventPublisherThread = MqttEventPublisher(config, processImage) +mqttEventPublisherThread.start() logger.info("MQTT event publisher running") -mqttPeriodPublisherStart(config, processImage) +mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage) +mqttPeriodPublisherThread.start() logger.info("MQTT period publisher running") -mqttCoilSubscriberStart(config, processImage) +mqttCoilSubscriberThread = MqttCoilSubscriber(config, processImage) +mqttCoilSubscriberThread.start() logger.info("MQTT coil subscriber running") +threading.excepthook = exceptHook +logger.info("Threading excepthook set") + logger.info("DigitalTwin1 running") + +deathBell.wait() +logger.error("DigitalTwin1 is dying") + +modbusHandlerThread.stop() +mqttEventPublisherThread.stop() +mqttPeriodPublisherThread.stop() +mqttCoilSubscriberThread.stop() + +modbusHandlerThread.join() +logger.error("modbusHandlerThread joined") +mqttEventPublisherThread.join() +logger.error("mqttEventPublisherThread joined") +mqttPeriodPublisherThread.join() +logger.error("mqttPeriodPublisherThread joined") +mqttCoilSubscriberThread.join() +logger.error("mqttCoilSubscriberThread joined") + +logger.error("DigitalTwin1 to terminate") +