From 4a090c5a73f60126b8480e85f9a58608700e5332 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sat, 22 Jun 2019 00:45:28 +0200 Subject: [PATCH] priority handling tested --- snippets/test8.py | 85 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/snippets/test8.py b/snippets/test8.py index da3f39e..74f6a14 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -3,25 +3,39 @@ import datetime import threading class AbstractModbusDatapoint(object): - def __init__(self, label, unit, address, count): + def __init__(self, label, unit, address, count, scanRate): self.label = label self.unit = unit self.address = address self.count = count + self.scanRate = scanRate self.type = 'abstract data point' + self.enqueued = False + if self.scanRate: + self.priority = 1 + else: + self.priority = 0 + + + + def __lt__(self, other): return self.priority < other.priority + def __le__(self, other): return self.priority <= other.priority + def __eq__(self, other): return self.priority == other.priority + def __ne__(self, other): return self.priority != other.priority + def __gt__(self, other): return self.priority > other.priority + def __ge__(self, other): return self.priority >= other.priority def __str__(self): return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): - super(AbstractModbusDatapoint, self).__init__(label, unit, address, count) - self.scanRate = scanRate + super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate) self.publishTopic = publishTopic self.subscribeTopic = subscribeTopic self.feedbackTopic = feedbackTopic self.writeRequestValue = None - self.lastContact = 0 + self.lastContact = None self.type = 'read holding register' def process(self): @@ -55,7 +69,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): class MqttProcessor(threading.Thread): def __init__(self, registers, queue): - super(object, self).__init__() + super(MqttProcessor, self).__init__() self.registers = registers self.queue = queue @@ -73,19 +87,42 @@ class MqttProcessor(threading.Thread): pass # call onMessage method of register with related subscribe topic # put register yourself in high prio queue - # notify using event + # self.put(r) -class ScanRateProcessingQueueFeeder(threading.Thread): +class ScanRateConsideringQueueFeeder(threading.Thread): def __init__(self, registers, queue): - super(threading.Thread, self).__init__() + super(ScanRateConsideringQueueFeeder, self).__init__() self.registers = registers self.queue = queue def run(self): - pass - # search registers with expired scanRate (lastContact + scanRate * backoff < now) - # put into low prio queue + while True: + registersToBeHandled = [ + r for r in self.registers if ((not r.enqueued) and + (r.scanRate) and + ((not r.lastContact) or + (r.lastContact + r.scanRate < datetime.datetime.now()))) + ] + registersToBeHandled.sort(key=lambda x : x.scanRate) + for r in registersToBeHandled: + self.queue.put(r) + r.enqueued = True + + +class CommunicationProcessor(threading.Thread): + def __init__(self, queue): + super(CommunicationProcessor, self).__init__() + self.queue = queue + + def run(self): + while True: + r = self.queue.get() + # r.process() + r.lastContact = datetime.datetime.now() + print("Dequeued: {0!s}".format(r)) + r.enqueued = False + @@ -106,14 +143,24 @@ class ScanRateProcessingQueueFeeder(threading.Thread): datapoints = [ - HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, 60.0, 'Pub/Voltage', None, None), - HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, 60.0, 'Pub/Frequency', None, None), - HoldingRegisterDatapoint('Current', 1, 0x2060, 2, 60.0, 'Pub/Current', None, None), - HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, 1.0, 'Pub/ResistanceChannel1', None, None), - HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, 1.0, 'Pub/TemperatureChannel1', None, None), - HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, 1.0, 'Pub/ResistanceChannel2', None, None), - HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, 1.0, 'Pub/TemperatureChannel2', None, None), - HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, 0.0, None, 'Sub/Relay1', 'Feedback/Relay1') + HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), + HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, datetime.timedelta(seconds=10), 'Pub/Frequency', None, None), + HoldingRegisterDatapoint('Current', 1, 0x2060, 2, datetime.timedelta(seconds=10), 'Pub/Current', None, None), + HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, datetime.timedelta(seconds=1), 'Pub/ResistanceChannel1', None, None), + HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, datetime.timedelta(seconds=1), 'Pub/TemperatureChannel1', None, None), + HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, datetime.timedelta(seconds=1), 'Pub/ResistanceChannel2', None, None), + HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, datetime.timedelta(seconds=1), 'Pub/TemperatureChannel2', None, None), + HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1') ] +queue = queue.PriorityQueue() + + + +if __name__ == "__main__": + cp = CommunicationProcessor(queue) + cp.start() + + qf = ScanRateConsideringQueueFeeder(datapoints, queue) + qf.start()