From 94e60ee172892763e6c03d0e4b2a0ad7394faf10 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Tue, 10 Sep 2019 16:33:18 +0200 Subject: [PATCH] fix style issues --- src/CommunicationProcessor.py | 6 +-- src/Converters.py | 13 +++---- src/Heartbeat.py | 1 + src/MqttProcessor.py | 15 +++++--- src/MyPriorityQueue.py | 1 + src/MyRS485.py | 2 +- src/NotificationForwarder.py | 1 + src/Pins.py | 2 - src/RegisterDatapoint.py | 55 ++++++++++++++------------- src/ScanRateConsideringQueueFeeder.py | 7 ++-- src/initialRegisterFile.py | 10 +++-- src/loadRegisterFile.py | 2 +- src/master.py | 3 +- src/updateRegisterFile.py | 7 +--- 14 files changed, 64 insertions(+), 61 deletions(-) diff --git a/src/CommunicationProcessor.py b/src/CommunicationProcessor.py index c5c74a0..9676fd3 100644 --- a/src/CommunicationProcessor.py +++ b/src/CommunicationProcessor.py @@ -12,6 +12,7 @@ import logging ERROR_PIN = 29 + class CommunicationProcessor(threading.Thread): def __init__(self, config, queue, pubQueue): super().__init__() @@ -33,7 +34,6 @@ class CommunicationProcessor(threading.Thread): return MyRS485.MyRS485(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1, timeout=1) - def run(self): client = ModbusSerialClient(method='rtu') client.socket = self.__getSerial() @@ -56,7 +56,3 @@ class CommunicationProcessor(threading.Thread): client.socket = self.__getSerial() finally: time.sleep(self.config.interCommDelay) - - - - diff --git a/src/Converters.py b/src/Converters.py index 07f94fa..8accab9 100644 --- a/src/Converters.py +++ b/src/Converters.py @@ -1,6 +1,6 @@ -# in: from Modbus to MQTT, input is a list of 16bit integers, output shall be the desired format +# in: from Modbus to MQTT, input is a list of 16bit integers, output shall be the desired format # to be sent in the MQTT message -# out: from MQTT to Modbus, input is the format received from MQTT, output shall be a list of +# out: from MQTT to Modbus, input is the format received from MQTT, output shall be a list of # 16bit integers to be written to the Modbus slave from struct import pack, unpack @@ -12,20 +12,19 @@ def fix1twos(x): if x & 0x8000: r = ((x - 1) ^ 0xffff) * -1 return r / 10 - + Converters = { "dht20TOFloat": { - "in": lambda x : float(x[0]) / 10.0, + "in": lambda x: float(x[0]) / 10.0, "out": None }, "uint32": { - "in": lambda x : unpack('L', pack('HH', *x))[0], - "out": lambda x : unpack('HH', pack('L', int(x))) + "in": lambda x: unpack('L', pack('HH', *x))[0], + "out": lambda x: unpack('HH', pack('L', int(x))) }, "fix1twos": { "in": lambda x: fix1twos(x), "out": None } } - diff --git a/src/Heartbeat.py b/src/Heartbeat.py index 150316a..9eacb6e 100644 --- a/src/Heartbeat.py +++ b/src/Heartbeat.py @@ -3,6 +3,7 @@ import MqttProcessor import logging import time + class Heartbeat(threading.Thread): def __init__(self, config, pubQueue): super().__init__() diff --git a/src/MqttProcessor.py b/src/MqttProcessor.py index 0e81de5..3c07f30 100644 --- a/src/MqttProcessor.py +++ b/src/MqttProcessor.py @@ -4,6 +4,7 @@ from NotificationForwarder import AbstractNotificationReceiver import logging import Pins + class PublishItem(object): def __init__(self, topic, payload): self.topic = topic @@ -12,15 +13,19 @@ class PublishItem(object): def __str__(self): return 'Topic: {0}, Payload: {1}'.format(self.topic, self.payload) + def mqttOnConnectCallback(client, userdata, flags, rc): userdata.onConnect() + def mqttOnMessageCallback(client, userdata, message): userdata.onMessage(message.topic, message.payload) + def mqttOnDisconnectCallback(client, userdata, rc): userdata.onDisconnect(rc) + class MqttProcessor(threading.Thread, AbstractNotificationReceiver): def __init__(self, config, registers, queue, pubQueue): super().__init__() @@ -30,14 +35,15 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): self.pubQueue = pubQueue self.client = mqtt.Client(userdata=self) self.subscriptions = [] - self.topicRegisterMap ={} + self.topicRegisterMap = {} # self.daemon = True self.logger = logging.getLogger('MqttProcessor') def __processUpdatedRegisters(self, force=False): self.logger.debug("MqttProcessor.__updateSubscriptions") - subscribeTopics = [ r.subscribeTopic for r in self.registers if hasattr(r,'subscribeTopic') and r.subscribeTopic] + subscribeTopics = [r.subscribeTopic for r in self.registers if hasattr(r, 'subscribeTopic') + and r.subscribeTopic] self.logger.debug("Topics: {0!s}".format(subscribeTopics)) for subscribeTopic in subscribeTopics: @@ -52,7 +58,8 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): self.client.unsubscribe(subscription) self.subscriptions.remove(subscription) - self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if hasattr(r,'subscribeTopic') and r.subscribeTopic } + self.topicRegisterMap = {r.subscribeTopic: r for r in self.registers if hasattr(r, 'subscribeTopic') + and r.subscribeTopic} def receiveNotification(self, arg): self.logger.info("MqttProcessor:registersChanged") @@ -76,7 +83,6 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): else: self.logger.error("Invalid object in publish queue") - def onConnect(self): # print("MqttProcessor.onConnect") self.__processUpdatedRegisters(force=True) @@ -91,4 +97,3 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver): self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r)) r.onMessage(payload) self.queue.put(r) - diff --git a/src/MyPriorityQueue.py b/src/MyPriorityQueue.py index 71656df..a143c23 100644 --- a/src/MyPriorityQueue.py +++ b/src/MyPriorityQueue.py @@ -12,6 +12,7 @@ class MyPriorityQueueItem(object): def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority + class MyPriorityQueue(queue.PriorityQueue): def _put(self, itemWithPriority): i = MyPriorityQueueItem(itemWithPriority) diff --git a/src/MyRS485.py b/src/MyRS485.py index e02b209..38032ce 100644 --- a/src/MyRS485.py +++ b/src/MyRS485.py @@ -7,6 +7,7 @@ import termios DE_PIN = 0 + class MyRS485(serial.Serial): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -24,4 +25,3 @@ class MyRS485(serial.Serial): break # wiringpi.digitalWrite(DE_PIN, wiringpi.LOW) Pins.pinsWrite('DE', False) - diff --git a/src/NotificationForwarder.py b/src/NotificationForwarder.py index dfd6a6a..632a474 100644 --- a/src/NotificationForwarder.py +++ b/src/NotificationForwarder.py @@ -3,6 +3,7 @@ class AbstractNotificationReceiver(object): def receiveNotification(self, arg): raise NotImplementedError + class NotificationForwarder(object): def __init__(self): self.receivers = [] diff --git a/src/Pins.py b/src/Pins.py index e639603..3633072 100644 --- a/src/Pins.py +++ b/src/Pins.py @@ -8,7 +8,6 @@ PINS = { } - def pinsInit(): wiringpi.wiringPiSetup() for pin in PINS.values(): @@ -21,4 +20,3 @@ def pinsWrite(pinName, v): else: pinState = wiringpi.LOW wiringpi.digitalWrite(PINS[pinName], pinState) - diff --git a/src/RegisterDatapoint.py b/src/RegisterDatapoint.py index 4a72a7b..43454c9 100644 --- a/src/RegisterDatapoint.py +++ b/src/RegisterDatapoint.py @@ -6,7 +6,10 @@ import logging import json import Converters -class DatapointException(Exception): pass + +class DatapointException(Exception): + pass + class AbstractModbusDatapoint(object): def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, converter=None): @@ -40,17 +43,16 @@ class AbstractModbusDatapoint(object): self.errorCount, self.readCount, self.writeCount, self.converter)) def jsonify(self): - return {'type':self.__class__.__name__, - 'args': { k: getattr(self, k) for k in self.argList } - } + return {'type': self.__class__.__name__, + 'args': {k: getattr(self, k) for k in self.argList} + } def process(self, client): raise NotImplementedError - class HoldingRegisterDatapoint(AbstractModbusDatapoint): - def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, + def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, publishTopic=None, subscribeTopic=None, feedbackTopic=None, converter=None): super().__init__(label, unit, address, count, scanRate, converter) self.argList = self.argList + ['publishTopic', 'subscribeTopic', 'feedbackTopic'] @@ -83,16 +85,16 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): except Exception as e: raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e)) result = client.write_registers(address=self.address, - unit=self.unit, - values=values) - logger.debug("Write result: {0!s}".format(result)) + unit=self.unit, + values=values) + logger.debug("Write result: {0!s}".format(result)) self.writeRequestValue = None else: # perform read operation logger.debug("Holding register, perform read operation") self.readCount += 1 - result = client.read_holding_registers(address=self.address, - count=self.count, + result = client.read_holding_registers(address=self.address, + count=self.count, unit=self.unit) if type(result) in [ExceptionResponse, ModbusIOException]: self.errorCount += 1 @@ -110,7 +112,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint): if self.publishTopic: pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value))) self.lastContact = datetime.datetime.now() - + def onMessage(self, value): self.writeRequestValue = value @@ -119,7 +121,7 @@ class CoilDatapoint(AbstractModbusDatapoint): def __init__(self, label=None, unit=None, address=None, scanRate=None, publishTopic=None, subscribeTopic=None, feedbackTopic=None): super().__init__(label, unit, address, 1, scanRate, None) - self.argList = ['label', 'unit','address','scanRate','publishTopic', 'subscribeTopic', 'feedbackTopic'] + self.argList = ['label', 'unit', 'address', 'scanRate', 'publishTopic', 'subscribeTopic', 'feedbackTopic'] self.publishTopic = publishTopic self.subscribeTopic = subscribeTopic self.feedbackTopic = feedbackTopic @@ -132,7 +134,7 @@ class CoilDatapoint(AbstractModbusDatapoint): "writeCount: {9}, publishTopic: {10}, subscribeTopic: {11}, feedbackTopic: {12}" .format(self.type, self.label, self.unit, self.address, self.scanRate, self.enqueued, self.lastContact, - self.errorCount, self.readCount, self.writeCount, + self.errorCount, self.readCount, self.writeCount, self.publishTopic, self.subscribeTopic, self.feedbackTopic)) def onMessage(self, value): @@ -145,7 +147,7 @@ class CoilDatapoint(AbstractModbusDatapoint): logger.debug("Coil, perform write operation") self.writeCount += 1 logger.debug("{0}: raw: {1!s}".format(self.label, self.writeRequestValue)) - value=None + value = None if self.writeRequestValue in ['true', 'True', 'yes', 'Yes', 'On', 'on']: value = True elif self.writeRequestValue in ['false', 'False', 'no', 'No', 'Off', 'off']: @@ -156,13 +158,13 @@ class CoilDatapoint(AbstractModbusDatapoint): result = client.write_coil(address=self.address, unit=self.unit, value=value) - logger.debug("Write result: {0!s}".format(result)) + logger.debug("Write result: {0!s}".format(result)) self.writeRequestValue = None else: # perform read operation logger.debug("Coil, perform read operation") self.readCount += 1 - result = client.read_coils(address=self.address, + result = client.read_coils(address=self.address, unit=self.unit) if type(result) in [ExceptionResponse, ModbusIOException]: self.errorCount += 1 @@ -175,7 +177,8 @@ class CoilDatapoint(AbstractModbusDatapoint): class ReadOnlyDatapoint(AbstractModbusDatapoint): - def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None, converter=None): + def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, + publishTopic=None, converter=None): super().__init__(label, unit, address, count, scanRate, converter) self.argList = self.argList + ['updateOnly', 'publishTopic'] self.updateOnly = updateOnly @@ -188,9 +191,8 @@ class ReadOnlyDatapoint(AbstractModbusDatapoint): self.lastValue)) - class InputRegisterDatapoint(ReadOnlyDatapoint): - def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, + def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None, converter=None): super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter) self.type = 'input register' @@ -224,7 +226,7 @@ class InputRegisterDatapoint(ReadOnlyDatapoint): class DiscreteInputDatapoint(ReadOnlyDatapoint): - def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, + def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None, converter=None, bitCount=8): super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter) self.argList = self.argList + ['bitCount'] @@ -253,12 +255,11 @@ class DiscreteInputDatapoint(ReadOnlyDatapoint): self.lastValues[i] = result.getBit(i) logger.debug("{0}, {1}: changed: {2!s}".format(self.label, i, result.getBit(i))) if self.publishTopic: - pubQueue.put(MqttProcessor.PublishItem("{0}/{1}".format(self.publishTopic, i), str(result.getBit(i)))) + pubQueue.put(MqttProcessor.PublishItem("{0}/{1}" + .format(self.publishTopic, i), str(result.getBit(i)))) self.lastContact = datetime.datetime.now() - - class JsonifyEncoder(json.JSONEncoder): def default(self, o): res = None @@ -271,6 +272,7 @@ class JsonifyEncoder(json.JSONEncoder): res = super().default(o) return res + def datapointObjectHook(j): if type(j) == dict and 'type' in j and 'args' in j: klass = eval(j['type']) @@ -279,14 +281,15 @@ def datapointObjectHook(j): else: return j + def saveRegisterList(registerList, registerListFile): js = json.dumps(registerList, cls=JsonifyEncoder, sort_keys=True, indent=4) with open(registerListFile, 'w') as f: f.write(js) - + + def loadRegisterList(registerListFile): with open(registerListFile, 'r') as f: js = f.read() registerList = json.loads(js, object_hook=datapointObjectHook) return registerList - diff --git a/src/ScanRateConsideringQueueFeeder.py b/src/ScanRateConsideringQueueFeeder.py index 99364c9..1fd3aad 100644 --- a/src/ScanRateConsideringQueueFeeder.py +++ b/src/ScanRateConsideringQueueFeeder.py @@ -3,6 +3,7 @@ import datetime from NotificationForwarder import AbstractNotificationReceiver import logging + class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): def __init__(self, config, registers, queue): super().__init__() @@ -14,7 +15,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei self.logger = logging.getLogger('ScanRateConsideringQueueFeeder') def getMinimalScanrate(self): - return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) + return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) def receiveNotification(self, arg): self.logger.info("ScanRateConsideringQueueFeeder:registersChanged") @@ -26,10 +27,10 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei registersToBeHandled = [ r for r in self.registers if ((not r.enqueued) and (r.scanRate) and - ((not r.lastContact) or + ((not r.lastContact) or (r.lastContact + r.scanRate < datetime.datetime.now()))) ] - registersToBeHandled.sort(key=lambda x : x.scanRate) + registersToBeHandled.sort(key=lambda x: x.scanRate) for r in registersToBeHandled: self.queue.put(r) r.enqueued = True diff --git a/src/initialRegisterFile.py b/src/initialRegisterFile.py index 615679b..fa10058 100644 --- a/src/initialRegisterFile.py +++ b/src/initialRegisterFile.py @@ -4,12 +4,14 @@ import pickle datapoints = [ - RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1, datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'), - RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'), - RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Switches'), + RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1, + datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'), + RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1, + datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'), + RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1, + datetime.timedelta(seconds=1.0), True, 'Pub/Switches'), ] with open('registers.pkl', 'wb') as f: pickle.dump(datapoints, f) - diff --git a/src/loadRegisterFile.py b/src/loadRegisterFile.py index 7a18791..49a2223 100644 --- a/src/loadRegisterFile.py +++ b/src/loadRegisterFile.py @@ -3,4 +3,4 @@ import RegisterDatapoint registers = RegisterDatapoint.loadRegisterList('registers.json') for r in registers: - print("{0!s}".format(r)) \ No newline at end of file + print("{0!s}".format(r)) diff --git a/src/master.py b/src/master.py index 011fad5..d015004 100644 --- a/src/master.py +++ b/src/master.py @@ -36,7 +36,6 @@ if __name__ == "__main__": nf = NotificationForwarder.NotificationForwarder() logger.debug('infrastructure prepared') - datapoints = RegisterDatapoint.loadRegisterList(config.registerFile) logger.debug('datapoints read') @@ -56,7 +55,7 @@ if __name__ == "__main__": hb = Heartbeat.Heartbeat(config, pubQueue) hb.start() logger.debug('Heartbeat started') - + qf = ScanRateConsideringQueueFeeder.ScanRateConsideringQueueFeeder(config, datapoints, queue) nf.register(qf) qf.start() diff --git a/src/updateRegisterFile.py b/src/updateRegisterFile.py index 39d7b5c..64f46ae 100644 --- a/src/updateRegisterFile.py +++ b/src/updateRegisterFile.py @@ -10,15 +10,12 @@ with open('registers.pkl', 'rb') as f: newDatapoints = [] for dp in datapoints: ndp = type(dp)() - for k,v in dp.__dict__.items(): + for k, v in dp.__dict__.items(): if k != 'logger': ndp.__dict__[k] = v newDatapoints.append(ndp) - - js = json.dumps(newDatapoints, cls=RegisterDatapoint.JsonifyEncoder, sort_keys=True, indent=4) print(js) - -RegisterDatapoint.saveRegisterList(newDatapoints, 'registers.json') \ No newline at end of file +RegisterDatapoint.saveRegisterList(newDatapoints, 'registers.json')