diff --git a/snippets/test8.py b/snippets/test8.py index a77e984..c8e02a3 100644 --- a/snippets/test8.py +++ b/snippets/test8.py @@ -48,13 +48,6 @@ class AbstractModbusDatapoint(object): else: self.priority = 0 - def __lt__(self, other): return self.priority < other.priority - def __le__(self, other): return self.priority <= other.priority - def __eq__(self, other): return self.priority == other.priority - def __ne__(self, other): return self.priority != other.priority - def __gt__(self, other): return self.priority > other.priority - def __ge__(self, other): return self.priority >= other.priority - def __str__(self): return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value) @@ -67,7 +60,7 @@ class AbstractModbusDatapoint(object): class HoldingRegisterDatapoint(AbstractModbusDatapoint): def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic): - super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate) + super().__init__(label, unit, address, count, scanRate) self.publishTopic = publishTopic self.subscribeTopic = subscribeTopic self.feedbackTopic = feedbackTopic @@ -75,6 +68,9 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): self.lastContact = None self.type = 'read holding register' + def __str__(self): + return "[{0!s}, {1} {2} {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic) + def process(self): successFull = False giveUp = False @@ -115,7 +111,7 @@ def mqttOnDisconnectCallback(client, userdata, rc): class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): - super(MqttProcessor, self).__init__() + super().__init__() self.registers = registers self.queue = queue self.client = mqtt.Client(userdata=self) @@ -174,7 +170,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): def __init__(self, registers, queue): - super(ScanRateConsideringQueueFeeder, self).__init__() + super().__init__() self.registers = registers self.queue = queue self.delayEvent = threading.Event() @@ -205,7 +201,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei class CommunicationProcessor(threading.Thread): def __init__(self, queue): - super(CommunicationProcessor, self).__init__() + super().__init__() self.queue = queue def run(self): @@ -217,36 +213,99 @@ class CommunicationProcessor(threading.Thread): r.enqueued = False -class CmdInterpreter(cmd.Cmd): - def __init__(self, infile, outfile): - super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile) - self.use_rawinput = False +class MyPriorityQueueItem(object): + def __init__(self, itemWithPriority): + self.itemWithPriority = itemWithPriority - def do_test(self, arg): - self.stdout.write("This is the test response\n\r") + def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority + def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority + def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority + def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority + def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority + def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority + +class MyPriorityQueue(queue.PriorityQueue): + def _put(self, itemWithPriority): + i = MyPriorityQueueItem(itemWithPriority) + super()._put(i) + + def _get(self): + i = super()._get() + return i.itemWithPriority + +class CmdInterpreter(cmd.Cmd): + def __init__(self, infile, outfile, notifier, registers): + super().__init__(stdin=infile, stdout=outfile) + self.use_rawinput = False + self.notifier = notifier + self.registers = registers + self.prompt = "test8> " + self.intro = "test8 admin interface" + + def __print(self, text): + self.stdout.write(text) + + def __println(self, text): + self.stdout.write(text) + self.stdout.write("\n\r") def do_notify(self, arg): - nf.notify() + self.notifier.notify() - def do_bye(self, arg): - self.stdout.write("Bye!\n\r") + def help_notify(self): + self.__println("Notifies threads using the list of datapoints about changes in this list.") + self.__println("Call after modifications on the list.") + + def do_quit(self, arg): + self.__println("Bye!") return True + + def do_list(self, arg): + for i, r in enumerate(self.registers): + self.__println("#{0}: {1!s}".format(i, r)) + + def help_list(self): + self.__println("Usage: list") + self.__println("List the configured datapoints") + + def do_del(self, arg): + try: + i = int(arg) + r = self.registers[i] + self.registers.remove(r) + self.__println("{0!s} removed".format(r)) + except ValueError as e: + self.__println("ERROR: {0!s}".format(e)) + + def help_del(self): + self.__println("Usage: del ") + self.__println("Removes an item from the list of datapoints by its index, see list command.") + self.__println("Be aware: indexes have been changed, rerun list before removing the next item.") class CmdHandle(socketserver.StreamRequestHandler): def handle(self): - cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile)) + cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile), self.server.userData.notifier, self.server.userData.registers) try: cmd.cmdloop() print("Cmd handle terminated") except ConnectionAbortedError as e: print("Cmd handle externally interrupted") +class MyThreadingTCPServer(socketserver.ThreadingTCPServer): + def __init__(self, host, handler, userData): + super().__init__(host, handler) + self.userData = userData + +class MyCmdUserData(object): + def __init__(self, notifier, registers): + self.notifier = notifier + self.registers = registers class CmdServer(threading.Thread): - def __init__(self, address, port): - super(CmdServer, self).__init__() - self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle) + def __init__(self, address, port, notifier, registers): + super().__init__() + self.server = MyThreadingTCPServer((address, port), CmdHandle, MyCmdUserData(notifier, registers)) def start(self): self.server.serve_forever() @@ -266,7 +325,7 @@ datapoints = [ HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1') ] -queue = queue.PriorityQueue() +queue = MyPriorityQueue() nf = NotificationForwarder() config = Config() @@ -282,5 +341,5 @@ if __name__ == "__main__": nf.register(qf) qf.start() - cs = CmdServer('127.0.0.1',9999) + cs = CmdServer('127.0.0.1',9999, nf, datapoints) cs.start() \ No newline at end of file