priority handling fixed, list and del command in admin intf
This commit is contained in:
@ -48,13 +48,6 @@ class AbstractModbusDatapoint(object):
|
|||||||
else:
|
else:
|
||||||
self.priority = 0
|
self.priority = 0
|
||||||
|
|
||||||
def __lt__(self, other): return self.priority < other.priority
|
|
||||||
def __le__(self, other): return self.priority <= other.priority
|
|
||||||
def __eq__(self, other): return self.priority == other.priority
|
|
||||||
def __ne__(self, other): return self.priority != other.priority
|
|
||||||
def __gt__(self, other): return self.priority > other.priority
|
|
||||||
def __ge__(self, other): return self.priority >= other.priority
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value)
|
return "{0}, {1}: {2} {3} {4} {5} {6}".format(self.type, self.label, self.unit, self.address, self.count, self.command, self.value)
|
||||||
|
|
||||||
@ -67,7 +60,7 @@ class AbstractModbusDatapoint(object):
|
|||||||
|
|
||||||
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
||||||
def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic):
|
def __init__(self, label, unit, address, count, scanRate, publishTopic, subscribeTopic, feedbackTopic):
|
||||||
super(HoldingRegisterDatapoint, self).__init__(label, unit, address, count, scanRate)
|
super().__init__(label, unit, address, count, scanRate)
|
||||||
self.publishTopic = publishTopic
|
self.publishTopic = publishTopic
|
||||||
self.subscribeTopic = subscribeTopic
|
self.subscribeTopic = subscribeTopic
|
||||||
self.feedbackTopic = feedbackTopic
|
self.feedbackTopic = feedbackTopic
|
||||||
@ -75,6 +68,9 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
|||||||
self.lastContact = None
|
self.lastContact = None
|
||||||
self.type = 'read holding register'
|
self.type = 'read holding register'
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return "[{0!s}, {1} {2} {3}".format(super().__str__(), self.publishTopic, self.subscribeTopic, self.feedbackTopic)
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
successFull = False
|
successFull = False
|
||||||
giveUp = False
|
giveUp = False
|
||||||
@ -115,7 +111,7 @@ def mqttOnDisconnectCallback(client, userdata, rc):
|
|||||||
|
|
||||||
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||||
def __init__(self, registers, queue):
|
def __init__(self, registers, queue):
|
||||||
super(MqttProcessor, self).__init__()
|
super().__init__()
|
||||||
self.registers = registers
|
self.registers = registers
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.client = mqtt.Client(userdata=self)
|
self.client = mqtt.Client(userdata=self)
|
||||||
@ -174,7 +170,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
|||||||
|
|
||||||
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
|
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
|
||||||
def __init__(self, registers, queue):
|
def __init__(self, registers, queue):
|
||||||
super(ScanRateConsideringQueueFeeder, self).__init__()
|
super().__init__()
|
||||||
self.registers = registers
|
self.registers = registers
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.delayEvent = threading.Event()
|
self.delayEvent = threading.Event()
|
||||||
@ -205,7 +201,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei
|
|||||||
|
|
||||||
class CommunicationProcessor(threading.Thread):
|
class CommunicationProcessor(threading.Thread):
|
||||||
def __init__(self, queue):
|
def __init__(self, queue):
|
||||||
super(CommunicationProcessor, self).__init__()
|
super().__init__()
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
@ -217,36 +213,99 @@ class CommunicationProcessor(threading.Thread):
|
|||||||
r.enqueued = False
|
r.enqueued = False
|
||||||
|
|
||||||
|
|
||||||
class CmdInterpreter(cmd.Cmd):
|
class MyPriorityQueueItem(object):
|
||||||
def __init__(self, infile, outfile):
|
def __init__(self, itemWithPriority):
|
||||||
super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile)
|
self.itemWithPriority = itemWithPriority
|
||||||
self.use_rawinput = False
|
|
||||||
|
|
||||||
def do_test(self, arg):
|
def __lt__(self, other): return self.itemWithPriority.priority < other.itemWithPriority.priority
|
||||||
self.stdout.write("This is the test response\n\r")
|
def __le__(self, other): return self.itemWithPriority.priority <= other.itemWithPriority.priority
|
||||||
|
def __eq__(self, other): return self.itemWithPriority.priority == other.itemWithPriority.priority
|
||||||
|
def __ne__(self, other): return self.itemWithPriority.priority != other.itemWithPriority.priority
|
||||||
|
def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority
|
||||||
|
def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority
|
||||||
|
|
||||||
|
class MyPriorityQueue(queue.PriorityQueue):
|
||||||
|
def _put(self, itemWithPriority):
|
||||||
|
i = MyPriorityQueueItem(itemWithPriority)
|
||||||
|
super()._put(i)
|
||||||
|
|
||||||
|
def _get(self):
|
||||||
|
i = super()._get()
|
||||||
|
return i.itemWithPriority
|
||||||
|
|
||||||
|
class CmdInterpreter(cmd.Cmd):
|
||||||
|
def __init__(self, infile, outfile, notifier, registers):
|
||||||
|
super().__init__(stdin=infile, stdout=outfile)
|
||||||
|
self.use_rawinput = False
|
||||||
|
self.notifier = notifier
|
||||||
|
self.registers = registers
|
||||||
|
self.prompt = "test8> "
|
||||||
|
self.intro = "test8 admin interface"
|
||||||
|
|
||||||
|
def __print(self, text):
|
||||||
|
self.stdout.write(text)
|
||||||
|
|
||||||
|
def __println(self, text):
|
||||||
|
self.stdout.write(text)
|
||||||
|
self.stdout.write("\n\r")
|
||||||
|
|
||||||
def do_notify(self, arg):
|
def do_notify(self, arg):
|
||||||
nf.notify()
|
self.notifier.notify()
|
||||||
|
|
||||||
def do_bye(self, arg):
|
def help_notify(self):
|
||||||
self.stdout.write("Bye!\n\r")
|
self.__println("Notifies threads using the list of datapoints about changes in this list.")
|
||||||
|
self.__println("Call after modifications on the list.")
|
||||||
|
|
||||||
|
def do_quit(self, arg):
|
||||||
|
self.__println("Bye!")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def do_list(self, arg):
|
||||||
|
for i, r in enumerate(self.registers):
|
||||||
|
self.__println("#{0}: {1!s}".format(i, r))
|
||||||
|
|
||||||
|
def help_list(self):
|
||||||
|
self.__println("Usage: list")
|
||||||
|
self.__println("List the configured datapoints")
|
||||||
|
|
||||||
|
def do_del(self, arg):
|
||||||
|
try:
|
||||||
|
i = int(arg)
|
||||||
|
r = self.registers[i]
|
||||||
|
self.registers.remove(r)
|
||||||
|
self.__println("{0!s} removed".format(r))
|
||||||
|
except ValueError as e:
|
||||||
|
self.__println("ERROR: {0!s}".format(e))
|
||||||
|
|
||||||
|
def help_del(self):
|
||||||
|
self.__println("Usage: del <idx>")
|
||||||
|
self.__println("Removes an item from the list of datapoints by its index, see list command.")
|
||||||
|
self.__println("Be aware: indexes have been changed, rerun list before removing the next item.")
|
||||||
|
|
||||||
|
|
||||||
class CmdHandle(socketserver.StreamRequestHandler):
|
class CmdHandle(socketserver.StreamRequestHandler):
|
||||||
def handle(self):
|
def handle(self):
|
||||||
cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile))
|
cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile), self.server.userData.notifier, self.server.userData.registers)
|
||||||
try:
|
try:
|
||||||
cmd.cmdloop()
|
cmd.cmdloop()
|
||||||
print("Cmd handle terminated")
|
print("Cmd handle terminated")
|
||||||
except ConnectionAbortedError as e:
|
except ConnectionAbortedError as e:
|
||||||
print("Cmd handle externally interrupted")
|
print("Cmd handle externally interrupted")
|
||||||
|
|
||||||
|
class MyThreadingTCPServer(socketserver.ThreadingTCPServer):
|
||||||
|
def __init__(self, host, handler, userData):
|
||||||
|
super().__init__(host, handler)
|
||||||
|
self.userData = userData
|
||||||
|
|
||||||
|
class MyCmdUserData(object):
|
||||||
|
def __init__(self, notifier, registers):
|
||||||
|
self.notifier = notifier
|
||||||
|
self.registers = registers
|
||||||
|
|
||||||
class CmdServer(threading.Thread):
|
class CmdServer(threading.Thread):
|
||||||
def __init__(self, address, port):
|
def __init__(self, address, port, notifier, registers):
|
||||||
super(CmdServer, self).__init__()
|
super().__init__()
|
||||||
self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle)
|
self.server = MyThreadingTCPServer((address, port), CmdHandle, MyCmdUserData(notifier, registers))
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.server.serve_forever()
|
self.server.serve_forever()
|
||||||
@ -266,7 +325,7 @@ datapoints = [
|
|||||||
HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1')
|
HoldingRegisterDatapoint('Relay1', 5, 0x0001, 1, None, None, 'Sub/Relay1', 'Feedback/Relay1')
|
||||||
]
|
]
|
||||||
|
|
||||||
queue = queue.PriorityQueue()
|
queue = MyPriorityQueue()
|
||||||
nf = NotificationForwarder()
|
nf = NotificationForwarder()
|
||||||
config = Config()
|
config = Config()
|
||||||
|
|
||||||
@ -282,5 +341,5 @@ if __name__ == "__main__":
|
|||||||
nf.register(qf)
|
nf.register(qf)
|
||||||
qf.start()
|
qf.start()
|
||||||
|
|
||||||
cs = CmdServer('127.0.0.1',9999)
|
cs = CmdServer('127.0.0.1',9999, nf, datapoints)
|
||||||
cs.start()
|
cs.start()
|
Reference in New Issue
Block a user