25 Commits

Author SHA1 Message Date
d9064f3527 add AwsPublish 2017-07-18 21:23:19 +02:00
6884d44bb8 name in monitor message 2016-12-21 12:47:14 +01:00
6d415e481c some changes 2016-10-31 16:27:39 +01:00
ebb18e3d66 add temperature for whatever 2016-10-23 13:43:30 +02:00
43cf7396c1 fix in MonitorPublisher 2016-07-11 21:20:22 +02:00
42259f991c change addresses 2016-07-09 22:48:26 +02:00
814871b481 new 2016-07-03 23:15:21 +02:00
887b4f4abf fix mqtt topic 2016-07-03 22:40:56 +02:00
ea4b765e85 disabled debugging 2016-07-03 22:28:58 +02:00
d3aa892f37 add local broker and republisher 2016-07-03 22:27:58 +02:00
6c6f6c7e62 merged 2016-06-25 15:46:27 +02:00
61bd19ee85 changes 2016-06-25 15:44:13 +02:00
0615be568d add broker class 2016-03-06 22:27:06 +01:00
348cb7effa changes 2015-12-26 22:11:02 +01:00
eed0bc008d changechangess 2015-08-30 15:50:01 +02:00
bad6dec7bd large expection in MeterBusDecoder 2015-06-26 11:52:22 +02:00
hg
d9579748c6 add meterbus stuff 2015-06-15 22:28:31 +02:00
167e1486f1 merged 2015-06-13 22:07:51 +02:00
ba50897ac1 new 2015-06-13 22:06:10 +02:00
23ea5a10f8 new 2015-06-13 22:05:41 +02:00
hg
b5950704db prepared meterbus decoding 2015-06-09 22:33:02 +02:00
60c2292718 other database 2015-06-09 22:13:26 +02:00
hg
d851a9f5dc threads and queues 2015-06-09 22:09:28 +02:00
hg
ea0c98b4c5 new, refactoring 2015-06-09 17:01:20 +02:00
d52db3c576 Added tag WORKS1 for changeset 6a96e36a79d7 2015-06-09 16:40:38 +02:00
14 changed files with 627 additions and 25 deletions

3
.hgignore Normal file
View File

@ -0,0 +1,3 @@
syntax: glob
*.pyc

1
.hgtags Normal file
View File

@ -0,0 +1 @@
6a96e36a79d7497b5dc213cc6f2ea6e106ce2d88 WORKS1

View File

@ -2,7 +2,11 @@
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}</path>
<path>/Mqtt2Mongo</path>
</pydev_pathproperty>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_EXTERNAL_SOURCE_PATH">
<path>${WORKSPACE_LOC}/MeterbusLib</path>
</pydev_pathproperty>
</pydev_project>

53
AwsPublish.py Normal file
View File

