Mqtt2Mongo/mqtt2mongo.py
Wolfgang Hottgenroth d9064f3527 add AwsPublish
2017-07-18 21:23:19 +02:00

103 lines
2.3 KiB
Python
Executable File

#!/usr/bin/python
'''
Created on 20.05.2015
@author: wn
'''
from MqttReceiver import MqttReceiver
from MongoWriter import MongoWriter
from MeterBusDecoder import MeterBusDecoder
from RePublisher import RePublisher
from MonitorPublisher import MonitorPublisher
from AwsPublish import AwsPublish
from Broker import Broker
import Queue
import os
import sys
from logger import Logger
import time
# ---------------------------------------------------------------------------
# Static configuration for this script.
# ---------------------------------------------------------------------------

# Logging switches: DEBUG gates Logger.debugEnable(), DEBUG_TO_STDOUT gates
# Logger.debugToStdoutEnable().
DEBUG = True
DEBUG_TO_STDOUT = True

# When True the script forks at startup; the parent writes the child's pid
# to PID_FILE and exits.
BACKGROUND = False

# File locations: pid file written by the forking parent, and the path
# handed to Logger.openlog().
PID_FILE = "/opt/logs/mqtt2mongo.pid"
LOG_FILE = "/opt/logs/mqtt2mongo.log"

# MQTT side: broker address and the topic list passed to MqttReceiver.create().
MQTT_BROKER = "127.0.0.1"
TOPICS = [
    'IoT/Measurement/#',
    'IoT/WiFiPowerMeter/Measurement',
]

# MongoDB side: host, database and collection passed to MongoWriter.
MONGO_HOST = "127.0.0.1"
MONGO_DATABASE = "iot"
MONGO_COLLECTION = "iot"
# Optionally detach into the background.
#
# With BACKGROUND set, fork once: the parent receives the child's pid
# (non-zero), records it in PID_FILE and exits, while the child receives 0
# and carries on with the rest of the script. Without BACKGROUND no fork
# happens and pid is forced to 0, so no pid file is written.
if BACKGROUND:
    pid = os.fork()
else:
    pid = 0

if pid:
    # open() inside a with-block replaces the legacy file() constructor and
    # guarantees the handle is closed even if the write raises.
    with open(PID_FILE, 'w') as pidFile:
        pidFile.write("%i\n" % pid)
    sys.exit(0)
# Open the log file, then apply the debug switches from the configuration.
Logger.openlog(LOG_FILE)

# Each configuration flag switches on the matching Logger feature.
for flagSet, enableFeature in (
        (DEBUG, Logger.debugEnable),
        (DEBUG_TO_STDOUT, Logger.debugToStdoutEnable)):
    if flagSet:
        enableFeature()

Logger.log("mqtt2mongo starting")
# Build and start the processing stages, then idle.
#
# Wiring, in start order:
#   MqttReceiver      created with queue1, MQTT_BROKER and TOPICS
#   Broker            exposes its input queue as queue2 and hands out one
#                     queue per subscriber name
#   MeterBusDecoder   created with queue1 and queue2
#   MongoWriter       queue3, subscriber 'mongoWriter'
#   RePublisher       queue4, subscriber 'republisher'
#   MonitorPublisher  queue5, subscriber 'monitorpublisher'
#   AwsPublish        queue6, subscriber 'awspublish'
#
# Any Exception raised during setup or while idling is logged; the
# "terminating" line is logged on every exit path.
try:
    queue1 = Queue.Queue()
    Logger.log("queues created ...")

    mqttReader = MqttReceiver.create(queue1, MQTT_BROKER, TOPICS)
    mqttReader.start()
    Logger.log("MqttReader started ...")

    broker = Broker()
    broker.start()
    queue2 = broker.getInQueue()
    Logger.log("Broker started ...")

    meterBusDecoder = MeterBusDecoder(queue1, queue2)
    meterBusDecoder.start()
    Logger.log("MeterBusDecoder started ...")

    queue3 = broker.subscribe('mongoWriter')
    mongoWriter = MongoWriter(queue3, MONGO_HOST, MONGO_DATABASE, MONGO_COLLECTION)
    mongoWriter.start()
    Logger.log("MongoWriter started ...")

    queue4 = broker.subscribe('republisher')
    republisher = RePublisher(queue4, MQTT_BROKER)
    republisher.start()
    Logger.log("RePublisher started ...")

    queue5 = broker.subscribe('monitorpublisher')
    monitorpublisher = MonitorPublisher(queue5, MQTT_BROKER)
    monitorpublisher.start()
    Logger.log("MonitorPublisher started ...")

    queue6 = broker.subscribe('awspublish')
    # This stage was previously built as RePublisher(queue6), which left the
    # imported AwsPublish class unused and contradicted the log line below.
    # NOTE(review): assumes AwsPublish's constructor takes only the queue,
    # as the original call passed a single argument -- confirm against
    # the AwsPublish module.
    awspublish = AwsPublish(queue6)
    awspublish.start()
    Logger.log("AwsPublish started ...")

    Logger.log("mqtt2mongo running")

    # Nothing left to do here: sleep until an exception or interrupt ends
    # the loop.
    while True:
        time.sleep(10)
except Exception as e:
    # Most exceptions have no .msg attribute; reading it unconditionally
    # raised AttributeError inside this handler and the real error was never
    # logged. Fall back to repr(e) when .msg is absent.
    Logger.log("mqtt2mongo dying from %s, %s" % (str(e), getattr(e, 'msg', repr(e))))
finally:
    Logger.log("mqtt2mongo terminating ...")