import threading import socketserver import cmd import re import io import datetime import RegisterDatapoint import logging import Converters class CmdInterpreterException(ValueError): pass def parseIntArbitraryBase(s): i = 0 if s.startswith('0x'): i = int(s, 16) elif s.startswith('0b'): i = int(s, 2) else: i = int(s, 10) return i class CmdInterpreter(cmd.Cmd): def __init__(self, infile, outfile, config, notifier, registers): super().__init__(stdin=infile, stdout=outfile) self.use_rawinput = False self.config = config self.notifier = notifier self.registers = registers self.prompt = "test8> " self.intro = "test8 admin interface" self.splitterRe = re.compile(r'\s+') self.logger = logging.getLogger('CmdInterpreter') def onecmd(self, line): try: return super().onecmd(line) except Exception as e: msg = 'Caught exception in cmd "{0}": {1!s}'.format(line, e) self.__println(msg) self.logger.error(msg) def __print(self, text): self.stdout.write(text) def __println(self, text): self.stdout.write(text) self.stdout.write("\n\r") def __listConverterNames(self): return [name for name in Converters.Converters] def do_add_hr(self, arg): try: (label, unit, address, count, scanrate, readTopic, writeTopic, feedbackTopic, converter) = self.splitterRe.split(arg) self.__println("Label: {0}".format(label)) self.__println("Unit: {0}".format(unit)) self.__println("Address: {0}".format(address)) self.__println("Count: {0}".format(count)) self.__println("ScanRate: {0}".format(scanrate)) self.__println("ReadTopic: {0}".format(readTopic)) self.__println("WriteTopic: {0}".format(writeTopic)) self.__println("FeedbackTopic: {0}".format(feedbackTopic)) self.__println("Converter: {0}".format(converter)) if readTopic == 'None': readTopic = None if writeTopic == 'None': writeTopic = None if feedbackTopic == 'None': feedbackTopic = None if converter == 'None': converter = None unit = parseIntArbitraryBase(unit) address = parseIntArbitraryBase(address) count = parseIntArbitraryBase(count) scanrate = float(scanrate) r = RegisterDatapoint.HoldingRegisterDatapoint(label=label, unit=unit, address=address, count=count, scanRate=datetime.timedelta(seconds=scanrate), publishTopic=readTopic, subscribe=writeTopic, feedbackTopic=feedbackTopic, converter=converter) self.registers.append(r) except ValueError as e: self.__println("ERROR: {0!s}, {1!s}".format(e.__class__.__name__, e)) def help_add_hr(self): # HoldingRegisterDatapoint('Voltage', 1, 0x2000, 2, datetime.timedelta(seconds=10), 'Pub/Voltage', None, None), self.__println("Usage: add_hr