@ -0,0 +1,53 @@
import threading
import paho.mqtt.client as mqtt
from logger import Logger
import json
import datetime
from time import mktime
CA_CERTS = "VeriSign-Class 3-Public-Primary-Certification-Authority-G5.pem"
CERT_FILE = "aws-certificate.pem.crt"
KEY_FILE = "aws-private.pem.key"
MQTT_HOST = "a86hx9xnv9yty.iot.us-west-2.amazonaws.com"
MQTT_PORT = 8883
CLIENT_ID = "d0bf9c206e14x"
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
class AwsPublish(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.setDaemon(True)
def run(self):
client = mqtt.Client()
client.tls_set(ca_certs=CA_CERTS, certfile=CERT_FILE, keyfile=KEY_FILE)
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_start()
while True:
try:
msg = self.queue.get()
dataBlock = msg['data']
if 'decodedTelegram' in dataBlock:
del dataBlock['decodedTelegram']
if 'telegram' in dataBlock:
del dataBlock['telegram']
metadataBlock = msg['metadata']
if 'Slave' in metadataBlock:
if metadataBlock['Slave'] == 'Thermometer' and metadataBlock['device'] == 'ModbusHub':
metadataBlock['name'] = 'FridgeThermometer'
topic = "IoT/ParsedData/%s" % metadataBlock['name']
client.publish(topic, json.dumps(msg, cls=MyEncoder))
Logger.log("RePublisher has sent data")
except Exception, e:
Logger.log("Unexcepted exception %s in RePublisher: %s" % (e.__class__.__name__, str(e)))

55
Broker.py Normal file
View File

@ -0,0 +1,55 @@
'''
Created on 06.03.2016
@author: wn
'''
import Queue
import threading
from logger import Logger
class BrokerException(Exception): pass
class BrokerDuplicateIdException(BrokerException): pass
class BrokerNotSubscribedException(BrokerException): pass
class BrokerOverflowException(BrokerException): pass
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.setDaemon(True)
self.outQueues = {}
self.inQueue = Queue.Queue()
def run(self):
while True:
try:
item = self.inQueue.get(True, None)
Logger.log("got an item")
for (n, q) in self.outQueues.iteritems():
Logger.log("passed to %s" % n)
q.put_nowait(item)
except Queue.Full:
pass
def getInQueue(self):
return self.inQueue
def subscribe(self, name):
if self.outQueues.has_key(name):
raise BrokerDuplicateIdException()
self.outQueues[name] = Queue.Queue()
return self.outQueues[name]
def unsubscribe(self, name):
try:
del self.outQueues[name]
except KeyError:
raise BrokerNotSubscribedException

117
MeterBusDecoder.py Normal file
View File

@ -0,0 +1,117 @@
'''
Created on 09.06.2015
@author: wn
'''
import threading
from logger import Logger
import Queue
import json
import MeterbusLib
from MeterbusLibExceptions import MeterbusLibException
OnePhaseElectric_Finder, ThreePhaseElectric_Finder, Thermometer_Hottis, Watermeter_Hyd = 'OnePhaseElectric_Finder', 'ThreePhaseElectric_Finder', 'Thermometer_Hottis', 'Watermeter_Hyd'
devices = []
device_dishwasher_electric = MeterbusLib.Device(0x53, OnePhaseElectric_Finder, "Dishwasher", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_dishwasher_electric)
device_computer_electric = MeterbusLib.Device(85, OnePhaseElectric_Finder, "Computer", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_computer_electric)
device_freezer_electric = MeterbusLib.Device(86, OnePhaseElectric_Finder, "Freezer", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_freezer_electric)
device_laundry_electric = MeterbusLib.Device(82, OnePhaseElectric_Finder, "Laundry", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_laundry_electric)
device_dryer_electric = MeterbusLib.Device(81, OnePhaseElectric_Finder, "Dryer", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_dryer_electric)
device_light_electric = MeterbusLib.Device(84, OnePhaseElectric_Finder, "Light", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_light_electric)
device_3phase_electric = MeterbusLib.Device(0x50, ThreePhaseElectric_Finder, "3 Phase Electric", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial",
"Voltage phase 1", "Current phase 1", "Power phase 1", "img. Power phase 1",
"Voltage phase 2", "Current phase 2", "Power phase 2", "img. Power phase 2",
"Voltage phase 3", "Current phase 3", "Power phase 3", "img. Power phase 3",
"converter ratio", "Power total", "img. Power total", "tariff"
])
devices.append(device_3phase_electric)
device_thermometer = MeterbusLib.Device(0x21, Thermometer_Hottis, "Thermometer", ["Uptime Seconds", "Uptime Minutes", "Uptime Hours", "Uptime Days",
"Temperature 1", "Temperature 2", "Temperature 3", "Temperature 4",
"rawdata"
])
devices.append(device_thermometer)
device_watermeter = MeterbusLib.Device(0x30, Watermeter_Hyd, "Watermeter", ["Volume", "Minimum Volume", "Volume Flow", "OpTime", "Flow Temperature", "TimePoint1", "Volume2", "tariff1", "TimePoint2", "TimePoint3"])
devices.append(device_watermeter)
class MeterBusDecoder(threading.Thread):
def __init__(self, inQueue, outQueue):
threading.Thread.__init__(self)
self.inQueue = inQueue
self.outQueue = outQueue
self.setDaemon(True)
def run(self):
while True:
msg = self.inQueue.get()
Logger.log("MeterBusDecoder is doing something with %s" % msg)
try:
mbusPayload = msg['data']['telegram']
if mbusPayload[-1] == ' ':
mbusPayload2 = mbusPayload[:-1]
else:
mbusPayload2 = mbusPayload
Logger.log("<" + mbusPayload2 + ">")
telegram = MeterbusLib.Telegram(devices)
telegram.fromHexString(mbusPayload2)
telegram.parse()
Logger.log(json.dumps(telegram.getJSON(), indent=2))
msg['metadata']['consumer'] = telegram.frame.comment
msg['metadata']['category'] = telegram.frame.category
if telegram.frame.category == OnePhaseElectric_Finder:
msg['data']['current'] = telegram.frame.dib[3].value
msg['data']['power'] = telegram.frame.dib[4].value
msg['data']['energy'] = telegram.frame.dib[0].value
elif telegram.frame.category == ThreePhaseElectric_Finder:
msg['data']['current'] = telegram.frame.dib[5].value + telegram.frame.dib[9].value + telegram.frame.dib[13].value
msg['data']['power'] = telegram.frame.dib[17].value
msg['data']['energy'] = telegram.frame.dib[0].value
elif telegram.frame.category == Watermeter_Hyd:
msg['data']['volume'] = telegram.frame.dib[0].value
msg['data']['flow'] = telegram.frame.dib[2].value
msg['data']['temperature'] = telegram.frame.dib[4].value
elif telegram.frame.category == Thermometer_Hottis:
msg['data']['temperature1'] = telegram.frame.dib[4].value
msg['data']['temperature2'] = telegram.frame.dib[5].value
msg['data']['temperature3'] = telegram.frame.dib[6].value
msg['data']['temperature4'] = telegram.frame.dib[7].value
msg['data']['operatingTime'] = "%i-%i:%i:%i" % (telegram.frame.dib[3].value, telegram.frame.dib[2].value, telegram.frame.dib[1].value, telegram.frame.dib[0].value)
msg['metadata']['version'] = 2
msg['data']['decodedTelegram'] = telegram.getJSON()
Logger.log(msg)
except KeyError, e:
Logger.log("MeterBusDecoder, parse, KeyError: %s" % e)
except MeterbusLibException, e:
Logger.log("MeterBusDecoder, parse, MeterbusLibException: %s %s" % (e.__class__.__name__,e))
except Exception, e:
Logger.log("MeterBusDecoder, parse, Exception: %s %s" % (e.__class__.__name__,e))
try:
self.outQueue.put_nowait(msg)
except Queue.Full:
Logger.log("MeterBusDecoder, put queue overrun, drop result of %s" % str(msg))
except Exception, e:
Logger.log("MeterBusDecoder, put Exception: %s %s" % (e.__class__.__name__,e))

