opcua-with-python/opcua2mqtt/MqttPublish.py

43 lines
1.7 KiB
Python
Raw Normal View History

2022-02-04 19:46:15 +01:00
from threading import Event
from loguru import logger
from MqttBase import AbstractMqttPublisher
from queue import Empty
2022-02-04 23:19:49 +01:00
import json
2022-02-04 23:29:57 +01:00
import datetime
2022-02-04 19:46:15 +01:00
LOOP_SLICE = 0.1 # seconds
class MqttPublish(AbstractMqttPublisher):
def __init__(self, config, queue):
super().__init__(config)
self.queue = queue
self.topicPre = self.config["publishTopicPrefix"]
2022-02-04 23:19:49 +01:00
self.statusTopic = self.config["statusTopic"]
self.statusThreshold = self.config["statusThreshold"]
2022-02-04 19:46:15 +01:00
def localLoop(self):
2022-02-04 23:19:49 +01:00
cnt = 0
2022-02-04 23:29:57 +01:00
startTime = datetime.datetime.now()
2022-02-04 19:46:15 +01:00
while not self.killBill:
try:
dataObject = self.queue.get(timeout=LOOP_SLICE)
if not dataObject.isValid():
logger.error("invalid dataObject received: drop it")
else:
topic = self.topicPre + '/' + dataObject.getTopicPost()
payload = dataObject.getPayload()
self.client.publish(topic, payload)
2022-02-04 23:19:49 +01:00
cnt += 1
2022-02-04 22:13:19 +01:00
logger.debug("mqtt message sent: {} -> {}".format(topic, payload))
2022-02-04 23:20:55 +01:00
if cnt % 100 == 1:
2022-02-04 23:29:57 +01:00
currentTime = datetime.datetime.now()
uptime = int((currentTime - startTime).total_seconds())
payload = { "count": cnt, "uptime": uptime }
2022-02-04 23:19:49 +01:00
self.client.publish(self.statusTopic, json.dumps(payload), retain=True)
2022-02-04 19:46:15 +01:00
except Empty:
2022-02-05 11:08:49 +01:00
# just evaluate the killBill at the top of the loop again
pass
2022-02-04 23:06:43 +01:00
except Exception as e:
logger.error(f"Exception {type(e)} received in MQTT local loop: {e}")
2022-02-04 19:46:15 +01:00