diff --git a/snippets/test8.py b/snippets/test8.py index f9f7391..632a4e3 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -5,6 +5,24 @@ import socketserver import cmd import io + + +class AbstractNotificationReceiver(object): + def receiveNotification(self, arg): + raise NotImplementedError + +class NotificationForwarder(object): + def __init__(self): + self.receivers = [] + + def register(self, receiver): + self.receivers.append(receiver) + + def notify(self, arg=None): + for r in self.receivers: + r.receiveNotification(arg) + + class AbstractModbusDatapoint(object): def __init__(self, label, unit, address, count, scanRate): self.label = label @@ -19,8 +37,6 @@ class AbstractModbusDatapoint(object): else: self.priority = 0 - - def __lt__(self, other): return self.priority < other.priority def __le__(self, other): return self.priority <= other.priority def __eq__(self, other): return self.priority == other.priority @@ -31,6 +47,7 @@ class AbstractModbusDatapoint(object): def __str__(self): return "{0}, {1}: {2} {3} {4}".format(self.type, self.label, self.unit, self.address, self.count) + class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate) @@ -70,13 +87,14 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): self.writeRequestValue = value -class MqttProcessor(threading.Thread): +class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): super(MqttProcessor, self).__init__() self.registers = registers self.queue = queue - def registersChanged(self): + def receiveNotification(self, arg): + print("MqttProcessor:registersChanged") pass # subscribe and/or unsubscribe according to register changes @@ -97,7 +115,7 @@ class MqttProcessor(threading.Thread): # self.put(r) -class ScanRateConsideringQueueFeeder(threading.Thread): +class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): super(ScanRateConsideringQueueFeeder, self).__init__() self.registers = registers @@ -107,7 +125,8 @@ class ScanRateConsideringQueueFeeder(threading.Thread): def getMinimalScanrate(self): return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) - def registersChanged(self): + def receiveNotification(self, arg): + print("ScanRateConsideringQueueFeeder:registersChanged") self.delay = self.getMinimalScanrate() def run(self): @@ -140,7 +159,6 @@ class CommunicationProcessor(threading.Thread): r.enqueued = False - class CmdInterpreter(cmd.Cmd): def __init__(self, infile, outfile): super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile) @@ -149,6 +167,9 @@ class CmdInterpreter(cmd.Cmd): def do_test(self, arg): self.stdout.write("This is the test response") + def do_notify(self, arg): + nf.notify() + def do_bye(self, arg): self.stdout.write("Bye!") return True @@ -156,38 +177,25 @@ class CmdInterpreter(cmd.Cmd): class CmdHandle(socketserver.StreamRequestHandler): def handle(self): - print("About to handle cmd session") cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile)) - cmd.cmdloop() + try: + cmd.cmdloop() + except ConnectionAbortedError as e: + print("Cmd handle externally interrupted") - def finish(self): - super(CmdHandle, self).finish() - print("END") -class CmdServer(object): +class CmdServer(threading.Thread): def __init__(self, address, port): + super(CmdServer, self).__init__() self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle) - self.serverThread = threading.Thread(target=self.server.serve_forever()) def start(self): - self.serverThread.start() + self.server.serve_forever() - -# ModbusRequestDefinition(4, 0x2000, 2, 'F', '(ERR) Unavailable device'), -# ModbusRequestDefinition(1, 0x2000, 4, 'F', '(ERR) Wrong register size'), -# ModbusRequestDefinition(1, 0x2000, 2, 'F', 'Voltage'), -# ModbusRequestDefinition(1, 0x2020, 2, 'F', 'Frequency'), -# ModbusRequestDefinition(1, 0x2060, 2, 'F', 'Current'), -# ModbusRequestDefinition(3, 0x0004, 2, 'RF', 'Resistance Channel 1'), -# ModbusRequestDefinition(3, 0x000C, 2, 'RF', 'Temperature Channel 1'), -# ModbusRequestDefinition(3, 0x0014, 2, 'RF', 'Resistance Channel 2'), -# ModbusRequestDefinition(3, 0x001C, 2, 'RF', 'Temperature Channel 2'), - - datapoints = [ HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), HoldingRegisterDatapoint('Frequency', 1, 0x2020, 2, datetime.timedelta(seconds=10), 'Pub/Frequency', None, None), @@ -199,17 +207,21 @@ datapoints = [ HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1') ] - queue = queue.PriorityQueue() - +nf = NotificationForwarder() if __name__ == "__main__": - #cp = CommunicationProcessor(queue) - #cp.start() + cp = CommunicationProcessor(queue) + cp.start() - #qf = ScanRateConsideringQueueFeeder(datapoints, queue) - #qf.start() + mp = MqttProcessor(datapoints, queue) + nf.register(mp) + mp.start() - cs = CmdServer('0.0.0.0',9999) + qf = ScanRateConsideringQueueFeeder(datapoints, queue) + nf.register(qf) + qf.start() + + cs = CmdServer('127.0.0.1',9999) cs.start() \ No newline at end of file