import paho.mqtt.client as mqtt from loguru import logger import uuid import threading def mqttOnConnectCallback(client, userdata, flags, rc): userdata.onConnect() def mqttOnMessageCallback(client, userdata, message): userdata.onMessage(message.topic, message.payload) def mqttOnDisconnectCallback(client, userdata, rc): userdata.onDisconnect(rc) class AbstractMqttPublisher(threading.Thread): def __init__(self, config): super().__init__() self.config = config client_id = f"pv-controller-{uuid.uuid4()}" logger.info(f"mqtt client id: {client_id}") self.client = mqtt.Client(client_id=client_id, userdata=self) self.cache = {} # consider this flag in the localLoop self.killBill = False self.killEvent = threading.Event() def run(self): self.client.on_message = mqttOnMessageCallback self.client.on_connect = mqttOnConnectCallback self.client.on_disconnect = mqttOnDisconnectCallback self.client.connect(self.config.mqtt.broker, int(self.config.mqtt.port)) self.client.loop_start() logger.info("mqtt loop started") self.localLoop() def localLoop(self): raise NotImplementedError() def stop(self): self.client.loop_stop() logger.info("mqtt loop stopped") self.killBill = True logger.info("kill flag set") self.killEvent.set() logger.info("kill events triggered") def onConnect(self): logger.info("mqtt connected") def onDisconnect(self, rc): logger.warning("mqtt disconnect, rc: {}".format(rc)) def onMessage(self, topic, payload): logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload))) def publish_with_cache(self, topic, payload): if topic in self.cache and self.cache[topic] == payload: logger.debug(f"mqtt message unchanged, not publishing: {topic} -> {payload}") return self.cache[topic] = payload self.client.publish(topic, payload)