MqttDispatcherPy/MqttReceiver.py
2017-11-13 15:07:52 +01:00

54 lines
1.6 KiB
Python

import threading
from logger import Logger
import paho.mqtt.client as mqtt
import json
import datetime
import ssl
_parsers = {}
_seenTopics = {}
def on_message(client, userdata, msg):
try:
j = {'payload':msg.payload, 'topic':msg.topic, 'timestamp':datetime.datetime.now()}
# Logger.debug("MqttReceiver receives: %s" % j)
global _parsers
_parsers[msg.topic].execute(j)
global _seenTopics
_seenTopics[msg.topic] = datetime.datetime.now()
except ValueError, e:
Logger.log("Exception %s in MqttReceiver, on_message" % (str(e)))
class MqttReceiver(threading.Thread):
singleton = None
@classmethod
def create(cls, config):
if cls.singleton is None:
cls.singleton = MqttReceiver(config)
return cls.singleton
def __init__(self, config):
threading.Thread.__init__(self)
self.client = mqtt.Client()
self.client.on_message = on_message
if config['tls']:
self.client.tls_set(config['ca'], tls_version=ssl.PROTOCOL_TLSv1)
if config['user']:
self.client.username_pw_set(config['user'], config['password'])
self.client.connect(config['host'], config['port'], 60)
self.setDaemon(True)
def registerParser(self, parser):
Logger.log("registering %s for topic %s" % (parser.__class__.__name__, parser.getTopic()))
self.client.subscribe(parser.getTopic())
global _parsers
_parsers[parser.getTopic()] = parser
def run(self):
self.client.loop_forever()