228 lines
7.5 KiB
Python
Raw Normal View History

2019-06-21 18:57:00 +02:00
import queue
import datetime
import threading
2019-06-24 16:03:57 +02:00
import socketserver
import cmd
import io
2019-06-21 18:57:00 +02:00
2019-06-24 17:01:10 +02:00
class AbstractNotificationReceiver:
    """Interface for objects that want to be told about notifications.

    Subclasses override receiveNotification(); the base implementation
    only signals that the override is missing.
    """

    def receiveNotification(self, arg):
        """Handle one notification carrying the payload *arg*.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError
class NotificationForwarder:
    """Fan a notification out to every registered receiver."""

    def __init__(self):
        # Receivers are kept (and later called) in registration order.
        self.receivers = list()

    def register(self, receiver):
        """Add *receiver*; it must provide receiveNotification(arg)."""
        self.receivers.append(receiver)

    def notify(self, arg=None):
        """Call receiveNotification(arg) on each receiver, in order."""
        for target in self.receivers:
            target.receiveNotification(arg)
2019-06-21 18:57:00 +02:00
class AbstractModbusDatapoint(object):
    """Base class for a datapoint described by label, unit, address and count.

    Instances are ordered and compared solely by ``priority`` so that they
    can be placed in a priority queue: ``priority`` is 1 when a scan rate is
    configured and 0 otherwise, so datapoints without a scan rate sort first.

    Args:
        label: human readable name of the datapoint.
        unit: unit identifier of the datapoint (stored as given).
        address: start address of the datapoint.
        count: number of items belonging to the datapoint.
        scanRate: scan interval, or a falsy value (e.g. None) when the
            datapoint is not scanned cyclically.
    """

    # Equality is priority based and priority is not a stable identity, so
    # instances are deliberately unhashable (this is also what Python does
    # implicitly when __eq__ is defined without __hash__).
    __hash__ = None

    def __init__(self, label, unit, address, count, scanRate):
        self.label = label
        self.unit = unit
        self.address = address
        self.count = count
        self.scanRate = scanRate
        self.type = 'abstract data point'
        # Set while the datapoint is waiting in the processing queue.
        self.enqueued = False
        self.priority = 1 if self.scanRate else 0

    def _comparePriority(self, other, relation):
        """Apply *relation* to both priorities.

        Returns NotImplemented when *other* has no ``priority`` attribute so
        that comparisons against unrelated objects (``dp == None``) fall back
        to Python's default handling instead of raising AttributeError.
        """
        try:
            otherPriority = other.priority
        except AttributeError:
            return NotImplemented
        return relation(self.priority, otherPriority)

    def __lt__(self, other):
        return self._comparePriority(other, lambda mine, theirs: mine < theirs)

    def __le__(self, other):
        return self._comparePriority(other, lambda mine, theirs: mine <= theirs)

    def __eq__(self, other):
        return self._comparePriority(other, lambda mine, theirs: mine == theirs)

    def __ne__(self, other):
        return self._comparePriority(other, lambda mine, theirs: mine != theirs)

    def __gt__(self, other):
        return self._comparePriority(other, lambda mine, theirs: mine > theirs)

    def __ge__(self, other):
        return self._comparePriority(other, lambda mine, theirs: mine >= theirs)

    def __str__(self):
        return "{0}, {1}: {2} {3} {4}".format(
            self.type, self.label, self.unit, self.address, self.count)
2019-06-24 17:01:10 +02:00
2019-06-21 18:57:00 +02:00
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
    """Datapoint of type 'read holding register' with MQTT topic bindings.

    Args:
        label, unit, address, count, scanRate: forwarded unchanged to
            AbstractModbusDatapoint.
        publishTopic: topic for publishing read values (may be None).
        subscribeTopic: topic on which write requests arrive (may be None).
        feedbackTopic: topic for feedback on write requests (may be None).
    """

    def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic):
        super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate)
        self.publishTopic = publishTopic
        self.subscribeTopic = subscribeTopic
        self.feedbackTopic = feedbackTopic
        # Pending value to be written; None means "no write pending".
        self.writeRequestValue = None
        # Timestamp of the last successful read; None until the first one.
        self.lastContact = None
        self.type = 'read holding register'

    def process(self):
        """Handle a pending write request, otherwise perform a read.

        The actual bus operations are not implemented yet; the success and
        give-up flags are placeholders that are currently always False, so
        this method has no effect for now.
        """
        successful = False
        giveUp = False
        # Compare against the None sentinel rather than truthiness: a
        # requested value of 0 (or any other falsy value) is still a write.
        if self.writeRequestValue is not None:
            # TODO: perform write operation
            if successful:
                # TODO: give feedback
                self.writeRequestValue = None
            else:
                # TODO: retries handling
                if giveUp:
                    # TODO: give negative feedback
                    self.writeRequestValue = None
        else:
            # TODO: perform read operation
            if successful:
                self.lastContact = datetime.datetime.now()
                # TODO: publish value
            else:
                # TODO: retries handling
                if giveUp:
                    # TODO: backoff and availability handling
                    # TODO: give negative feedback
                    pass

    def onMessage(self, value):
        """Remember *value* as the pending write request."""
        self.writeRequestValue = value
2019-06-24 17:01:10 +02:00
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
    """Thread skeleton for the MQTT side; the MQTT handling is still TODO.

    Args:
        registers: collection of datapoints this processor works with.
        queue: queue object shared with the other worker threads.
    """

    def __init__(self, registers, queue):
        threading.Thread.__init__(self)
        self.registers = registers
        self.queue = queue

    def receiveNotification(self, arg):
        """React to a notification; currently only logs it."""
        print("MqttProcessor:registersChanged")
        # TODO: subscribe and/or unsubscribe according to register changes

    def run(self):
        """Thread body; currently does nothing and returns immediately."""
        # TODO: set mqtt callbacks
        # TODO: mqtt connect
        # TODO: mqtt loop forever

    def onConnect(self):
        """Connect callback; not implemented yet."""
        # TODO: subscribe to all subscribe topics from registers

    def onMessage(self, topic, payload):
        """Message callback; not implemented yet."""
        # TODO: call onMessage method of register with related subscribe topic
        # TODO: put register yourself in high prio queue
        #       (original note: self.put(r))
2019-06-21 18:57:00 +02:00
2019-06-24 17:01:10 +02:00
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
    """Thread that periodically enqueues every register whose scan is due.

    Args:
        registers: collection of datapoints; those with a truthy
            ``scanRate`` (a timedelta) are scanned cyclically.
        queue: queue the due registers are put into.
    """

    # Poll interval in seconds used while no register has a scan rate.
    IDLE_DELAY = 1.0

    def __init__(self, registers, queue):
        super(ScanRateConsideringQueueFeeder, self).__init__()
        self.registers = registers
        self.queue = queue
        # Never set in this class; used only as an interruptible sleep.
        self.delayEvent = threading.Event()

    def getMinimalScanrate(self):
        """Return the smallest configured scan rate in seconds.

        Falls back to IDLE_DELAY when no register has a scan rate, instead
        of raising ValueError from min() on an empty sequence.
        """
        return min(
            (r.scanRate.total_seconds() for r in self.registers if r.scanRate),
            default=self.IDLE_DELAY,
        )

    def receiveNotification(self, arg):
        """Recompute the loop delay after a notification."""
        print("ScanRateConsideringQueueFeeder:registersChanged")
        self.delay = self.getMinimalScanrate()

    @staticmethod
    def _isDue(register, now):
        """Tell whether *register* has to be enqueued at time *now*."""
        if register.enqueued or not register.scanRate:
            return False
        if not register.lastContact:
            # Never contacted so far: scan as soon as possible.
            return True
        return register.lastContact + register.scanRate < now

    def run(self):
        """Enqueue due registers forever, sleeping ``delay`` between rounds."""
        self.delay = self.getMinimalScanrate()
        while True:
            now = datetime.datetime.now()
            due = [r for r in self.registers if self._isDue(r, now)]
            # Registers with the shortest scan rate are enqueued first.
            due.sort(key=lambda r: r.scanRate)
            for r in due:
                # Mark before put(): once the register is in the queue the
                # consumer may dequeue it and reset the flag at any moment;
                # setting the flag afterwards could overwrite that reset and
                # leave the register marked as enqueued forever.
                r.enqueued = True
                self.queue.put(r)
            self.delayEvent.wait(self.delay)
2019-06-22 00:45:28 +02:00
class CommunicationProcessor(threading.Thread):
    """Consumer thread that takes registers from the queue one by one.

    Args:
        queue: object whose get() returns the next register to handle.
    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Dequeue registers forever, stamping each one as contacted."""
        while True:
            register = self.queue.get()
            # TODO: register.process() is not called yet
            register.lastContact = datetime.datetime.now()
            print("Dequeued: {0!s}".format(register))
            # Clear the flag so the register may be enqueued again.
            register.enqueued = False
2019-06-21 18:57:00 +02:00
2019-06-24 16:03:57 +02:00
class CmdInterpreter(cmd.Cmd):
    """Line oriented command interpreter bound to explicit text streams.

    Args:
        infile: text stream commands are read from.
        outfile: text stream responses are written to.
    """

    # NOTE: the do_* handlers intentionally carry no docstrings; cmd.Cmd
    # shows handler docstrings through its built-in help command, so adding
    # them would change what a connected user sees.

    def __init__(self, infile, outfile):
        cmd.Cmd.__init__(self, stdin=infile, stdout=outfile)
        # Read via self.stdin.readline() instead of input().
        self.use_rawinput = False

    def do_test(self, arg):
        # "test": write a fixed response.
        self.stdout.write("This is the test response\n\r")

    def do_notify(self, arg):
        # "notify": trigger the module-level notification forwarder.
        nf.notify()

    def do_bye(self, arg):
        # "bye": say goodbye; the true return value ends cmdloop().
        self.stdout.write("Bye!\n\r")
        return True
class CmdHandle(socketserver.StreamRequestHandler):
    """Request handler that runs one CmdInterpreter per connection."""

    def handle(self):
        """Run the command loop on the connection's streams until it ends."""
        # rfile/wfile are binary; the interpreter works on text streams.
        textIn = io.TextIOWrapper(self.rfile)
        textOut = io.TextIOWrapper(self.wfile)
        interpreter = CmdInterpreter(textIn, textOut)
        try:
            interpreter.cmdloop()
            print("Cmd handle terminated")
        except ConnectionAbortedError:
            print("Cmd handle externally interrupted")
