From 37aa84d0f5868841a725e9f2a8a129b361707728 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 8 Jul 2019 15:48:35 +0200 Subject: [PATCH] actual publish something --- src/CommunicationProcessor.py | 5 +++-- src/MqttProcessor.py | 18 ++++++++++++++++-- src/RegisterDatapoint.py | 10 ++++++---- src/master.py | 6 ++++-- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/CommunicationProcessor.py b/src/CommunicationProcessor.py index 2f6941a..f5bbf54 100644 --- a/src/CommunicationProcessor.py +++ b/src/CommunicationProcessor.py @@ -5,10 +5,11 @@ import RegisterDatapoint from pymodbus.client.sync import ModbusSerialClient class CommunicationProcessor(threading.Thread): - def __init__(self, config, queue): + def __init__(self, config, queue, pubQueue): super().__init__() self.config = config self.queue = queue + self.pubQueue = pubQueue self.daemon = True def __getSerial(self): @@ -26,7 +27,7 @@ class CommunicationProcessor(threading.Thread): try: print("Dequeued: {0!s}".format(r)) r.enqueued = False - r.process(client) + r.process(client, self.pubQueue) except RegisterDatapoint.DatapointException as e: print("ERROR when processing '{0}': {1!s}".format(r.label, e)) if client.socket is None: diff --git a/src/MqttProcessor.py b/src/MqttProcessor.py index b4a6764..4815c3c 100644 --- a/src/MqttProcessor.py +++ b/src/MqttProcessor.py @@ -4,6 +4,11 @@ from NotificationForwarder import AbstractNotificationReceiver +class PublishItem(object): + def __init__(self, topic, payload): + self.topic = topic + self.payload = payload + def mqttOnConnectCallback(client, userdata, flags, rc): userdata.onConnect() @@ -14,11 +19,12 @@ def mqttOnDisconnectCallback(client, userdata, rc): userdata.onDisconnect(rc) class MqttProcessor(threading.Thread, AbstractNotificationReceiver): - def __init__(self, config, registers, queue): + def __init__(self, config, registers, queue, pubQueue): super().__init__() self.config = config self.registers = registers self.queue = queue + self.pubQueue = pubQueue self.client = mqtt.Client(userdata=self) self.subscriptions = [] self.topicRegisterMap ={} @@ -56,7 +62,15 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): if self.config.mqttLogin and self.config.mqttPassword: self.client.username_pw_set(self.config.mqttLogin, self.config.mqttPassword) self.client.connect(self.config.mqttBrokerHost, self.config.mqttBrokerPort) - self.client.loop_forever() + self.client.loop_start() + + while True: + pubItem = self.pubQueue.get() + if isinstance(pubItem, PublishItem): + self.client.publish(pubItem.topic, pubItem.payload) + else: + print("Invalid object in publish queue") + def onConnect(self): # print("MqttProcessor.onConnect") diff --git a/src/RegisterDatapoint.py b/src/RegisterDatapoint.py index 2c39bc8..52cd333 100644 --- a/src/RegisterDatapoint.py +++ b/src/RegisterDatapoint.py @@ -1,6 +1,7 @@ import datetime from pymodbus.pdu import ExceptionResponse from pymodbus.exceptions import ModbusIOException +from src import MqttProcessor class DatapointException(Exception): pass @@ -39,7 +40,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __str__(self): return "[{0!s}, Read: {1}, Write: {2}, Feedback: {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic) - def process(self, client): + def process(self, client, pubQueue): successFull = True giveUp = False if self.writeRequestValue: @@ -62,7 +63,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): if type(result) in [ExceptionResponse, ModbusIOException]: raise DatapointException(result) print("{0}: {1!s}".format(self.label, result.registers)) - + pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.registers))) if successFull: self.lastContact = datetime.datetime.now() # publish value @@ -89,18 +90,19 @@ class InputRegisterDatapoint(AbstractModbusDatapoint): def __str__(self): return "[{0!s}, {1}".format(super().__str__(), self.publishTopic) - def process(self, client): + def process(self, client, pubQueue): successFull = True giveUp = False # perform read operation + print("Input register, perform read operation") result = client.read_input_registers(address=self.address, count=self.count, unit=self.unit) if type(result) in [ExceptionResponse, ModbusIOException]: raise DatapointException(result) print("{0}: {1!s}".format(self.label, result.registers)) + pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(result.registers))) - print("Input register, perform read operation") if successFull: self.lastContact = datetime.datetime.now() # publish value diff --git a/src/master.py b/src/master.py index 772b5a6..9a31a45 100644 --- a/src/master.py +++ b/src/master.py @@ -2,6 +2,7 @@ import CmdServer import MqttProcessor import CommunicationProcessor import MyPriorityQueue +import queue import NotificationForwarder import Config import ScanRateConsideringQueueFeeder @@ -13,6 +14,7 @@ import pickle if __name__ == "__main__": queue = MyPriorityQueue.MyPriorityQueue() + pubQueue = queue.queue() nf = NotificationForwarder.NotificationForwarder() config = Config.Config() @@ -21,10 +23,10 @@ if __name__ == "__main__": datapoints = pickle.load(f) RegisterDatapoint.checkRegisterList(datapoints) - cp = CommunicationProcessor.CommunicationProcessor(config, queue) + cp = CommunicationProcessor.CommunicationProcessor(config, queue, pubQueue) cp.start() - mp = MqttProcessor.MqttProcessor(config, datapoints, queue) + mp = MqttProcessor.MqttProcessor(config, datapoints, queue, pubQueue) nf.register(mp) mp.start()