From a2a5a924bdd983c729b65b567aa61adf64478188 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Tue, 25 Jun 2019 17:26:23 +0200 Subject: [PATCH] mqtt handling --- snippets/test8.py | 86 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 14 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index 4b57a1a..a77e984 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -4,6 +4,15 @@ import threading import socketserver import cmd import io +import paho.mqtt.client as mqtt + + +class Config(object): + def __init__(self): + self.mqttBrokerHost = '127.0.0.1' + self.mqttBrokerPort = 1883 + self.mqttLogin = '' + self.mqttPassword = '' @@ -31,6 +40,8 @@ class AbstractModbusDatapoint(object): self.count = count self.scanRate = scanRate self.type = 'abstract data point' + self.command = None + self.value = None self.enqueued = False if self.scanRate: self.priority = 1 @@ -45,7 +56,13 @@ class AbstractModbusDatapoint(object): def __ge__(self, other): return self.priority >= other.priority def __str__(self): - return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) + return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value) + + def setCommand(self, cmd): + self.command = cmd + + def setValue(self, value): + self.value = value class HoldingRegisterDatapoint(AbstractModbusDatapoint): @@ -87,32 +104,72 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): self.writeRequestValue = value +def mqttOnConnectCallback(client, userdata, flags, rc): + userdata.onConnect() + +def mqttOnMessageCallback(client, userdata, message): + userdata.onMessage(message.topic, message.payload) + +def mqttOnDisconnectCallback(client, userdata, rc): + userdata.onDisconnect(rc) + class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): super(MqttProcessor, self).__init__() self.registers = registers self.queue = queue + self.client = mqtt.Client(userdata=self) + self.subscriptions = [] + self.topicRegisterMap ={} + + def __processUpdatedRegisters(self, force=False): + # print("MqttProcessor.__updateSubscriptions") + + subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic] + # print("Topics: {0!s}".format(subscribeTopics)) + + for subscribeTopic in subscribeTopics: + if (subscribeTopic not in self.subscriptions) or force: + print("Subscribe to {0}".format(subscribeTopic)) + self.client.subscribe(subscribeTopic) + self.subscriptions.append(subscribeTopic) + + for subscription in self.subscriptions: + if (subscription not in subscribeTopics) and not force: + print("Unsubscribe from {0}".format(subscription)) + self.client.unsubscribe(subscription) + self.subscriptions.remove(subscription) + + self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic } def receiveNotification(self, arg): print("MqttProcessor:registersChanged") - pass - # subscribe and/or unsubscribe according to register changes + self.__processUpdatedRegisters() def run(self): - pass - # set mqtt callbacks - # mqtt connect - # mqtt loop forever + # print("MqttProcessor.run") + self.client.on_message = mqttOnMessageCallback + self.client.on_connect = mqttOnConnectCallback + self.client.on_disconnect = mqttOnDisconnectCallback + if config.mqttLogin and config.mqttPassword: + self.client.username_pw_set(config.mqttLogin, config.mqttPassword) + self.client.connect(config.mqttBrokerHost, config.mqttBrokerPort) + self.client.loop_forever() def onConnect(self): - pass - # subscribe to all subscribe topics from registers + # print("MqttProcessor.onConnect") + self.__processUpdatedRegisters(force=True) + + def onDisconnect(self, rc): + print("Disconnected from MQTT broker: {0}".format(rc)) def onMessage(self, topic, payload): - pass - # call onMessage method of register with related subscribe topic - # put register yourself in high prio queue - # self.put(r) + # print("MqttProcessor.onMessage") + r = self.topicRegisterMap[topic] + # print("{0}: {1!s} -> {2!s}".format(topic, payload, r)) + r.setCommand('w') + r.setValue(payload) + self.queue.put(r) class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): @@ -140,6 +197,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei ] registersToBeHandled.sort(key=lambda x : x.scanRate) for r in registersToBeHandled: + r.setCommand('r') self.queue.put(r) r.enqueued = True self.delayEvent.wait(self.delay) @@ -210,7 +268,7 @@ datapoints = [ queue = queue.PriorityQueue() nf = NotificationForwarder() - +config = Config() if __name__ == "__main__": cp = CommunicationProcessor(queue)