54 lines
1.6 KiB
Python
54 lines
1.6 KiB
Python
import threading
|
|
from logger import Logger
|
|
import paho.mqtt.client as mqtt
|
|
import json
|
|
import datetime
|
|
import ssl
|
|
|
|
# Topic string -> parser object; filled in by MqttReceiver.registerParser().
_parsers = {}

# Topic string -> datetime of the last message its parser handled without
# raising.
_seenTopics = {}


def on_message(client, userdata, msg):
    """Hand one received MQTT message to the parser registered for its topic.

    Installed as the paho ``on_message`` callback. The parser's ``execute``
    method receives a dict with the keys ``'payload'``, ``'topic'`` and
    ``'timestamp'`` (local time at which the message was received here).
    After ``execute`` returns, the topic's entry in ``_seenTopics`` is set to
    the current time.

    Args:
        client: the MQTT client that delivered the message (unused).
        userdata: user data attached to the client (unused).
        msg: the received message; its ``topic`` and ``payload`` attributes
            are read.

    A ``ValueError`` raised by the parser is logged and swallowed. A message
    for a topic without a registered parser is logged and dropped.
    """
    parser = _parsers.get(msg.topic)
    if parser is None:
        # An unknown topic used to raise KeyError, which was not caught here
        # and escaped into the caller of this callback.
        # NOTE(review): the lookup is by exact topic string, so a message
        # delivered through a wildcard subscription would end up here --
        # confirm whether parsers ever register wildcard topics.
        Logger.log("No parser registered for topic %s in MqttReceiver, on_message" % msg.topic)
        return

    j = {
        'payload': msg.payload,
        'topic': msg.topic,
        'timestamp': datetime.datetime.now(),
    }
    try:
        parser.execute(j)
        # Only reached when the parser did not raise.
        _seenTopics[msg.topic] = datetime.datetime.now()
    except ValueError as e:
        Logger.log("Exception %s in MqttReceiver, on_message" % (str(e)))
|
|
|
|
|
|
class MqttReceiver(threading.Thread):
    """Daemon thread that runs an MQTT client and feeds registered parsers.

    The client is created and connected in ``__init__``; ``run`` (executed
    once the thread is started) drives the client's network loop. Incoming
    messages are handled by the module-level ``on_message`` callback, which
    looks parsers up in the module-level ``_parsers`` registry.
    """

    # Shared instance handed out by create(); None until the first call.
    singleton = None

    # False until the first successful CONNACK has been seen; lets
    # _on_connect tell the initial connect apart from a reconnect.
    _connected_once = False

    @classmethod
    def create(cls, config):
        """Return the shared receiver, building it from ``config`` on first use.

        Later calls return the existing instance; their ``config`` argument
        is ignored.
        """
        if cls.singleton is None:
            cls.singleton = MqttReceiver(config)
        return cls.singleton

    def __init__(self, config):
        """Configure the MQTT client and connect to the broker.

        Args:
            config: mapping indexed with ``config[...]``. Keys read:
                ``'tls'``, ``'ca'`` (only when ``'tls'`` is truthy),
                ``'user'``, ``'password'`` (only when ``'user'`` is truthy),
                ``'host'`` and ``'port'``.
        """
        threading.Thread.__init__(self)
        self.client = mqtt.Client()
        self.client.on_message = on_message
        self.client.on_connect = self._on_connect
        if config['tls']:
            # NOTE(review): TLS 1.0 is pinned here. It is deprecated
            # (RFC 8996) and may be refused by current brokers or OpenSSL
            # builds -- consider letting the library negotiate the version.
            self.client.tls_set(config['ca'], tls_version=ssl.PROTOCOL_TLSv1)
        if config['user']:
            self.client.username_pw_set(config['user'], config['password'])
        # Third argument is the keepalive interval in seconds.
        self.client.connect(config['host'], config['port'], 60)
        # Attribute form of the deprecated setDaemon(True).
        self.daemon = True

    def _on_connect(self, client, userdata, flags, rc, *extra):
        """Renew all subscriptions after the client has reconnected.

        Subscriptions made through registerParser() are not restored by the
        client when the connection is re-established, so without this the
        receiver would stay connected but silent. The first successful
        connect is skipped because registerParser() already subscribes
        explicitly. ``*extra`` absorbs additional callback arguments that
        some protocol versions pass.
        """
        if rc != 0:
            # Connection refused; nothing to subscribe on.
            return
        if not self._connected_once:
            self._connected_once = True
            return
        # Snapshot the keys: registerParser() may add entries from another
        # thread while this runs on the network thread.
        for topic in list(_parsers):
            client.subscribe(topic)

    def registerParser(self, parser):
        """Register ``parser`` for its topic and subscribe to that topic.

        Args:
            parser: object providing ``getTopic()``; the module-level
                ``on_message`` callback later calls its ``execute`` method.
                A parser already registered for the same topic is replaced.
        """
        topic = parser.getTopic()
        Logger.log("registering %s for topic %s" % (parser.__class__.__name__, topic))
        # Store the parser before subscribing so that a message delivered
        # right after the SUBSCRIBE already finds its parser.
        _parsers[topic] = parser
        self.client.subscribe(topic)

    def run(self):
        """Thread body: run the client's network loop until it exits."""
        self.client.loop_forever()
|