termination mechanism integrated

This commit is contained in:
Wolfgang Hottgenroth 2021-08-25 17:36:30 +02:00
parent 78f001fd70
commit 5cabef91ea
Signed by: wn
GPG Key ID: 6C1E5E531E0D5D7F
6 changed files with 136 additions and 77 deletions

View File

@ -22,6 +22,10 @@ class AbstractMqttPublisher(threading.Thread):
self.client = mqtt.Client(userdata=self) self.client = mqtt.Client(userdata=self)
# consider this flag in the localLoop
self.killBill = False
self.killEvent = threading.Event()
def run(self): def run(self):
self.client.on_message = mqttOnMessageCallback self.client.on_message = mqttOnMessageCallback
self.client.on_connect = mqttOnConnectCallback self.client.on_connect = mqttOnConnectCallback
@ -39,6 +43,18 @@ class AbstractMqttPublisher(threading.Thread):
def localLoop(self): def localLoop(self):
raise NotImplementedError() raise NotImplementedError()
def stop(self):
self.client.loop_stop()
logger.info("mqtt loop stopped")
self.killBill = True
logger.info("kill flag set")
self.killEvent.set()
with self.processImage:
self.processImage.notify()
logger.info("kill events triggered")
def onConnect(self): def onConnect(self):
logger.info("mqtt connected") logger.info("mqtt connected")

View File

@ -5,79 +5,86 @@ import threading
from loguru import logger from loguru import logger
def modbusStart(config, processImage):
modbusThread = threading.Thread(target=modbusHandler, args=[config, processImage]) class ModbusHandler(threading.Thread):
modbusThread.start() def __init__(self, config, processImage):
super().__init__()
self.config = config["modbus"]
self.processImage = processImage
self.killBill = False
def stop(self):
self.killBill = True
def run(self):
modbusClient = self.config["client"]
modbusRefreshPeriod = float(self.config["scanrate"])
client = ModbusClient(modbusClient)
def modbusHandler(config, processImage): try:
modbusClient = config["modbus"]["client"] client.connect()
modbusRefreshPeriod = float(config["modbus"]["scanrate"])
res = client.read_holding_registers(0x1010, 4)
if isinstance(res, ModbusIOException):
raise Exception(self.processImage)
client = ModbusClient(modbusClient) if len(res.registers) != 4:
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers)))
try: (analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers
client.connect() logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}")
res = client.read_holding_registers(0x1010, 4) self.processImage.init(digitalOutputBits, digitalInputBits, analogInputBits)
if isinstance(res, ModbusIOException):
raise Exception(processImage)
if len(res.registers) != 4: reg = client.read_coils(0, digitalOutputBits)
raise Exception("Unexpected number of registers for process image ({})".format(len(res.registers))) if isinstance(reg, ModbusIOException):
raise Exception(reg)
with self.processImage:
self.processImage.setCoils(reg.bits)
(analogOutputBits, analogInputBits, digitalOutputBits, digitalInputBits) = res.registers while not self.killBill:
logger.debug(f"AO: {analogOutputBits}, AI: {analogInputBits}, DO: {digitalOutputBits}, DI: {digitalInputBits}") try:
if not client.is_socket_open():
client.connect()
processImage.init(digitalOutputBits, digitalInputBits, analogInputBits) reg = client.read_input_registers(0, analogInputBits // 8)
reg = client.read_coils(0, digitalOutputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
with processImage:
processImage.setCoils(reg.bits)
while True:
try:
if not client.is_socket_open():
client.connect()
reg = client.read_input_registers(0, analogInputBits // 8)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
analogInputs = reg.registers
reg = client.read_discrete_inputs(0, digitalInputBits)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
discreteInputs = reg.bits
coils = None
with processImage:
processImage.setAnalogsInputs(analogInputs)
processImage.setDiscreteInputs(discreteInputs)
if processImage.hasPendingInputChanges():
processImage.notify()
if processImage.hasPendingOutputChanges():
coils = processImage.getCoils()
if coils:
logger.debug("write coils to device: {}".format(coils))
reg = client.write_coils(0, coils)
if isinstance(reg, ModbusIOException): if isinstance(reg, ModbusIOException):
raise Exception(reg) raise Exception(reg)
analogInputs = reg.registers
except Exception as e: reg = client.read_discrete_inputs(0, digitalInputBits)
logger.error("Exception in inner modbus handler loop: {}".format(e)) if isinstance(reg, ModbusIOException):
client.close() raise Exception(reg)
finally: discreteInputs = reg.bits
#sleep(modbusRefreshPeriod)
if (processImage.coilEvent.wait(timeout=modbusRefreshPeriod)): coils = None
processImage.coilEvent.clear() with self.processImage:
self.processImage.setAnalogsInputs(analogInputs)
self.processImage.setDiscreteInputs(discreteInputs)
if self.processImage.hasPendingInputChanges():
self.processImage.notify()
if self.processImage.hasPendingOutputChanges():
coils = self.processImage.getCoils()
if coils:
logger.debug("write coils to device: {}".format(coils))
reg = client.write_coils(0, coils)
if isinstance(reg, ModbusIOException):
raise Exception(reg)
except Exception as e:
logger.error("Exception in inner modbus handler loop: {}".format(e))
client.close()
finally:
#sleep(modbusRefreshPeriod)
if (self.processImage.coilEvent.wait(timeout=modbusRefreshPeriod)):
self.processImage.coilEvent.clear()
except Exception as e: except Exception as e:
logger.error("Exception in modbus handler: {}".format(e)) logger.error("Exception in modbus handler: {}".format(e))
finally: finally:
client.close() client.close()

View File

@ -14,7 +14,7 @@ class MqttCoilSubscriber(AbstractMqttPublisher):
super().__init__(config, processImage) super().__init__(config, processImage)
def localLoop(self): def localLoop(self):
while True: while not self.killBill:
sleep(float(self.config["analogInputPublishPeriod"])) sleep(float(self.config["analogInputPublishPeriod"]))
def onMessage(self, topic, payload): def onMessage(self, topic, payload):

View File

@ -15,9 +15,11 @@ class MqttEventPublisher(AbstractMqttPublisher):
self.disableAnalogInputEventPublishing = self.config["disableAnalogInputEventPublishing"].lower() in [ "true", "on" ] self.disableAnalogInputEventPublishing = self.config["disableAnalogInputEventPublishing"].lower() in [ "true", "on" ]
def localLoop(self): def localLoop(self):
while True: while not self.killBill:
with self.processImage: with self.processImage:
self.processImage.wait() self.processImage.wait()
if self.killBill:
continue
discreteInputChangeset = self.processImage.getChangedDiscreteInputs() discreteInputChangeset = self.processImage.getChangedDiscreteInputs()
if not self.disableAnalogInputEventPublishing: if not self.disableAnalogInputEventPublishing:

View File

@ -1,8 +1,6 @@
import paho.mqtt.client as mqtt from threading import Event
import threading
from loguru import logger from loguru import logger
from AbstractMqttHandler import AbstractMqttPublisher from AbstractMqttHandler import AbstractMqttPublisher
from time import sleep
def mqttPeriodPublisherStart(config, processImage): def mqttPeriodPublisherStart(config, processImage):
mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage) mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
@ -14,7 +12,7 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
super().__init__(config, processImage) super().__init__(config, processImage)
def localLoop(self): def localLoop(self):
while True: while not self.killBill:
with self.processImage: with self.processImage:
if not self.processImage.isInitialized(): if not self.processImage.isInitialized():
continue continue
@ -28,5 +26,5 @@ class MqttPeriodPublisher(AbstractMqttPublisher):
self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])), self.client.publish("{}/{}".format(self.config["analogInputPeriodicTopicPrefix"], str(analogInputItem[0])),
str(analogInputItem[1])) str(analogInputItem[1]))
sleep(float(self.config["analogInputPublishPeriod"])) self.killEvent.wait(timeout=float(self.config["analogInputPublishPeriod"]))

