add local broker and republisher

This commit is contained in:
Wolfgang Hottgenroth 2016-07-03 22:27:58 +02:00
parent 6c6f6c7e62
commit d3aa892f37
3 changed files with 82 additions and 22 deletions

View File

@ -6,6 +6,9 @@ Created on 06.03.2016
import Queue import Queue
import threading
from logger import Logger
class BrokerException(Exception): pass class BrokerException(Exception): pass
@ -16,15 +19,33 @@ class BrokerNotSubscribedException(BrokerException): pass
class BrokerOverflowException(BrokerException): pass class BrokerOverflowException(BrokerException): pass
class Broker(object):
class Broker(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self)
self.setDaemon(True)
self.outQueues = {} self.outQueues = {}
self.inQueue = Queue.Queue self.inQueue = Queue.Queue()
def run(self):
while True:
try:
item = self.inQueue.get(True, None)
Logger.log("got an item")
for (n, q) in self.outQueues.iteritems():
Logger.log("passed to %s" % n)
q.put_nowait(item)
except Queue.Full:
pass
def getInQueue(self):
return self.inQueue
def subscribe(self, name): def subscribe(self, name):
if self.outQueues.has_key(name): if self.outQueues.has_key(name):
raise BrokerDuplicateIdException() raise BrokerDuplicateIdException()
self.outQueues[name] = Queue.Queue() self.outQueues[name] = Queue.Queue()
return self.outQueues[name]
def unsubscribe(self, name): def unsubscribe(self, name):
try: try:
@ -32,16 +53,3 @@ class Broker(object):
except KeyError: except KeyError:
raise BrokerNotSubscribedException raise BrokerNotSubscribedException
def get(self, name):
try:
return self.outQueues[name].get()
except KeyError:
raise BrokerNotSubscribedException
def put(self, msg):
try:
for q in self.outQueues.values():
q.put_nowait(msg)
except Queue.Full:

40
RePublisher.py Normal file
View File

@ -0,0 +1,40 @@
import threading
import paho.mqtt.client as mqtt
from logger import Logger
import json
import datetime
from time import mktime
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
class RePublisher(threading.Thread):
def __init__(self, queue, broker):
threading.Thread.__init__(self)
self.queue = queue
self.broker = broker
self.setDaemon(True)
def run(self):
client = mqtt.Client()
client.connect(self.broker, 1883, 60)
client.loop_start()
while True:
try:
msg = self.queue.get()
dataBlock = msg['data']
if 'decodedTelegram' in dataBlock:
del dataBlock['decodedTelegram']
if 'telegram' in dataBlock:
del dataBlock['telegram']
client.publish("IoT/ParsedData", json.dumps(msg, cls=MyEncoder))
Logger.log("RePublisher has sent data")
except Exception, e:
Logger.log("Unexcepted exception %s in RePublisher: %s" % (e.__class__.__name__, str(e)))

View File

@ -7,6 +7,8 @@ Created on 20.05.2015
from MqttReceiver import MqttReceiver from MqttReceiver import MqttReceiver
from MongoWriter import MongoWriter from MongoWriter import MongoWriter
from MeterBusDecoder import MeterBusDecoder from MeterBusDecoder import MeterBusDecoder
from RePublisher import RePublisher
from Broker import Broker
import Queue import Queue
import os import os
import sys import sys
@ -15,11 +17,11 @@ import time
DEBUG = True DEBUG = True
BACKGROUND = False BACKGROUND = False
PID_FILE = "/tmp/mqtt2mongo.pid" PID_FILE = "/opt/logs/mqtt2mongo.pid"
LOG_FILE = "/tmp/mqtt2mongo.log" LOG_FILE = "/opt/logs/mqtt2mongo.log"
BROKER = "172.16.2.15" MQTT_BROKER = "172.16.2.15"
TOPICS = ['IoT/Measurement/#', 'IoT/WiFiPowerMeter/Measurement'] TOPICS = ['IoT/Measurement/#', 'IoT/WiFiPowerMeter/Measurement']
MONGO_HOST = "172.16.2.18" MONGO_HOST = "172.16.2.16"
MONGO_DATABASE = "iot" MONGO_DATABASE = "iot"
MONGO_COLLECTION = "iot" MONGO_COLLECTION = "iot"
@ -46,21 +48,31 @@ Logger.log("mqtt2mongo starting")
try: try:
queue1 = Queue.Queue() queue1 = Queue.Queue()
queue2 = Queue.Queue()
Logger.log("queues created ...") Logger.log("queues created ...")
mqttReader = MqttReceiver.create(queue1, BROKER, TOPICS) mqttReader = MqttReceiver.create(queue1, MQTT_BROKER, TOPICS)
mqttReader.start() mqttReader.start()
Logger.log("MqttReader started ...") Logger.log("MqttReader started ...")
broker = Broker()
broker.start()
queue2 = broker.getInQueue()
Logger.log("Broker started ...")
meterBusDecoder = MeterBusDecoder(queue1, queue2) meterBusDecoder = MeterBusDecoder(queue1, queue2)
meterBusDecoder.start() meterBusDecoder.start()
Logger.log("MeterBusDecoder started ...") Logger.log("MeterBusDecoder started ...")
mongoWriter = MongoWriter(queue2, MONGO_HOST, MONGO_DATABASE, MONGO_COLLECTION) queue3 = broker.subscribe('mongoWriter')
mongoWriter = MongoWriter(queue3, MONGO_HOST, MONGO_DATABASE, MONGO_COLLECTION)
mongoWriter.start() mongoWriter.start()
Logger.log("MongoWriter started ...") Logger.log("MongoWriter started ...")
queue4 = broker.subscribe('republisher')
republisher = RePublisher(queue4, MQTT_BROKER)
republisher.start()
Logger.log("RePublisher started ...")
Logger.log("mqtt2mongo running") Logger.log("mqtt2mongo running")