36
MongoWriter.py Normal file
View File

@ -0,0 +1,36 @@
'''
Created on 09.06.2015
@author: wn
'''
import threading
from logger import Logger
import pymongo
class MongoWriter(threading.Thread):
def __init__(self, queue, dbhost, database, collection):
threading.Thread.__init__(self)
self.queue = queue
self.dbhost = dbhost
self.database = database
self.collection = collection
self.setDaemon(True)
def run(self):
while True:
try:
msg = self.queue.get()
# Logger.debug("MongoWriter receives: %s" % msg)
mongoClient = pymongo.MongoClient(self.dbhost)
db = mongoClient[self.database]
res = db[self.collection].insert_one(msg)
Logger.log("MongoWriter inserts: %s" % res.inserted_id)
except pymongo.errors.ServerSelectionTimeoutError, e:
Logger.log("Exception %s in MongoWriter, run" % str(e))
Logger.log("Msg dropped: %s" % msg)
except TypeError, e:
Logger.log("Exception %s in MongoWriter, run" % str(e))
except Exception, e:
Logger.log("Unexcepted exception %s in MongoWriter: %s" % (e.__class__.__name__, str(e)))

63
MonitorPublisher.py Normal file
View File

@ -0,0 +1,63 @@
import threading
import paho.mqtt.client as mqtt
from logger import Logger
class MonitorPublisher(threading.Thread):
def __init__(self, queue, broker):
threading.Thread.__init__(self)
self.queue = queue
self.broker = broker
self.setDaemon(True)
self.nameMap = {
'light': { 'oldbody':'', 'body':'', 'slot': 1, 'header': 'Licht', 'cnt': 0 },
'computer': { 'oldbody':'', 'body':'', 'slot': 2, 'header': 'Computer', 'cnt': 0 },
'laundry': { 'oldbody':'', 'body':'', 'slot': 3, 'header': 'Waschm.', 'cnt': 0 },
'dryer': { 'oldbody':'', 'body':'', 'slot': 4, 'header': 'Trockner', 'cnt': 0 },
'dishwasher': { 'oldbody':'', 'body':'', 'slot': 5, 'header': 'Spuelm.', 'cnt': 0 },
'freezer': { 'oldbody':'', 'body':'', 'slot': 6, 'header': 'Gefrier.', 'cnt': 0 },
'electricity': { 'oldbody':'', 'body':'', 'slot': 7, 'header': 'Strom', 'cnt': 0 },
'thermom.': { 'oldbody':'', 'body':'', 'slot': 8, 'header': 'Temp.', 'cnt': 0 },
'fridge': { 'oldbody':'', 'body':'', 'slot': 9, 'header': 'Gefr.T.', 'cnt': 0 },
'os_x': { 'oldbody':'', 'body':'', 'slot': 10, 'header': 'Server', 'cnt': 0 },
}
def run(self):
client = mqtt.Client()
client.connect(self.broker, 1883, 60)
client.loop_start()
while True:
try:
msg = self.queue.get()
dataBlock = msg['data']
metadataBlock = msg['metadata']
if 'Slave' in metadataBlock:
if metadataBlock['Slave'] == 'Thermometer' and metadataBlock['device'] == 'ModbusHub':
metadataBlock['name'] = 'fridge'
name = metadataBlock['name']
Logger.log("Name: " + name)
if name in self.nameMap:
if name in ['light', 'computer', 'laundry', 'dryer', 'dishwasher', 'freezer', 'electricity']:
self.nameMap[name]['body'] = str(dataBlock['power'])
elif name == 'thermom.':
self.nameMap[name]['body'] = "{0:.2f}".format(dataBlock['temperature2'])
elif name == 'fridge':
self.nameMap[name]['body'] = "{0:.2f}".format(dataBlock['t2'])
if (self.nameMap[name]['oldbody'] != self.nameMap[name]['body']) or (self.nameMap[name]['cnt'] == 10):
self.nameMap[name]['cnt'] = 0;
self.nameMap[name]['oldbody'] = self.nameMap[name]['body']
message = str(self.nameMap[name]['slot']) + " " + self.nameMap[name]['header'] + " " + self.nameMap[name]['body']
client.publish("IoT/Monitor/Message/" + name, message, retain=True)
Logger.log("MonitorPublisher has sent: " + message)
else:
self.nameMap[name]['cnt'] += 1
else:
Logger.log("unknown name: " + name)
except Exception, e:
Logger.log("Unexcepted exception %s in MonitorPublisher: %s" % (e.__class__.__name__, str(e)))

