from threading import Event from loguru import logger from MqttBase import AbstractMqttPublisher from queue import Empty import json import datetime LOOP_SLICE = 0.1 # seconds class MqttPublish(AbstractMqttPublisher): def __init__(self, config, queue): super().__init__(config) self.queue = queue self.topicPre = self.config["publishTopicPrefix"] self.statusTopic = self.config["statusTopic"] self.statusThreshold = self.config["statusThreshold"] def localLoop(self): cnt = 0 startTime = datetime.datetime.now() while not self.killBill: try: dataObject = self.queue.get(timeout=LOOP_SLICE) if not dataObject.isValid(): logger.error("invalid dataObject received: drop it") else: topic = self.topicPre + '/' + dataObject.getTopicPost() payload = dataObject.getPayload() self.client.publish(topic, payload) cnt += 1 logger.debug("mqtt message sent: {} -> {}".format(topic, payload)) if cnt % 100 == 1: currentTime = datetime.datetime.now() uptime = int((currentTime - startTime).total_seconds()) payload = { "count": cnt, "uptime": uptime } self.client.publish(self.statusTopic, json.dumps(payload), retain=True) except Empty: if self.killBill: logger.error("killbill received") break except Exception as e: logger.error(f"Exception {type(e)} received in MQTT local loop: {e}")