This commit is contained in:
2019-07-14 00:47:16 +02:00
parent 5a9a6d67f0
commit 9f6f449c5a
3 changed files with 17 additions and 18 deletions

View File

@ -21,7 +21,7 @@ class CommunicationProcessor(threading.Thread):
wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT) wiringpi.pinMode(ERROR_PIN, wiringpi.OUTPUT)
self.daemon = True self.daemon = True
logging.getLogger('pymodbus').setLevel(logging.ERROR) logging.getLogger('pymodbus').setLevel(logging.ERROR)
self.logger = logging.getLogger('CommunicationProcessor')
def __getSerial(self): def __getSerial(self):
# return RS485Ext.RS485Ext(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1, # return RS485Ext.RS485Ext(port=self.config.serialPort, baudrate=self.config.serialBaudRate, stopbits=1,
@ -39,14 +39,14 @@ class CommunicationProcessor(threading.Thread):
r = self.queue.get() r = self.queue.get()
try: try:
wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW) wiringpi.digitalWrite(ERROR_PIN, wiringpi.LOW)
print("Dequeued: {0!s}".format(r)) self.logger.debug("Dequeued: {0!s}".format(r))
r.enqueued = False r.enqueued = False
r.process(client, self.pubQueue) r.process(client, self.pubQueue)
except RegisterDatapoint.DatapointException as e: except RegisterDatapoint.DatapointException as e:
wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH) wiringpi.digitalWrite(ERROR_PIN, wiringpi.HIGH)
print("ERROR when processing '{0}': {1!s}".format(r.label, e)) self.logger.error("ERROR when processing '{0}': {1!s}".format(r.label, e))
if client.socket is None: if client.socket is None:
print("renew socket") self.logger.error("renew socket")
client.socket = self.__getSerial() client.socket = self.__getSerial()
finally: finally:
time.sleep(self.config.interCommDelay) time.sleep(self.config.interCommDelay)

View File

@ -1,7 +1,7 @@
import threading import threading
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from NotificationForwarder import AbstractNotificationReceiver from NotificationForwarder import AbstractNotificationReceiver
import logging
class PublishItem(object): class PublishItem(object):
@ -29,33 +29,33 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
self.subscriptions = [] self.subscriptions = []
self.topicRegisterMap ={} self.topicRegisterMap ={}
self.daemon = True self.daemon = True
self.logger = logging.getLogger('MqttProcessor')
def __processUpdatedRegisters(self, force=False): def __processUpdatedRegisters(self, force=False):
# print("MqttProcessor.__updateSubscriptions") self.logger.debug("MqttProcessor.__updateSubscriptions")
subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic] subscribeTopics = [ r.subscribeTopic for r in self.registers if r.subscribeTopic]
# print("Topics: {0!s}".format(subscribeTopics)) self.logger.debug("Topics: {0!s}".format(subscribeTopics))
for subscribeTopic in subscribeTopics: for subscribeTopic in subscribeTopics:
if (subscribeTopic not in self.subscriptions) or force: if (subscribeTopic not in self.subscriptions) or force:
print("Subscribe to {0}".format(subscribeTopic)) self.logger.debug("Subscribe to {0}".format(subscribeTopic))
self.client.subscribe(subscribeTopic) self.client.subscribe(subscribeTopic)
self.subscriptions.append(subscribeTopic) self.subscriptions.append(subscribeTopic)
for subscription in self.subscriptions: for subscription in self.subscriptions:
if (subscription not in subscribeTopics) and not force: if (subscription not in subscribeTopics) and not force:
print("Unsubscribe from {0}".format(subscription)) self.logger.debug("Unsubscribe from {0}".format(subscription))
self.client.unsubscribe(subscription) self.client.unsubscribe(subscription)
self.subscriptions.remove(subscription) self.subscriptions.remove(subscription)
self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic } self.topicRegisterMap = { r.subscribeTopic: r for r in self.registers if r.subscribeTopic }
def receiveNotification(self, arg): def receiveNotification(self, arg):
print("MqttProcessor:registersChanged") self.logger.info("MqttProcessor:registersChanged")
self.__processUpdatedRegisters() self.__processUpdatedRegisters()
def run(self): def run(self):
# print("MqttProcessor.run")
self.client.on_message = mqttOnMessageCallback self.client.on_message = mqttOnMessageCallback
self.client.on_connect = mqttOnConnectCallback self.client.on_connect = mqttOnConnectCallback
self.client.on_disconnect = mqttOnDisconnectCallback self.client.on_disconnect = mqttOnDisconnectCallback
@ -69,7 +69,7 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
if isinstance(pubItem, PublishItem): if isinstance(pubItem, PublishItem):
self.client.publish(pubItem.topic, pubItem.payload) self.client.publish(pubItem.topic, pubItem.payload)
else: else:
print("Invalid object in publish queue") self.logger.error("Invalid object in publish queue")
def onConnect(self): def onConnect(self):
@ -77,14 +77,12 @@ class MqttProcessor(threading.Thread, AbstractNotificationReceiver):
self.__processUpdatedRegisters(force=True) self.__processUpdatedRegisters(force=True)
def onDisconnect(self, rc): def onDisconnect(self, rc):
print("Disconnected from MQTT broker: {0}".format(rc)) self.logger.error("Disconnected from MQTT broker: {0}".format(rc))
def onMessage(self, topic, payload): def onMessage(self, topic, payload):
# print("MqttProcessor.onMessage") # print("MqttProcessor.onMessage")
r = self.topicRegisterMap[topic] r = self.topicRegisterMap[topic]
# print("{0}: {1!s} -> {2!s}".format(topic, payload, r)) self.logger.debug("{0}: {1!s} -> {2!s}".format(topic, payload, r))
r.onMessage(payload) r.onMessage(payload)
self.queue.put(r) self.queue.put(r)

View File

@ -1,7 +1,7 @@
import threading import threading
import datetime import datetime
from NotificationForwarder import AbstractNotificationReceiver from NotificationForwarder import AbstractNotificationReceiver
import logging
class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver): class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationReceiver):
def __init__(self, config, registers, queue): def __init__(self, config, registers, queue):
@ -11,12 +11,13 @@ class ScanRateConsideringQueueFeeder(threading.Thread, AbstractNotificationRecei
self.queue = queue self.queue = queue
self.delayEvent = threading.Event() self.delayEvent = threading.Event()
self.daemon = True self.daemon = True
self.logger = logging.getLogger('ScanRateConsideringQueueFeeder')
def getMinimalScanrate(self): def getMinimalScanrate(self):
return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate]) return min([r.scanRate.total_seconds() for r in self.registers if r.scanRate])
def receiveNotification(self, arg): def receiveNotification(self, arg):
print("ScanRateConsideringQueueFeeder:registersChanged") self.logger.info("ScanRateConsideringQueueFeeder:registersChanged")
self.delay = self.getMinimalScanrate() self.delay = self.getMinimalScanrate()
def run(self): def run(self):