fix style issues
This commit is contained in:
parent
77d01ca675
commit
94e60ee172
@ -12,6 +12,7 @@ import logging
|
||||
|
||||
ERROR_PIN = 29
|
||||
|
||||
|
||||
class CommunicationProcessor(threading.Thread):
|
||||
def __init__(self, config, queue, pubQueue):
|
||||
super().__init__()
|
||||
@ -33,7 +34,6 @@ class CommunicationProcessor(threading.Thread):
|
||||
return MyRS485.MyRS485(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1,
|
||||
timeout=1)
|
||||
|
||||
|
||||
def run(self):
|
||||
client = ModbusSerialClient(method='rtu')
|
||||
client.socket = self.__getSerial()
|
||||
@ -56,7 +56,3 @@ class CommunicationProcessor(threading.Thread):
|
||||
client.socket = self.__getSerial()
|
||||
finally:
|
||||
time.sleep(self.config.interCommDelay)
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
# in: from Modbus to MQTT, input is a list of 16bit integers, output shall be the desired format
|
||||
# in: from Modbus to MQTT, input is a list of 16bit integers, output shall be the desired format
|
||||
# to be sent in the MQTT message
|
||||
# out: from MQTT to Modbus, input is the format received from MQTT, output shall be a list of
|
||||
# out: from MQTT to Modbus, input is the format received from MQTT, output shall be a list of
|
||||
# 16bit integers to be written to the Modbus slave
|
||||
|
||||
from struct import pack, unpack
|
||||
@ -12,20 +12,19 @@ def fix1twos(x):
|
||||
if x & 0x8000:
|
||||
r = ((x - 1) ^ 0xffff) * -1
|
||||
return r / 10
|
||||
|
||||
|
||||
|
||||
Converters = {
|
||||
"dht20TOFloat": {
|
||||
"in": lambda x : float(x[0]) / 10.0,
|
||||
"in": lambda x: float(x[0]) / 10.0,
|
||||
"out": None
|
||||
},
|
||||
"uint32": {
|
||||
"in": lambda x : unpack('L', pack('HH', *x))[0],
|
||||
"out": lambda x : unpack('HH', pack('L', int(x)))
|
||||
"in": lambda x: unpack('L', pack('HH', *x))[0],
|
||||
"out": lambda x: unpack('HH', pack('L', int(x)))
|
||||
},
|
||||
"fix1twos": {
|
||||
"in": lambda x: fix1twos(x),
|
||||
"out": None
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ import MqttProcessor
|
||||
import logging
|
||||
import time
|
||||
|
||||
|
||||
class Heartbeat(threading.Thread):
|
||||
def __init__(self, config, pubQueue):
|
||||
super().__init__()
|
||||
|
@ -4,6 +4,7 @@ from NotificationForwarder import AbstractNotificationReceiver
|
||||
import logging
|
||||
import Pins
|
||||
|
||||
|
||||
class PublishItem(object):
|
||||
def __init__(self, topic, payload):
|
||||
self.topic = topic
|
||||
@ -12,15 +13,19 @@ class PublishItem(object):
|
||||
def __str__(self):
|
||||
return 'Topic: {0}, Payload: {1}'.format(self.topic, self.payload)
|
||||
|
||||
|
||||
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||
userdata.onConnect()
|
||||
|
||||
|
||||
def mqttOnMessageCallback(client, userdata, message):
|
||||
userdata.onMessage(message.topic, message.payload)
|
||||
|
||||
|
||||
def mqttOnDisconnectCallback(client, userdata, rc):
|
||||
userdata.onDisconnect(rc)
|
||||
|
||||
|
||||
class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||
def __init__(self, config, registers, queue, pubQueue):
|
||||
super().__init__()
|
||||
@ -30,14 +35,15 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||
self.pubQueue = pubQueue
|
||||
self.client = mqtt.Client(userdata=self)
|
||||
self.subscriptions = []
|
||||
self.topicRegisterMap ={}
|
||||
self.topicRegisterMap = {}
|
||||
# self.daemon = True
|
||||
self.logger = logging.getLogger('MqttProcessor')
|
||||
|
||||
def __processUpdatedRegisters(self, force=False):
|
||||
self.logger.debug("MqttProcessor.__updateSubscriptions")
|
||||
|
||||
subscribeTopics = [ r.subscribeTopic for r in self.registers if hasattr(r,'subscribeTopic') and r.subscribeTopic]
|
||||
subscribeTopics = [r.subscribeTopic for r in self.registers if hasattr(r, 'subscribeTopic')
|
||||
and r.subscribeTopic]
|
||||
self.logger.debug("Topics: {0!s}".format(subscribeTopics))
|
||||
|
||||
for subscribeTopic in subscribeTopics:
|
||||
@ -52,7 +58,8 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||
self.client.unsubscribe(subscription)
|
||||
self.subscriptions.remove(subscription)
|
||||
|
||||
self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if hasattr(r,'subscribeTopic') and r.subscribeTopic }
|
||||
self.topicRegisterMap = {r.subscribeTopic: r for r in self.registers if hasattr(r, 'subscribeTopic')
|
||||
and r.subscribeTopic}
|
||||
|
||||
def receiveNotification(self, arg):
|
||||
self.logger.info("MqttProcessor:registersChanged")
|
||||
@ -76,7 +83,6 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||
else:
|
||||
self.logger.error("Invalid object in publish queue")
|
||||
|
||||
|
||||
def onConnect(self):
|
||||
# print("MqttProcessor.onConnect")
|
||||
self.__processUpdatedRegisters(force=True)
|
||||
@ -91,4 +97,3 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
|
||||
self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r))
|
||||
r.onMessage(payload)
|
||||
self.queue.put(r)
|
||||
|
||||
|
@ -12,6 +12,7 @@ class MyPriorityQueueItem(object):
|
||||
def __gt__(self, other): return self.itemWithPriority.priority > other.itemWithPriority.priority
|
||||
def __ge__(self, other): return self.itemWithPriority.priority >= other.itemWithPriority.priority
|
||||
|
||||
|
||||
class MyPriorityQueue(queue.PriorityQueue):
|
||||
def _put(self, itemWithPriority):
|
||||
i = MyPriorityQueueItem(itemWithPriority)
|
||||
|
@ -7,6 +7,7 @@ import termios
|
||||
|
||||
DE_PIN = 0
|
||||
|
||||
|
||||
class MyRS485(serial.Serial):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
@ -24,4 +25,3 @@ class MyRS485(serial.Serial):
|
||||
break
|
||||
# wiringpi.digitalWrite(DE_PIN, wiringpi.LOW)
|
||||
Pins.pinsWrite('DE', False)
|
||||
|
||||
|
@ -3,6 +3,7 @@ class AbstractNotificationReceiver(object):
|
||||
def receiveNotification(self, arg):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class NotificationForwarder(object):
|
||||
def __init__(self):
|
||||
self.receivers = []
|
||||
|
@ -8,7 +8,6 @@ PINS = {
|
||||
}
|
||||
|
||||
|
||||
|
||||
def pinsInit():
|
||||
wiringpi.wiringPiSetup()
|
||||
for pin in PINS.values():
|
||||
@ -21,4 +20,3 @@ def pinsWrite(pinName, v):
|
||||
else:
|
||||
pinState = wiringpi.LOW
|
||||
wiringpi.digitalWrite(PINS[pinName], pinState)
|
||||
|
||||
|
@ -6,7 +6,10 @@ import logging
|
||||
import json
|
||||
import Converters
|
||||
|
||||
class DatapointException(Exception): pass
|
||||
|
||||
class DatapointException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AbstractModbusDatapoint(object):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, converter=None):
|
||||
@ -40,17 +43,16 @@ class AbstractModbusDatapoint(object):
|
||||
self.errorCount, self.readCount, self.writeCount, self.converter))
|
||||
|
||||
def jsonify(self):
|
||||
return {'type':self.__class__.__name__,
|
||||
'args': { k: getattr(self, k) for k in self.argList }
|
||||
}
|
||||
return {'type': self.__class__.__name__,
|
||||
'args': {k: getattr(self, k) for k in self.argList}
|
||||
}
|
||||
|
||||
def process(self, client):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
|
||||
class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None,
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None,
|
||||
publishTopic=None, subscribeTopic=None, feedbackTopic=None, converter=None):
|
||||
super().__init__(label, unit, address, count, scanRate, converter)
|
||||
self.argList = self.argList + ['publishTopic', 'subscribeTopic', 'feedbackTopic']
|
||||
@ -83,16 +85,16 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
||||
except Exception as e:
|
||||
raise DatapointException("Exception caught when trying to converter modbus data: {0!s}".format(e))
|
||||
result = client.write_registers(address=self.address,
|
||||
unit=self.unit,
|
||||
values=values)
|
||||
logger.debug("Write result: {0!s}".format(result))
|
||||
unit=self.unit,
|
||||
values=values)
|
||||
logger.debug("Write result: {0!s}".format(result))
|
||||
self.writeRequestValue = None
|
||||
else:
|
||||
# perform read operation
|
||||
logger.debug("Holding register, perform read operation")
|
||||
self.readCount += 1
|
||||
result = client.read_holding_registers(address=self.address,
|
||||
count=self.count,
|
||||
result = client.read_holding_registers(address=self.address,
|
||||
count=self.count,
|
||||
unit=self.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
self.errorCount += 1
|
||||
@ -110,7 +112,7 @@ class HoldingRegisterDatapoint(AbstractModbusDatapoint):
|
||||
if self.publishTopic:
|
||||
pubQueue.put(MqttProcessor.PublishItem(self.publishTopic, str(value)))
|
||||
self.lastContact = datetime.datetime.now()
|
||||
|
||||
|
||||
def onMessage(self, value):
|
||||
self.writeRequestValue = value
|
||||
|
||||
@ -119,7 +121,7 @@ class CoilDatapoint(AbstractModbusDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, scanRate=None, publishTopic=None, subscribeTopic=None,
|
||||
feedbackTopic=None):
|
||||
super().__init__(label, unit, address, 1, scanRate, None)
|
||||
self.argList = ['label', 'unit','address','scanRate','publishTopic', 'subscribeTopic', 'feedbackTopic']
|
||||
self.argList = ['label', 'unit', 'address', 'scanRate', 'publishTopic', 'subscribeTopic', 'feedbackTopic']
|
||||
self.publishTopic = publishTopic
|
||||
self.subscribeTopic = subscribeTopic
|
||||
self.feedbackTopic = feedbackTopic
|
||||
@ -132,7 +134,7 @@ class CoilDatapoint(AbstractModbusDatapoint):
|
||||
"writeCount: {9}, publishTopic: {10}, subscribeTopic: {11}, feedbackTopic: {12}"
|
||||
.format(self.type, self.label, self.unit, self.address,
|
||||
self.scanRate, self.enqueued, self.lastContact,
|
||||
self.errorCount, self.readCount, self.writeCount,
|
||||
self.errorCount, self.readCount, self.writeCount,
|
||||
self.publishTopic, self.subscribeTopic, self.feedbackTopic))
|
||||
|
||||
def onMessage(self, value):
|
||||
@ -145,7 +147,7 @@ class CoilDatapoint(AbstractModbusDatapoint):
|
||||
logger.debug("Coil, perform write operation")
|
||||
self.writeCount += 1
|
||||
logger.debug("{0}: raw: {1!s}".format(self.label, self.writeRequestValue))
|
||||
value=None
|
||||
value = None
|
||||
if self.writeRequestValue in ['true', 'True', 'yes', 'Yes', 'On', 'on']:
|
||||
value = True
|
||||
elif self.writeRequestValue in ['false', 'False', 'no', 'No', 'Off', 'off']:
|
||||
@ -156,13 +158,13 @@ class CoilDatapoint(AbstractModbusDatapoint):
|
||||
result = client.write_coil(address=self.address,
|
||||
unit=self.unit,
|
||||
value=value)
|
||||
logger.debug("Write result: {0!s}".format(result))
|
||||
logger.debug("Write result: {0!s}".format(result))
|
||||
self.writeRequestValue = None
|
||||
else:
|
||||
# perform read operation
|
||||
logger.debug("Coil, perform read operation")
|
||||
self.readCount += 1
|
||||
result = client.read_coils(address=self.address,
|
||||
result = client.read_coils(address=self.address,
|
||||
unit=self.unit)
|
||||
if type(result) in [ExceptionResponse, ModbusIOException]:
|
||||
self.errorCount += 1
|
||||
@ -175,7 +177,8 @@ class CoilDatapoint(AbstractModbusDatapoint):
|
||||
|
||||
|
||||
class ReadOnlyDatapoint(AbstractModbusDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None, publishTopic=None, converter=None):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
publishTopic=None, converter=None):
|
||||
super().__init__(label, unit, address, count, scanRate, converter)
|
||||
self.argList = self.argList + ['updateOnly', 'publishTopic']
|
||||
self.updateOnly = updateOnly
|
||||
@ -188,9 +191,8 @@ class ReadOnlyDatapoint(AbstractModbusDatapoint):
|
||||
self.lastValue))
|
||||
|
||||
|
||||
|
||||
class InputRegisterDatapoint(ReadOnlyDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
publishTopic=None, converter=None):
|
||||
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter)
|
||||
self.type = 'input register'
|
||||
@ -224,7 +226,7 @@ class InputRegisterDatapoint(ReadOnlyDatapoint):
|
||||
|
||||
|
||||
class DiscreteInputDatapoint(ReadOnlyDatapoint):
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
def __init__(self, label=None, unit=None, address=None, count=None, scanRate=None, updateOnly=None,
|
||||
publishTopic=None, converter=None, bitCount=8):
|
||||
super().__init__(label, unit, address, count, scanRate, updateOnly, publishTopic, converter)
|
||||
self.argList = self.argList + ['bitCount']
|
||||
@ -253,12 +255,11 @@ class DiscreteInputDatapoint(ReadOnlyDatapoint):
|
||||
self.lastValues[i] = result.getBit(i)
|
||||
logger.debug("{0}, {1}: changed: {2!s}".format(self.label, i, result.getBit(i)))
|
||||
if self.publishTopic:
|
||||
pubQueue.put(MqttProcessor.PublishItem("{0}/{1}".format(self.publishTopic, i), str(result.getBit(i))))
|
||||
pubQueue.put(MqttProcessor.PublishItem("{0}/{1}"
|
||||
.format(self.publishTopic, i), str(result.getBit(i))))
|
||||
self.lastContact = datetime.datetime.now()
|
||||
|
||||
|
||||
|
||||
|
||||
class JsonifyEncoder(json.JSONEncoder):
|
||||
def default(self, o):
|
||||
res = None
|
||||
@ -271,6 +272,7 @@ class JsonifyEncoder(json.JSONEncoder):
|
||||
res = super().default(o)
|
||||
return res
|
||||
|
||||
|
||||
def datapointObjectHook(j):
|
||||
if type(j) == dict and 'type' in j and 'args' in j:
|
||||
klass = eval(j['type'])
|
||||
@ -279,14 +281,15 @@ def datapointObjectHook(j):
|
||||
else:
|
||||
return j
|
||||
|
||||
|
||||
def saveRegisterList(registerList, registerListFile):
|
||||
js = json.dumps(registerList, cls=JsonifyEncoder, sort_keys=True, indent=4)
|
||||
with open(registerListFile, 'w') as f:
|
||||
f.write(js)
|
||||
|
||||
|
||||
|
||||
def loadRegisterList(registerListFile):
|
||||
with open(registerListFile, 'r') as f:
|
||||
js = f.read()
|
||||
registerList = json.loads(js, object_hook=datapointObjectHook)
|
||||
return registerList
|
||||
|
||||
|
@ -3,6 +3,7 @@ import datetime
|
||||
from NotificationForwarder import AbstractNotificationReceiver
|
||||
import logging
|
||||
|
||||
|
||||
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
|
||||
def __init__(self, config, registers, queue):
|
||||
super().__init__()
|
||||
@ -14,7 +15,7 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei
|
||||
self.logger = logging.getLogger('ScanRateConsideringQueueFeeder')
|
||||
|
||||
def getMinimalScanrate(self):
|
||||
return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])
|
||||
return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])
|
||||
|
||||
def receiveNotification(self, arg):
|
||||
self.logger.info("ScanRateConsideringQueueFeeder:registersChanged")
|
||||
@ -26,10 +27,10 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei
|
||||
registersToBeHandled = [
|
||||
r for r in self.registers if ((not r.enqueued) and
|
||||
(r.scanRate) and
|
||||
((not r.lastContact) or
|
||||
((not r.lastContact) or
|
||||
(r.lastContact + r.scanRate < datetime.datetime.now())))
|
||||
]
|
||||
registersToBeHandled.sort(key=lambda x : x.scanRate)
|
||||
registersToBeHandled.sort(key=lambda x: x.scanRate)
|
||||
for r in registersToBeHandled:
|
||||
self.queue.put(r)
|
||||
r.enqueued = True
|
||||
|
@ -4,12 +4,14 @@ import pickle
|
||||
|
||||
|
||||
datapoints = [
|
||||
RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1, datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'),
|
||||
RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'),
|
||||
RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1, datetime.timedelta(seconds=1.0), True, 'Pub/Switches'),
|
||||
RegisterDatapoint.InputRegisterDatapoint('Temperature', 5, 0x0001, 1,
|
||||
datetime.timedelta(seconds=1.0), False, 'Pub/Temperature'),
|
||||
RegisterDatapoint.InputRegisterDatapoint('Humidity', 5, 0x0002, 1,
|
||||
datetime.timedelta(seconds=1.0), True, 'Pub/Humidity'),
|
||||
RegisterDatapoint.DiscreteInputDatapoint('Switches', 4, 0x0000, 1,
|
||||
datetime.timedelta(seconds=1.0), True, 'Pub/Switches'),
|
||||
]
|
||||
|
||||
|
||||
with open('registers.pkl', 'wb') as f:
|
||||
pickle.dump(datapoints, f)
|
||||
|
||||
|
@ -3,4 +3,4 @@ import RegisterDatapoint
|
||||
registers = RegisterDatapoint.loadRegisterList('registers.json')
|
||||
|
||||
for r in registers:
|
||||
print("{0!s}".format(r))
|
||||
print("{0!s}".format(r))
|
||||
|
@ -36,7 +36,6 @@ if __name__ == "__main__":
|
||||
nf = NotificationForwarder.NotificationForwarder()
|
||||
logger.debug('infrastructure prepared')
|
||||
|
||||
|
||||
datapoints = RegisterDatapoint.loadRegisterList(config.registerFile)
|
||||
logger.debug('datapoints read')
|
||||
|
||||
@ -56,7 +55,7 @@ if __name__ == "__main__":
|
||||
hb = Heartbeat.Heartbeat(config, pubQueue)
|
||||
hb.start()
|
||||
logger.debug('Heartbeat started')
|
||||
|
||||
|
||||
qf = ScanRateConsideringQueueFeeder.ScanRateConsideringQueueFeeder(config, datapoints, queue)
|
||||
nf.register(qf)
|
||||
qf.start()
|
||||
|
@ -10,15 +10,12 @@ with open('registers.pkl', 'rb') as f:
|
||||
newDatapoints = []
|
||||
for dp in datapoints:
|
||||
ndp = type(dp)()
|
||||
for k,v in dp.__dict__.items():
|
||||
for k, v in dp.__dict__.items():
|
||||
if k != 'logger':
|
||||
ndp.__dict__[k] = v
|
||||
newDatapoints.append(ndp)
|
||||
|
||||
|
||||
|
||||
js = json.dumps(newDatapoints, cls=RegisterDatapoint.JsonifyEncoder, sort_keys=True, indent=4)
|
||||
print(js)
|
||||
|
||||
|
||||
RegisterDatapoint.saveRegisterList(newDatapoints, 'registers.json')
|
||||
RegisterDatapoint.saveRegisterList(newDatapoints, 'registers.json')
|
||||
|
Loading…
x
Reference in New Issue
Block a user