threads and queues

This commit is contained in:
hg 2015-06-09 22:09:28 +02:00
parent ea0c98b4c5
commit d851a9f5dc
4 changed files with 146 additions and 61 deletions

36
MongoWriter.py Normal file
View File

@ -0,0 +1,36 @@
'''
Created on 09.06.2015
@author: wn
'''
import threading
from logger import Logger
import pymongo
class MongoWriter(threading.Thread):
def __init__(self, queue, dbhost, database, collection):
threading.Thread.__init__(self)
self.queue = queue
self.dbhost = dbhost
self.database = database
self.collection = collection
self.setDaemon(True)
def run(self):
while True:
try:
msg = self.queue.get()
# Logger.debug("MongoWriter receives: %s" % msg)
mongoClient = pymongo.MongoClient(self.dbhost)
db = mongoClient[self.database]
res = db[self.collection].insert_one(msg)
Logger.debug("MongoWriter inserts: %s" % res.inserted_id)
except pymongo.errors.ServerSelectionTimeoutError, e:
Logger.log("Exception %s in MongoWriter, run" % str(e))
Logger.log("Msg dropped: %s" % msg)
except TypeError, e:
Logger.log("Exception %s in MongoWriter, run" % str(e))
except Exception, e:
Logger.log("Unexcepted exception %s in MongoWriter: %s" % (e.__class__.__name__, str(e)))

View File

@ -10,46 +10,54 @@ from logger import Logger
import paho.mqtt.client as mqtt
import json
import datetime
from twisted.spread.pb import Broker
__queue = None
_queue = None
def on_message(client, userdata, msg):
j = json.loads(msg.payload)
now = datetime.datetime.now()
midnight = now.replace(now.year, now.month, now.day, 0,0,0,0)
seconds = (now - midnight).seconds
j['metadata']['timestamp'] = datetime.datetime.now()
j['metadata']['seconds'] = seconds
j['metadata']['day'] = midnight
try:
__queue.put_nowait(j)
j = json.loads(msg.payload)
now1 = datetime.datetime.now()
now = now1.replace(now1.year, now1.month, now1.day, now1.hour, now1.minute, now1.second, 0)
midnight = now.replace(now.year, now.month, now.day, 0,0,0,0)
seconds = (now - midnight).seconds
j['metadata']['timestamp'] = now
j['metadata']['seconds'] = seconds
j['metadata']['day'] = midnight
Logger.debug("MqttReceiver queues: %s" % j)
_queue.put_nowait(j)
except Queue.Full:
Logger.log("Message %s dropped" % (j))
except ValueError, e:
Logger.log("Exception %s in MqttReceiver, on_message" % (str(e)))
class MqttReceiver(threading.Thread):
singleton = None
@classmethod
def create(cls, queue):
def create(cls, queue, broker, topics):
global _queue
if cls.singleton is None:
cls.singleton = MqttReceiver(queue)
__queue = queue
cls.singleton = MqttReceiver(queue, broker, topics)
_queue = queue
return cls.singleton
def __init__(self, queue):
def __init__(self, queue, broker, topics):
threading.Thread.__init__(self)
self.queue = queue
self.broker = broker
self.topics = topics
self.setDaemon(True)
def run(self):
client = mqtt.Client()
client.on_message = on_message
client.connect("mqttbroker", 1883, 60)
client.subscribe("IoT/Measurement/#")
client.subscribe("IoT/WiFiPowerMeter/Measurement")
client.connect(self.broker, 1883, 60)
for topic in self.topics:
Logger.log("Subscribing on %s" % str(topic))
client.subscribe(topic)
client.loop_forever()

View File

@ -1,19 +1,31 @@
from time import gmtime, strftime
class Logger(object):
@staticmethod
def log(data):
print data
if Logger.debugFlag:
print data
@staticmethod
def debugEnable():
Logger.debugFlag = True
@staticmethod
def debugDisable():
Logger.debugFlag = False
debugFlag = False
@classmethod
def log(cls, data):
t = strftime("%d %b %Y %H:%M:%S", gmtime())
with open(cls.logfile, 'a') as f:
f.write("%s %s\n" % (t, data))
if cls.debugFlag:
print data
@classmethod
def debug(cls, data):
if cls.debugFlag:
cls.log(data)
@classmethod
def debugEnable(cls):
cls.debugFlag = True
@classmethod
def debugDisable(cls):
cls.debugFlag = False
@classmethod
def openlog(cls, logfile):
cls.logfile = logfile

View File

@ -4,33 +4,62 @@ Created on 20.05.2015
@author: wn
'''
import paho.mqtt.client as mqtt
import json
import pymongo
import datetime
from MqttReceiver import MqttReceiver
from MongoWriter import MongoWriter
import Queue
import os
import sys
from logger import Logger
import time
def on_message(client, userdata, msg):
j = json.loads(msg.payload)
now = datetime.datetime.now()
midnight = now.replace(now.year, now.month, now.day, 0,0,0,0)
seconds = (now - midnight).seconds
j['metadata']['timestamp'] = datetime.datetime.now()
j['metadata']['seconds'] = seconds
j['metadata']['day'] = midnight
print(j)
mongoClient = pymongo.MongoClient('localhost')
db = mongoClient.iot
r = db.iot.insert_one(j)
DEBUG = True
BACKGROUND = False
PID_FILE = "/tmp/mqtt2mongo.pid"
LOG_FILE = "/tmp/mqtt2mongo.log"
BROKER = "mqttbroker"
TOPICS = ['IoT/Measurement/#', 'IoT/WiFiPowerMeter/Measurement']
MONGO_HOST = "172.16.2.15"
MONGO_DATABASE = "test"
MONGO_COLLECTION = "test"
if __name__ == '__main__':
print("Starting client ...")
if BACKGROUND:
pid = os.fork()
else:
pid = 0
if pid:
pidFile = file(PID_FILE , mode='w')
pidFile.write("%i\n" % pid)
pidFile.close()
sys.exit(0)
Logger.openlog(LOG_FILE)
if DEBUG:
Logger.debugEnable()
Logger.log("mqtt2mongo starting")
try:
queue = Queue.Queue()
Logger.log("queue created ...")
client = mqtt.Client()
client.on_message = on_message
client.connect("mqttbroker", 1883, 60)
client.subscribe("IoT/Measurement/#")
client.subscribe("IoT/WiFiPowerMeter/Measurement")
client.loop_forever()
mongoWriter = MongoWriter(queue, MONGO_HOST, MONGO_DATABASE, MONGO_COLLECTION)
mongoWriter.start()
Logger.log("MongoWriter started ...")
mqttReader = MqttReceiver.create(queue, BROKER, TOPICS)
mqttReader.start()
Logger.log("MqttReader started ...")
Logger.log("mqtt2mongo running")
while (True):
time.sleep(10)
except Exception, e:
Logger.log("mqtt2mongo dying from %s, %s" % (str(e), e.msg))
finally:
Logger.log("mqtt2mongo terminating ...")