62
MqttReceiver.py Normal file
View File

@ -0,0 +1,62 @@
'''
Created on 09.06.2015
@author: wn
'''
import threading
import Queue
from logger import Logger
import paho.mqtt.client as mqtt
import json
import datetime
_queue = None
def on_message(client, userdata, msg):
try:
j = json.loads(msg.payload)
now1 = datetime.datetime.now()
now = now1.replace(now1.year, now1.month, now1.day, now1.hour, now1.minute, now1.second, 0)
midnight = now.replace(now.year, now.month, now.day, 0,0,0,0)
seconds = (now - midnight).seconds
j['metadata']['timestamp'] = now
j['metadata']['seconds'] = seconds
j['metadata']['day'] = midnight
Logger.debug("MqttReceiver queues: %s" % j)
_queue.put_nowait(j)
except Queue.Full:
Logger.log("Message %s dropped" % (j))
except ValueError, e:
Logger.log("Exception %s in MqttReceiver, on_message" % (str(e)))
class MqttReceiver(threading.Thread):
singleton = None
@classmethod
def create(cls, queue, broker, topics):
global _queue
if cls.singleton is None:
cls.singleton = MqttReceiver(queue, broker, topics)
_queue = queue
return cls.singleton
def __init__(self, queue, broker, topics):
threading.Thread.__init__(self)
self.queue = queue
self.broker = broker
self.topics = topics
self.setDaemon(True)
def run(self):
client = mqtt.Client()
client.on_message = on_message
client.connect(self.broker, 1883, 60)
for topic in self.topics:
Logger.log("Subscribing on %s" % str(topic))
client.subscribe(topic)
client.loop_forever()

