import paho.mqtt.client as mqtt from loguru import logger import threading import ssl def mqttOnConnectCallback(client, userdata, flags, rc): userdata.onConnect() def mqttOnMessageCallback(client, userdata, message): userdata.onMessage(message.topic, message.payload) def mqttOnDisconnectCallback(client, userdata, rc): userdata.onDisconnect(rc) class AbstractMqttPublisher(threading.Thread): def __init__(self, config): super().__init__() self.config = config["mqtt"] self.client = mqtt.Client(userdata=self) # consider this flag in the localLoop self.killBill = False self.killEvent = threading.Event() def run(self): self.client.on_message = mqttOnMessageCallback self.client.on_connect = mqttOnConnectCallback self.client.on_disconnect = mqttOnDisconnectCallback if ("login" in self.config) and ("password" in self.config): self.client.username_pw_set(self.config["login"], self.config["password"]) if ("ca" in self.config) and ("cert" in self.config) and ("key" in self.config): self.client.tls_set( ca_certs=self.config["ca"], certfile=self.config["cert"], keyfile=self.config["key"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None # this does not mean "no cipher" but it means "default ciphers" ) self.client.connect(self.config["broker"], int(self.config["port"])) self.client.loop_start() logger.info("mqtt loop started") self.localLoop() def localLoop(self): raise NotImplementedError() def stop(self): self.client.loop_stop() logger.info("mqtt loop stopped") self.killBill = True logger.info("kill flag set") self.killEvent.set() logger.info("kill events triggered") def onConnect(self): logger.info("mqtt connected") def onDisconnect(self, rc): logger.warning("mqtt disconnect, rc: {}".format(rc)) def onMessage(self, topic, payload): logger.warning("mqtt unexpected message received: {} -> {}".format(topic, str(payload)))