View File

@ -1,14 +1,23 @@
from loguru import logger from loguru import logger
import argparse import argparse
import configparser import configparser
import threading
from ProcessImage import ProcessImage from ProcessImage import ProcessImage
from ModbusHandler import modbusStart from ModbusHandler import ModbusHandler
from MqttEventPublisher import mqttEventPublisherStart from MqttEventPublisher import MqttEventPublisher, mqttEventPublisherStart
from MqttPeriodPublisher import mqttPeriodPublisherStart from MqttPeriodPublisher import MqttPeriodPublisher
from MqttCoilSubscriber import mqttCoilSubscriberStart from MqttCoilSubscriber import MqttCoilSubscriber
deathBell = threading.Event()
def exceptHook(args):
global deathBell
logger.error("Exception in thread caught: {}".format(args))
deathBell.set()
logger.error("rang the death bell")
logger.info("DigitalTwin1 starting") logger.info("DigitalTwin1 starting")
@ -27,17 +36,44 @@ config.read(args.config)
processImage = ProcessImage() processImage = ProcessImage()
modbusStart(config, processImage) modbusHandlerThread = ModbusHandler(config, processImage)
modbusHandlerThread.start()
logger.info("Modbus handler running") logger.info("Modbus handler running")
mqttEventPublisherStart(config, processImage) mqttEventPublisherThread = MqttEventPublisher(config, processImage)
mqttEventPublisherThread.start()
logger.info("MQTT event publisher running") logger.info("MQTT event publisher running")
mqttPeriodPublisherStart(config, processImage) mqttPeriodPublisherThread = MqttPeriodPublisher(config, processImage)
mqttPeriodPublisherThread.start()
logger.info("MQTT period publisher running") logger.info("MQTT period publisher running")
mqttCoilSubscriberStart(config, processImage) mqttCoilSubscriberThread = MqttCoilSubscriber(config, processImage)
mqttCoilSubscriberThread.start()
logger.info("MQTT coil subscriber running") logger.info("MQTT coil subscriber running")
threading.excepthook = exceptHook
logger.info("Threading excepthook set")
logger.info("DigitalTwin1 running") logger.info("DigitalTwin1 running")
deathBell.wait()
logger.error("DigitalTwin1 is dying")
modbusHandlerThread.stop()
mqttEventPublisherThread.stop()
mqttPeriodPublisherThread.stop()
mqttCoilSubscriberThread.stop()
modbusHandlerThread.join()
logger.error("modbusHandlerThread joined")
mqttEventPublisherThread.join()
logger.error("mqttEventPublisherThread joined")
mqttPeriodPublisherThread.join()
logger.error("mqttPeriodPublisherThread joined")
mqttCoilSubscriberThread.join()
logger.error("mqttCoilSubscriberThread joined")
logger.error("DigitalTwin1 to terminate")