cmd handler
This commit is contained in:
@ -1,6 +1,9 @@
|
|||||||
import queue
|
import queue
|
||||||
import datetime
|
import datetime
|
||||||
import threading
|
import threading
|
||||||
|
import socketserver
|
||||||
|
import cmd
|
||||||
|
import io
|
||||||
|
|
||||||
class AbstractModbusDatapoint(object):
|
class AbstractModbusDatapoint(object):
|
||||||
def __init__(self, label, unit, address, count, scanRate):
|
def __init__(self, label, unit, address, count, scanRate):
|
||||||
@ -73,6 +76,10 @@ class MqttProcessor(threading.Thread):
|
|||||||
self.registers = registers
|
self.registers = registers
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
|
def registersChanged(self):
|
||||||
|
pass
|
||||||
|
# subscribe and/or unsubscribe according to register changes
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
pass
|
pass
|
||||||
# set mqtt callbacks
|
# set mqtt callbacks
|
||||||
@ -97,7 +104,14 @@ class ScanRateConsideringQueueFeeder(threading.Thread):
|
|||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.delayEvent = threading.Event()
|
self.delayEvent = threading.Event()
|
||||||
|
|
||||||
|
def getMinimalScanrate(self):
|
||||||
|
return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])
|
||||||
|
|
||||||
|
def registersChanged(self):
|
||||||
|
self.delay = self.getMinimalScanrate()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self.delay = self.getMinimalScanrate()
|
||||||
while True:
|
while True:
|
||||||
registersToBeHandled = [
|
registersToBeHandled = [
|
||||||
r for r in self.registers if ((not r.enqueued) and
|
r for r in self.registers if ((not r.enqueued) and
|
||||||
@ -109,7 +123,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread):
|
|||||||
for r in registersToBeHandled:
|
for r in registersToBeHandled:
|
||||||
self.queue.put(r)
|
self.queue.put(r)
|
||||||
r.enqueued = True
|
r.enqueued = True
|
||||||
self.delayEvent.wait(min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]))
|
self.delayEvent.wait(self.delay)
|
||||||
|
|
||||||
|
|
||||||
class CommunicationProcessor(threading.Thread):
|
class CommunicationProcessor(threading.Thread):
|
||||||
@ -127,6 +141,36 @@ class CommunicationProcessor(threading.Thread):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CmdInterpreter(cmd.Cmd):
|
||||||
|
def __init__(self, infile, outfile):
|
||||||
|
super(CmdInterpreter, self).__init__(stdin=infile, stdout=outfile)
|
||||||
|
self.use_rawinput = False
|
||||||
|
|
||||||
|
def do_test(self, arg):
|
||||||
|
self.stdout.write("This is the test response")
|
||||||
|
|
||||||
|
def do_bye(self, arg):
|
||||||
|
self.stdout.write("Bye!")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class CmdHandle(socketserver.StreamRequestHandler):
|
||||||
|
def handle(self):
|
||||||
|
print("About to handle cmd session")
|
||||||
|
cmd = CmdInterpreter(io.TextIOWrapper(self.rfile), io.TextIOWrapper(self.wfile))
|
||||||
|
cmd.cmdloop()
|
||||||
|
|
||||||
|
def finish(self):
|
||||||
|
super(CmdHandle, self).finish()
|
||||||
|
print("END")
|
||||||
|
|
||||||
|
class CmdServer(object):
|
||||||
|
def __init__(self, address, port):
|
||||||
|
self.server = socketserver.ThreadingTCPServer((address, port), CmdHandle)
|
||||||
|
self.serverThread = threading.Thread(target=self.server.serve_forever())
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.serverThread.start()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -161,8 +205,11 @@ queue = queue.PriorityQueue()
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
cp = CommunicationProcessor(queue)
|
#cp = CommunicationProcessor(queue)
|
||||||
cp.start()
|
#cp.start()
|
||||||
|
|
||||||
qf = ScanRateConsideringQueueFeeder(datapoints, queue)
|
#qf = ScanRateConsideringQueueFeeder(datapoints, queue)
|
||||||
qf.start()
|
#qf.start()
|
||||||
|
|
||||||
|
cs = CmdServer('0.0.0.0',9999)
|
||||||
|
cs.start()
|
Reference in New Issue
Block a user