45
RePublisher.py Normal file
View File

@ -0,0 +1,45 @@
import threading
import paho.mqtt.client as mqtt
from logger import Logger
import json
import datetime
from time import mktime
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
class RePublisher(threading.Thread):
def __init__(self, queue, broker):
threading.Thread.__init__(self)
self.queue = queue
self.broker = broker
self.setDaemon(True)
def run(self):
client = mqtt.Client()
client.connect(self.broker, 1883, 60)
client.loop_start()
while True:
try:
msg = self.queue.get()
dataBlock = msg['data']
if 'decodedTelegram' in dataBlock:
del dataBlock['decodedTelegram']
if 'telegram' in dataBlock:
del dataBlock['telegram']
metadataBlock = msg['metadata']
if 'Slave' in metadataBlock:
if metadataBlock['Slave'] == 'Thermometer' and metadataBlock['device'] == 'ModbusHub':
metadataBlock['name'] = 'FridgeThermometer'
topic = "IoT/ParsedData/%s" % metadataBlock['name']
client.publish(topic, json.dumps(msg, cls=MyEncoder))
Logger.log("RePublisher has sent data")
except Exception, e:
Logger.log("Unexcepted exception %s in RePublisher: %s" % (e.__class__.__name__, str(e)))

36
logger.py Normal file
View File

@ -0,0 +1,36 @@
from time import gmtime, strftime
class Logger(object):
debugFlag = False
debugToStdoutFlag = False
@classmethod
def log(cls, data):
t = strftime("%d %b %Y %H:%M:%S", gmtime())
with open(cls.logfile, 'a') as f:
f.write("%s %s\n" % (t, data))
if cls.debugFlag and cls.debugToStdoutFlag:
print data
@classmethod
def debug(cls, data):
if cls.debugFlag:
cls.log(data)
@classmethod
def debugEnable(cls):
cls.debugFlag = True
@classmethod
def debugToStdoutEnable(cls):
cls.debugToStdoutFlag = True
@classmethod
def debugDisable(cls):
cls.debugFlag = False
@classmethod
def openlog(cls, logfile):
cls.logfile = logfile

61
mqtt-mbus2console.py Normal file
View File

@ -0,0 +1,61 @@
'''
Created on 20.05.2015
@author: wn
'''
import paho.mqtt.client as mqtt
import json
import pymongo
import datetime
import sys
import MeterbusLib
devices = []
device_dishwasher_electric = MeterbusLib.Device(0x53, "Dishwasher", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_dishwasher_electric)
device_laundry_electric = MeterbusLib.Device(82, "Laundry", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_laundry_electric)
device_dryer_electric = MeterbusLib.Device(81, "Dryer", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_dryer_electric)
device_light_electric = MeterbusLib.Device(84, "Light", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_light_electric)
device_3phase_electric = MeterbusLib.Device(0x50, "3 Phase Electric", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial",
"Voltage phase 1", "Current phase 1", "Power phase 1", "img. Power phase 1",
"Voltage phase 2", "Current phase 2", "Power phase 2", "img. Power phase 2",
"Voltage phase 3", "Current phase 3", "Power phase 3", "img. Power phase 3",
"converter ratio", "Power total", "img. Power total", "tariff"
])
devices.append(device_3phase_electric)
device_thermometer = MeterbusLib.Device(0x21, "Thermometer", ["Uptime Seconds", "Uptime Minutes", "Uptime Hours", "Uptime Days",
"Temperature 1", "Temperature 2", "Temperature 3", "Temperature 4",
"rawdata"
])
devices.append(device_thermometer)
def on_message(client, userdata, msg):
j = json.loads(msg.payload)
# print(json.dumps(j, indent=2, separators=(',',':')))
mbusPayload = j['data']['telegram']
if mbusPayload[-1] == ' ':
mbusPayload2 = mbusPayload[:-1]
else:
mbusPayload2 = mbusPayload
print("<" + mbusPayload2 + ">")
telegram = MeterbusLib.Telegram(devices)
telegram.fromHexString(mbusPayload2)
telegram.parse()
print(json.dumps(telegram.getJSON(), indent=2))
if __name__ == '__main__':
print("Starting client ...")
client = mqtt.Client()
client.on_message = on_message
client.connect("mqttbroker", 1883, 60)
client.subscribe("IoT/Measurement/MeterbusHub")
client.loop_forever()

