"""
MQTT receiver thread that forwards incoming JSON messages to a queue.

Created on 09.06.2015

@author: wn
"""

import threading
import Queue
from logger import Logger
import paho.mqtt.client as mqtt
import json
import datetime

# Destination queue used by on_message(); bound by MqttReceiver.create().
_queue = None


def on_message(client, userdata, msg):
    """Handle one incoming MQTT message: decode, timestamp and enqueue it.

    The payload is parsed as JSON and its ``metadata`` entry is extended with:

    * ``timestamp`` -- the current naive local time, truncated to whole
      seconds,
    * ``seconds``   -- the number of seconds elapsed since local midnight,
    * ``day``       -- the datetime of local midnight of the current day.

    The resulting object is put on the module-level ``_queue`` without
    blocking.

    Nothing is raised for bad input; every failure is logged and the message
    is dropped, so that a single bad message does not propagate an exception
    into the MQTT network loop that invokes this callback:

    * queue full                           -> message dropped,
    * payload is not valid JSON            -> logged,
    * payload has no usable ``metadata``   -> logged.

    :param client: the MQTT client instance invoking the callback (unused).
    :param userdata: user data configured on the client (unused).
    :param msg: the received message; only ``msg.payload`` is read.
    """
    try:
        j = json.loads(msg.payload)

        # Drop sub-second precision, then derive midnight of the same day.
        now = datetime.datetime.now().replace(microsecond=0)
        midnight = now.replace(hour=0, minute=0, second=0)
        seconds = (now - midnight).seconds

        metadata = j['metadata']
        metadata['timestamp'] = now
        metadata['seconds'] = seconds
        metadata['day'] = midnight

        Logger.debug("MqttReceiver queues: %s" % (j,))
        _queue.put_nowait(j)
    except Queue.Full:
        Logger.log("Message %s dropped" % (j,))
    except ValueError as e:
        # Raised by json.loads() for a payload that is not valid JSON.
        Logger.log("Exception %s in MqttReceiver, on_message" % (str(e)))
    except (KeyError, TypeError) as e:
        # Valid JSON, but not an object carrying a 'metadata' object
        # (missing key, or the payload is e.g. a list, string or number).
        Logger.log("Malformed message dropped in MqttReceiver, on_message: %r"
                   % (e,))


class MqttReceiver(threading.Thread):
    """Daemon thread that subscribes to MQTT topics and runs the client loop.

    Received messages are handled by the module-level ``on_message``
    callback, which writes to the module-level ``_queue``.  Use ``create()``
    to obtain the shared instance; ``create()`` is what binds ``_queue``.
    """

    # The shared instance, created lazily by create().
    singleton = None

    @classmethod
    def create(cls, queue, broker, topics):
        """Return the shared receiver, creating it on the first call.

        :param queue: queue that receives the decoded messages.
        :param broker: host name or address of the MQTT broker.
        :param topics: iterable of topics to subscribe to.
        :return: the singleton ``MqttReceiver`` (not started).
        """
        global _queue
        if cls.singleton is None:
            cls.singleton = MqttReceiver(queue, broker, topics)
        # NOTE(review): _queue is rebound on every call, even when the
        # singleton already exists; a later call with a different queue
        # redirects messages while singleton.queue keeps the first one.
        # Confirm whether that is intended.
        _queue = queue
        return cls.singleton

    def __init__(self, queue, broker, topics):
        """Store the connection parameters and mark the thread as daemon.

        :param queue: queue associated with this receiver (stored only;
            ``on_message`` writes to the module-level ``_queue``).
        :param broker: host name or address of the MQTT broker.
        :param topics: iterable of topics to subscribe to.
        """
        threading.Thread.__init__(self)
        self.queue = queue
        self.broker = broker
        self.topics = topics
        # Daemon thread: does not keep the interpreter alive on exit.
        self.daemon = True

    def run(self):
        """Connect to the broker, subscribe to all topics and loop forever."""
        client = mqtt.Client()
        client.on_message = on_message
        # Port 1883, keepalive 60 seconds.
        client.connect(self.broker, 1883, 60)
        # NOTE(review): subscriptions are made once here, not in an
        # on_connect callback; presumably they are not renewed if the client
        # reconnects -- verify against the paho-mqtt documentation.
        for topic in self.topics:
            Logger.log("Subscribing on %s" % str(topic))
            client.subscribe(topic)
        client.loop_forever()