From 04c1d777e4a061178456b3268f89a07d5f0322df Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 8 Jul 2019 12:12:13 +0200 Subject: [PATCH] prepare application --- snippets/ENV => ENV | 0 snippets/test8.py | 415 -------------------------- src/AbstractModbusDatapoint.py | 26 ++ src/CmdServer.py | 166 +++++++++++ src/CommunicationProcessor.py | 16 + src/Config.py | 8 + src/HoldingRegisterDatapoint.py | 43 +++ src/MqttProcessor.py | 76 +++++ src/MyPriorityQueue.py | 22 ++ src/NotificationForwarder.py | 15 + src/ScanRateConsideringQueueFeeder.py | 35 +++ src/master.py | 39 +++ 12 files changed, 446 insertions(+), 415 deletions(-) rename snippets/ENV => ENV (100%) create mode 100644 src/AbstractModbusDatapoint.py create mode 100644 src/CmdServer.py create mode 100644 src/CommunicationProcessor.py create mode 100644 src/Config.py create mode 100644 src/HoldingRegisterDatapoint.py create mode 100644 src/MqttProcessor.py create mode 100644 src/MyPriorityQueue.py create mode 100644 src/NotificationForwarder.py create mode 100644 src/ScanRateConsideringQueueFeeder.py create mode 100644 src/master.py diff --git a/snippets/ENV b/ENV similarity index 100% rename from snippets/ENV rename to ENV diff --git a/snippets/test8.py b/snippets/test8.py index af4b027..4175200 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -8,420 +8,5 @@ import paho.mqtt.client as mqtt import re -class Config(object): - def __init__(self): - self.mqttBrokerHost = '127.0.0.1' - self.mqttBrokerPort = 1883 - self.mqttLogin = '' - self.mqttPassword = '' - -class AbstractNotificationReceiver(object): - def receiveNotification(self, arg): - raise NotImplementedError - -class NotificationForwarder(object): - def __init__(self): - self.receivers = [] - - def register(self, receiver): - self.receivers.append(receiver) - - def notify(self, arg=None): - for r in self.receivers: - r.receiveNotification(arg) - - -class AbstractModbusDatapoint(object): - def __init__(self, label, unit, address, count, scanRate): - self.label = label - self.unit = unit - self.address = address - self.count = count - self.scanRate = scanRate - self.type = 'abstract data point' - self.command = None - self.value = None - self.enqueued = False - if self.scanRate: - self.priority = 1 - else: - self.priority = 0 - - def __str__(self): - return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value) - - def setCommand(self, cmd): - self.command = cmd - - def setValue(self, value): - self.value = value - - -class HoldingRegisterDatapoint(AbstractModbusDatapoint): - def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): - super().__init__(label, unit, address, count, scanRate) - self.publishTopic = publishTopic - self.subscribeTopic = subscribeTopic - self.feedbackTopic = feedbackTopic - self.writeRequestValue = None - self.lastContact = None - self.type = 'read holding register' - - def __str__(self): - return "[{0!s}, {1} {2} {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic) - - def process(self): - successFull = False - giveUp = False - if self.writeRequestValue: - # perform write operation - if successFull: - # give feedback - self.writeRequestValue = None - else: - # retries handling - if giveUp: - # give negative feedback - self.writeRequestValue = None - else: - # perform read operation - if successFull: - self.lastContact = datetime.datetime.now() - # publish value - else: - # retries handling - if giveUp: - # backoff and availability handling - # give negative feedback - pass - - def onMessage(self, value): - self.writeRequestValue = value - - -def mqttOnConnectCallback(client, userdata, flags, rc): - userdata.onConnect() - -def mqttOnMessageCallback(client, userdata, message): - userdata.onMessage(message.topic, message.payload) - -def mqttOnDisconnectCallback(client, userdata, rc): - userdata.onDisconnect(rc) - -class MqttProcessor(threading.Thread, AbstractNotificationReceiver): - def __init__(self, registers, queue): - super().__init__() - self.registers = registers - self.queue = queue - self.client = mqtt.Client(userdata=self) - self.subscriptions = [] - self.topicRegisterMap ={} - - def __processUpdatedRegisters(self, force=False): - # print("MqttProcessor.__updateSubscriptions") - - subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic] - # print("Topics: {0!s}".format(subscribeTopics)) - - for subscribeTopic in subscribeTopics: - if (subscribeTopic not in self.subscriptions) or force: - print("Subscribe to {0}".format(subscribeTopic)) - self.client.subscribe(subscribeTopic) - self.subscriptions.append(subscribeTopic) - - for subscription in self.subscriptions: - if (subscription not in subscribeTopics) and not force: - print("Unsubscribe from {0}".format(subscription)) - self.client.unsubscribe(subscription) - self.subscriptions.remove(subscription) - - self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic } - - def receiveNotification(self, arg): - print("MqttProcessor:registersChanged") - self.__processUpdatedRegisters() - - def run(self): - # print("MqttProcessor.run") - self.client.on_message = mqttOnMessageCallback - self.client.on_connect = mqttOnConnectCallback - self.client.on_disconnect = mqttOnDisconnectCallback - if config.mqttLogin and config.mqttPassword: - self.client.username_pw_set(config.mqttLogin, config.mqttPassword) - self.client.connect(config.mqttBrokerHost, config.mqttBrokerPort) - self.client.loop_forever() - - def onConnect(self): - # print("MqttProcessor.onConnect") - self.__processUpdatedRegisters(force=True) - - def onDisconnect(self, rc): - print("Disconnected from MQTT broker: {0}".format(rc)) - - def onMessage(self, topic, payload): - # print("MqttProcessor.onMessage") - r = self.topicRegisterMap[topic] - # print("{0}: {1!s} -> {2!s}".format(topic, payload, r)) - r.setCommand('w') - r.setValue(payload) - self.queue.put(r) - - -class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): - def __init__(self, registers, queue): - super().__init__() - self.registers = registers - self.queue = queue - self.delayEvent = threading.Event() - - def getMinimalScanrate(self): - return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) - - def receiveNotification(self, arg): - print("ScanRateConsideringQueueFeeder:registersChanged") - self.delay = self.getMinimalScanrate() - - def run(self): - self.delay = self.getMinimalScanrate() - while True: - registersToBeHandled = [ - r for r in self.registers if ((not r.enqueued) and - (r.scanRate) and - ((not r.lastContact) or - (r.lastContact + r.scanRate < datetime.datetime.now()))) - ] - registersToBeHandled.sort(key=lambda x : x.scanRate) - for r in registersToBeHandled: - r.setCommand('r') - self.queue.put(r) - r.enqueued = True - self.delayEvent.wait(self.delay) - - -class CommunicationProcessor(threading.Thread): - def __init__(self, queue): - super().__init__() - self.queue = queue - - def run(self): - while True: - r = self.queue.get() - # r.process() - r.lastContact = datetime.datetime.now() - print("Dequeued: {0!s}".format(r)) - r.enqueued = False - - -class MyPriorityQueueItem(object): - def __init__(self, itemWithPriority): - self.itemWithPriority = itemWithPriority - - def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority - def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority - def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority - def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority - def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority - def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority - -class MyPriorityQueue(queue.PriorityQueue): - def _put(self, itemWithPriority): - i = MyPriorityQueueItem(itemWithPriority) - super()._put(i) - - def _get(self): - i = super()._get() - return i.itemWithPriority - -class CmdInterpreterException(ValueError): pass - -def parseIntArbitraryBase(s): - i = 0 - if s.startswith('0x'): - i = int(s, 16) - elif s.startswith('0b'): - i = int(s, 2) - else: - i = int(s, 10) - return i - -class CmdInterpreter(cmd.Cmd): - def __init__(self, infile, outfile, notifier, registers): - super().__init__(stdin=infile, stdout=outfile) - self.use_rawinput = False - self.notifier = notifier - self.registers = registers - self.prompt = "test8> " - self.intro = "test8 admin interface" - self.splitterRe = re.compile('\s+') - - def __print(self, text): - self.stdout.write(text) - - def __println(self, text): - self.stdout.write(text) - self.stdout.write("\n\r") - - def do_notify(self, arg): - self.notifier.notify() - - def help_notify(self): - self.__println("Notifies threads using the list of datapoints about changes in this list.") - self.__println("Call after modifications on the list.") - - def do_quit(self, arg): - self.__println("Bye!") - return True - - def do_add(self, arg): - try: - (registerType, label, unit, address, count, scanrate, readTopic, writeTopic, feedbackTopic) = self.splitterRe.split(arg) - self.__println("RegisterType: {0}".format(registerType)) - self.__println("Label: {0}".format(label)) - self.__println("Unit: {0}".format(unit)) - self.__println("Address: {0}".format(address)) - self.__println("Count: {0}".format(count)) - self.__println("ScanRate: {0}".format(scanrate)) - self.__println("ReadTopic: {0}".format(readTopic)) - self.__println("WriteTopic: {0}".format(writeTopic)) - self.__println("FeedbackTopic: {0}".format(feedbackTopic)) - - if readTopic == 'None': - readTopic = None - if writeTopic == 'None': - writeTopic = None - if feedbackTopic == 'None': - feedbackTopic = None - unit = parseIntArbitraryBase(unit) - address = parseIntArbitraryBase(address) - count = parseIntArbitraryBase(count) - scanrate = float(scanrate) - if scanrate == 0: - if readTopic: - raise CmdInterpreterException('readTopic must not be set when scanRate is zero') - if not writeTopic: - raise CmdInterpreterException('writeTopic must be set when scanRate is zero') - if not feedbackTopic: - raise CmdInterpreterException('feedbackTopic must be set when scanRate is zero') - else: - if not readTopic: - raise CmdInterpreterException('readTopic must be set when scanRate is zero') - if writeTopic: - raise CmdInterpreterException('writeTopic must not be set when scanRate is zero') - if feedbackTopic: - raise CmdInterpreterException('feedbackTopic must not be set when scanRate is zero') - allowedRegisterTypes = ['HoldingRegister'] - if registerType not in allowedRegisterTypes: - raise CmdInterpreterException('Unknown register type {0}, allowed are {1!s}'.format(registerType, allowedRegisterTypes)) - - - except ValueError as e: - self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) - - def help_add(self): - # HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), - self.__println("Usage: add