Files
Mqtt2Mongo/mqtt2mongo.py
Wolfgang Hottgenroth 43cf7396c1 fix in MonitorPublisher
2016-07-11 21:20:22 +02:00

91 lines
2.1 KiB
Python

'''
Created on 20.05.2015
@author: wn
'''
from MqttReceiver import MqttReceiver
from MongoWriter import MongoWriter
from MeterBusDecoder import MeterBusDecoder
from RePublisher import RePublisher
from MonitorPublisher import MonitorPublisher
from Broker import Broker
import Queue
import os
import sys
from logger import Logger
import time
DEBUG = False
BACKGROUND = True
PID_FILE = "/opt/logs/mqtt2mongo.pid"
LOG_FILE = "/opt/logs/mqtt2mongo.log"
MQTT_BROKER = "127.0.0.1"
TOPICS = ['IoT/Measurement/#', 'IoT/WiFiPowerMeter/Measurement']
MONGO_HOST = "127.0.0.1"
MONGO_DATABASE = "iot"
MONGO_COLLECTION = "iot"
if BACKGROUND:
pid = os.fork()
else:
pid = 0
if pid:
pidFile = file(PID_FILE , mode='w')
pidFile.write("%i\n" % pid)
pidFile.close()
sys.exit(0)
Logger.openlog(LOG_FILE)
if DEBUG:
Logger.debugEnable()
Logger.log("mqtt2mongo starting")
try:
queue1 = Queue.Queue()
Logger.log("queues created ...")
mqttReader = MqttReceiver.create(queue1, MQTT_BROKER, TOPICS)
mqttReader.start()
Logger.log("MqttReader started ...")
broker = Broker()
broker.start()
queue2 = broker.getInQueue()
Logger.log("Broker started ...")
meterBusDecoder = MeterBusDecoder(queue1, queue2)
meterBusDecoder.start()
Logger.log("MeterBusDecoder started ...")
queue3 = broker.subscribe('mongoWriter')
mongoWriter = MongoWriter(queue3, MONGO_HOST, MONGO_DATABASE, MONGO_COLLECTION)
mongoWriter.start()
Logger.log("MongoWriter started ...")
queue4 = broker.subscribe('republisher')
republisher = RePublisher(queue4, MQTT_BROKER)
republisher.start()
Logger.log("RePublisher started ...")
queue5 = broker.subscribe('monitorpublisher')
monitorpublisher = MonitorPublisher(queue5, MQTT_BROKER)
monitorpublisher.start()
Logger.log("MonitorPublisher started ...")
Logger.log("mqtt2mongo running")
while (True):
time.sleep(10)
except Exception, e:
Logger.log("mqtt2mongo dying from %s, %s" % (str(e), e.msg))
finally:
Logger.log("mqtt2mongo terminating ...")