"""Queue-driven MQTT publisher built on top of ``AbstractMqttPublisher``."""

from queue import Empty
from threading import Event

from loguru import logger

from MqttBase import AbstractMqttPublisher

# Maximum time, in seconds, that one queue poll blocks before the loop
# gets another chance to look at the kill flag.
LOOP_SLICE = 0.1  # seconds


class MqttPublish(AbstractMqttPublisher):
    """Take data objects from a queue and publish them via ``self.client``.

    NOTE(review): ``self.config``, ``self.client`` and ``self.killBill`` are
    not assigned in this class; presumably they come from
    ``AbstractMqttPublisher`` -- confirm against MqttBase.
    """

    def __init__(self, config, queue):
        """Store the input queue and read the publish topic prefix.

        Args:
            config: Passed unchanged to the base-class constructor.
            queue: Source of data objects; ``localLoop`` calls
                ``get(timeout=...)`` on it and handles ``queue.Empty``.

        The topic prefix is read from ``self.config["publishTopicPrefix"]``
        after the base class has been initialised.
        """
        super().__init__(config)
        self.queue = queue
        self.topicPre = self.config["publishTopicPrefix"]

    def localLoop(self):
        """Publish queued data objects until ``self.killBill`` becomes true.

        Each iteration waits at most ``LOOP_SLICE`` seconds for an object.
        Objects whose ``isValid()`` is false are logged and dropped; valid
        ones are published on ``<topicPre>/<getTopicPost()>`` with
        ``getPayload()`` as the payload. When the poll times out and the kill
        flag is set, the loop logs that and stops.
        """
        while not self.killBill:
            try:
                item = self.queue.get(timeout=LOOP_SLICE)

                if not item.isValid():
                    logger.error("invalid dataObject received: drop it")
                    continue

                # Build the prefix first, then ask the object for its suffix.
                prefix = self.topicPre + '/'
                topic = prefix + item.getTopicPost()
                payload = item.getPayload()

                self.client.publish(topic, payload)

                report = "mqtt message sent: {} -> {}".format(topic, payload)
                logger.info(report)
            except Empty:
                # Nothing arrived within the slice: keep polling unless a
                # stop was requested in the meantime.
                if not self.killBill:
                    continue
                logger.error("killbill received")
                break