2019-06-24 16:03:57 +02:00
2019-06-24 17:01:10 +02:00
class CmdServer(threading.Thread):
    """Thread serving the command interface over TCP.

    Args:
        address: interface address to bind to.
        port: TCP port to listen on.
    """

    def __init__(self, address, port):
        super(CmdServer, self).__init__()
        self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle)

    def run(self):
        """Serve requests until the server is shut down.

        This is the thread body. It must be ``run`` and not ``start``:
        overriding ``start`` would bypass Thread.start(), so no thread would
        be created and the caller of start() would block in serve_forever().
        """
        self.server.serve_forever()
2019-06-21 18:57:00 +02:00
# Static datapoint configuration. Entries with a scanRate are read
# cyclically; the entry without one only has subscribe/feedback topics.
datapoints = [
    HoldingRegisterDatapoint(
        label='Voltage', unit=1, address=0x2000, count=2,
        scanRate=datetime.timedelta(seconds=10),
        publishTopic='Pub/Voltage', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Frequency', unit=1, address=0x2020, count=2,
        scanRate=datetime.timedelta(seconds=10),
        publishTopic='Pub/Frequency', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Current', unit=1, address=0x2060, count=2,
        scanRate=datetime.timedelta(seconds=10),
        publishTopic='Pub/Current', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Resistance Channel 1', unit=2, address=0x0004, count=2,
        scanRate=datetime.timedelta(seconds=1),
        publishTopic='Pub/ResistanceChannel1', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Temperature Channel 1', unit=2, address=0x000c, count=2,
        scanRate=datetime.timedelta(seconds=1),
        publishTopic='Pub/TemperatureChannel1', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Resistance Channel 2', unit=2, address=0x0014, count=2,
        scanRate=datetime.timedelta(seconds=1),
        publishTopic='Pub/ResistanceChannel2', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Temperature Channel 2', unit=2, address=0x001c, count=2,
        scanRate=datetime.timedelta(seconds=1),
        publishTopic='Pub/TemperatureChannel2', subscribeTopic=None, feedbackTopic=None,
    ),
    HoldingRegisterDatapoint(
        label='Relay1', unit=5, address=0x0001, count=1,
        scanRate=None,
        publishTopic=None, subscribeTopic='Sub/Relay1', feedbackTopic='Feedback/Relay1',
    ),
]

# NOTE: this rebinds the name of the imported ``queue`` module to the queue
# instance; the module itself is no longer reachable under that name below.
queue = queue.PriorityQueue()

# Module-level forwarder; also used by the command interpreter.
nf = NotificationForwarder()

if __name__ == "__main__":
    # Consumer of the shared queue.
    cp = CommunicationProcessor(queue)
    cp.start()

    # MQTT side; registered so it is told about notifications.
    mp = MqttProcessor(datapoints, queue)
    nf.register(mp)
    mp.start()

    # Producer that enqueues registers according to their scan rate.
    qf = ScanRateConsideringQueueFeeder(datapoints, queue)
    nf.register(qf)
    qf.start()

    # Command interface on localhost.
    cs = CmdServer('127.0.0.1', 9999)
    cs.start()