View File

@ -20,7 +20,7 @@ if __name__ == '__main__':
client = mqtt.Client()
client.on_message = on_message
client.connect("mqttbroker", 1883, 60)
client.connect("172.16.2.15", 1883, 60)
for i in sys.argv[1:]:
client.subscribe(i)

114
mqtt2mongo.py Normal file → Executable file
View File

@ -1,36 +1,102 @@
#!/usr/bin/python
'''
Created on 20.05.2015
@author: wn
'''
import paho.mqtt.client as mqtt
import json
import pymongo
import datetime
from MqttReceiver import MqttReceiver
from MongoWriter import MongoWriter
from MeterBusDecoder import MeterBusDecoder
from RePublisher import RePublisher
from MonitorPublisher import MonitorPublisher
from AwsPublish import AwsPublish
from Broker import Broker
import Queue
import os
import sys
from logger import Logger
import time
def on_message(client, userdata, msg):
j = json.loads(msg.payload)
now = datetime.datetime.now()
midnight = now.replace(now.year, now.month, now.day, 0,0,0,0)
seconds = (now - midnight).seconds
j['metadata']['timestamp'] = datetime.datetime.now()
j['metadata']['seconds'] = seconds
j['metadata']['day'] = midnight
print(j)
mongoClient = pymongo.MongoClient('localhost')
db = mongoClient.iot
r = db.iot.insert_one(j)
DEBUG = True
DEBUG_TO_STDOUT = True
BACKGROUND = False
PID_FILE = "/opt/logs/mqtt2mongo.pid"
LOG_FILE = "/opt/logs/mqtt2mongo.log"
MQTT_BROKER = "127.0.0.1"
TOPICS = ['IoT/Measurement/#', 'IoT/WiFiPowerMeter/Measurement']
MONGO_HOST = "127.0.0.1"
MONGO_DATABASE = "iot"
MONGO_COLLECTION = "iot"
if __name__ == '__main__':
print("Starting client ...")
if BACKGROUND:
pid = os.fork()
else:
pid = 0
client = mqtt.Client()
client.on_message = on_message
client.connect("mqttbroker", 1883, 60)
client.subscribe("IoT/Measurement/#")
client.subscribe("IoT/WiFiPowerMeter/Measurement")
if pid:
pidFile = file(PID_FILE , mode='w')
pidFile.write("%i\n" % pid)
pidFile.close()
sys.exit(0)
client.loop_forever()
Logger.openlog(LOG_FILE)
if DEBUG:
Logger.debugEnable()
if DEBUG_TO_STDOUT:
Logger.debugToStdoutEnable()
Logger.log("mqtt2mongo starting")
try:
queue1 = Queue.Queue()
Logger.log("queues created ...")
mqttReader = MqttReceiver.create(queue1, MQTT_BROKER, TOPICS)
mqttReader.start()
Logger.log("MqttReader started ...")
broker = Broker()
broker.start()
queue2 = broker.getInQueue()
Logger.log("Broker started ...")
meterBusDecoder = MeterBusDecoder(queue1, queue2)
meterBusDecoder.start()
Logger.log("MeterBusDecoder started ...")
queue3 = broker.subscribe('mongoWriter')
mongoWriter = MongoWriter(queue3, MONGO_HOST, MONGO_DATABASE, MONGO_COLLECTION)
mongoWriter.start()
Logger.log("MongoWriter started ...")
queue4 = broker.subscribe('republisher')
republisher = RePublisher(queue4, MQTT_BROKER)
republisher.start()
Logger.log("RePublisher started ...")
queue5 = broker.subscribe('monitorpublisher')
monitorpublisher = MonitorPublisher(queue5, MQTT_BROKER)
monitorpublisher.start()
Logger.log("MonitorPublisher started ...")
queue6 = broker.subscribe('awspublish')
awspublish = RePublisher(queue6)
awspublish.start()
Logger.log("AwsPublish started ...")
Logger.log("mqtt2mongo running")
while (True):
time.sleep(10)
except Exception, e:
Logger.log("mqtt2mongo dying from %s, %s" % (str(e), e.msg))
finally:
Logger.log("mqtt2mongo terminating ...")