priority handling tested
This commit is contained in:
@ -3,25 +3,39 @@ import datetime
|
|||||||
import threading
|
import threading
|
||||||
|
|
||||||
class AbstractModbusDatapoint(object):
|
class AbstractModbusDatapoint(object):
|
||||||
def __init__(self, label, unit, address, count):
|
def __init__(self, label, unit, address, count, scanRate):
|
||||||
self.label = label
|
self.label = label
|
||||||
self.unit = unit
|
self.unit = unit
|
||||||
self.address = address
|
self.address = address
|
||||||
self.count = count
|
self.count = count
|
||||||
|
self.scanRate = scanRate
|
||||||
self.type = 'abstract data point'
|
self.type = 'abstract data point'
|
||||||
|
self.enqueued = False
|
||||||
|
if self.scanRate:
|
||||||
|
self.priority = 1
|
||||||
|
else:
|
||||||
|
self.priority = 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def __lt__(self, other): return self.priority < other.priority
|
||||||
|
def __le__(self, other): return self.priority <= other.priority
|
||||||
|
def __eq__(self, other): return self.priority == other.priority
|
||||||
|
def __ne__(self, other): return self.priority != other.priority
|
||||||
|
def __gt__(self, other): return self.priority > other.priority
|
||||||
|
def __ge__(self, other): return self.priority >= other.priority
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count)
|
return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count)
|
||||||
|
|
||||||
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
||||||
def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic):
|
def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic):
|
||||||
super(AbstractModbusDatapoint, self).__init__(label, unit, address, count)
|
super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate)
|
||||||
self.scanRate = scanRate
|
|
||||||
self.publishTopic = publishTopic
|
self.publishTopic = publishTopic
|
||||||
self.subscribeTopic = subscribeTopic
|
self.subscribeTopic = subscribeTopic
|
||||||
self.feedbackTopic = feedbackTopic
|
self.feedbackTopic = feedbackTopic
|
||||||
self.writeRequestValue = None
|
self.writeRequestValue = None
|
||||||
self.lastContact = 0
|
self.lastContact = None
|
||||||
self.type = 'read holding register'
|
self.type = 'read holding register'
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
@ -55,7 +69,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
|||||||
|
|
||||||
class MqttProcessor(threading.Thread):
|
class MqttProcessor(threading.Thread):
|
||||||
def __init__(self, registers, queue):
|
def __init__(self, registers, queue):
|
||||||
super(object, self).__init__()
|
super(MqttProcessor, self).__init__()
|
||||||
self.registers = registers
|
self.registers = registers
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
@ -73,19 +87,42 @@ class MqttProcessor(threading.Thread):
|
|||||||
pass
|
pass
|
||||||
# call onMessage method of register with related subscribe topic
|
# call onMessage method of register with related subscribe topic
|
||||||
# put register yourself in high prio queue
|
# put register yourself in high prio queue
|
||||||
# notify using event
|
# self.put(r)
|
||||||
|
|
||||||
|
|
||||||
class ScanRateProcessingQueueFeeder(threading.Thread):
|
class ScanRateConsideringQueueFeeder(threading.Thread):
|
||||||
def __init__(self, registers, queue):
|
def __init__(self, registers, queue):
|
||||||
super(threading.Thread, self).__init__()
|
super(ScanRateConsideringQueueFeeder, self).__init__()
|
||||||
self.registers = registers
|
self.registers = registers
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
pass
|
while True:
|
||||||
# search registers with expired scanRate (lastContact + scanRate * backoff < now)
|
registersToBeHandled = [
|
||||||
# put into low prio queue
|
r for r in self.registers if ((not r.enqueued) and
|
||||||
|
(r.scanRate) and
|
||||||
|
((not r.lastContact) or
|
||||||
|
(r.lastContact + r.scanRate < datetime.datetime.now())))
|
||||||
|
]
|
||||||
|
registersToBeHandled.sort(key=lambda x : x.scanRate)
|
||||||
|
for r in registersToBeHandled:
|
||||||
|
self.queue.put(r)
|
||||||
|
r.enqueued = True
|
||||||
|
|
||||||
|
|
||||||
|
class CommunicationProcessor(threading.Thread):
|
||||||
|
def __init__(self, queue):
|
||||||
|
super(CommunicationProcessor, self).__init__()
|
||||||
|
self.queue = queue
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
r = self.queue.get()
|
||||||
|
# r.process()
|
||||||
|
r.lastContact = datetime.datetime.now()
|
||||||
|
print("Dequeued: {0!s}".format(r))
|
||||||
|
r.enqueued = False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -106,14 +143,24 @@ class ScanRateProcessingQueueFeeder(threading.Thread):
|
|||||||
|
|
||||||
|
|
||||||
datapoints = [
|
datapoints = [
|
||||||
HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, 60.0, 'Pub/Voltage', None, None),
|
HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None),
|
||||||
HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, 60.0, 'Pub/Frequency', None, None),
|
HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, datetime.timedelta(seconds=10), 'Pub/Frequency', None, None),
|
||||||
HoldingRegisterDatapoint('Current', 1, 0x2060, 2, 60.0, 'Pub/Current', None, None),
|
HoldingRegisterDatapoint('Current', 1, 0x2060, 2, datetime.timedelta(seconds=10), 'Pub/Current', None, None),
|
||||||
HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, 1.0, 'Pub/ResistanceChannel1', None, None),
|
HoldingRegisterDatapoint('Resistance Channel 1', 2, 0x0004, 2, datetime.timedelta(seconds=1), 'Pub/ResistanceChannel1', None, None),
|
||||||
HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, 1.0, 'Pub/TemperatureChannel1', None, None),
|
HoldingRegisterDatapoint('Temperature Channel 1', 2, 0x000c, 2, datetime.timedelta(seconds=1), 'Pub/TemperatureChannel1', None, None),
|
||||||
HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, 1.0, 'Pub/ResistanceChannel2', None, None),
|
HoldingRegisterDatapoint('Resistance Channel 2', 2, 0x0014, 2, datetime.timedelta(seconds=1), 'Pub/ResistanceChannel2', None, None),
|
||||||
HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, 1.0, 'Pub/TemperatureChannel2', None, None),
|
HoldingRegisterDatapoint('Temperature Channel 2', 2, 0x001c, 2, datetime.timedelta(seconds=1), 'Pub/TemperatureChannel2', None, None),
|
||||||
HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, 0.0, None, 'Sub/Relay1', 'Feedback/Relay1')
|
HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
queue = queue.PriorityQueue()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
cp = CommunicationProcessor(queue)
|
||||||
|
cp.start()
|
||||||
|
|
||||||
|
qf = ScanRateConsideringQueueFeeder(datapoints, queue)
|
||||||
|
qf.start()
|
||||||
|
Reference in New Issue
Block a user