diff --git a/opcua2mqtt/MqttPublish.py b/opcua2mqtt/MqttPublish.py index 3df8211..a77b07d 100644 --- a/opcua2mqtt/MqttPublish.py +++ b/opcua2mqtt/MqttPublish.py @@ -2,6 +2,7 @@ from threading import Event from loguru import logger from MqttBase import AbstractMqttPublisher from queue import Empty +import json LOOP_SLICE = 0.1 # seconds @@ -11,8 +12,11 @@ class MqttPublish(AbstractMqttPublisher): super().__init__(config) self.queue = queue self.topicPre = self.config["publishTopicPrefix"] + self.statusTopic = self.config["statusTopic"] + self.statusThreshold = self.config["statusThreshold"] def localLoop(self): + cnt = 0 while not self.killBill: try: dataObject = self.queue.get(timeout=LOOP_SLICE) @@ -22,7 +26,11 @@ class MqttPublish(AbstractMqttPublisher): topic = self.topicPre + '/' + dataObject.getTopicPost() payload = dataObject.getPayload() self.client.publish(topic, payload) + cnt += 1 logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) + if cnt % 100 == 0: + payload = { "count": cnt } + self.client.publish(self.statusTopic, json.dumps(payload), retain=True) except Empty: if self.killBill: logger.error("killbill received") diff --git a/opcua2mqtt/config.json b/opcua2mqtt/config.json index 8b93b71..a5e0f07 100644 --- a/opcua2mqtt/config.json +++ b/opcua2mqtt/config.json @@ -2,7 +2,9 @@ "mqtt": { "broker": "172.16.2.16", "port": 1883, - "publishTopicPrefix": "opcua" + "publishTopicPrefix": "opcua", + "statusTopic": "opcua/bridge/status", + "statusThreshold": 100 }, "opcua": [ {