diff --git a/MqttReceiver.py b/MqttReceiver.py new file mode 100644 index 0000000..35a9bb5 --- /dev/null +++ b/MqttReceiver.py @@ -0,0 +1,55 @@ +''' +Created on 09.06.2015 + +@author: wn +''' + +import threading +import Queue +from logger import Logger +import paho.mqtt.client as mqtt +import json +import datetime + +__queue = None + +def on_message(client, userdata, msg): + j = json.loads(msg.payload) + now = datetime.datetime.now() + midnight = now.replace(now.year, now.month, now.day, 0,0,0,0) + seconds = (now - midnight).seconds + j['metadata']['timestamp'] = datetime.datetime.now() + j['metadata']['seconds'] = seconds + j['metadata']['day'] = midnight + + try: + __queue.put_nowait(j) + except Queue.Full: + Logger.log("Message %s dropped" % (j)) + + + + +class MqttReceiver(threading.Thread): + singleton = None + + @classmethod + def create(cls, queue): + if cls.singleton is None: + cls.singleton = MqttReceiver(queue) + __queue = queue + return cls.singleton + + def __init__(self, queue): + threading.Thread.__init__(self) + self.queue = queue + self.setDaemon(True) + + def run(self): + client = mqtt.Client() + client.on_message = on_message + client.connect("mqttbroker", 1883, 60) + client.subscribe("IoT/Measurement/#") + client.subscribe("IoT/WiFiPowerMeter/Measurement") + + client.loop_forever() diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..0a09fc2 --- /dev/null +++ b/logger.py @@ -0,0 +1,19 @@ + + +class Logger(object): + @staticmethod + def log(data): + print data + if Logger.debugFlag: + print data + + @staticmethod + def debugEnable(): + Logger.debugFlag = True + + @staticmethod + def debugDisable(): + Logger.debugFlag = False + + debugFlag = False +