From 43cf7396c1c9075bd35300e8f4321fe81490ef46 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Mon, 11 Jul 2016 21:20:22 +0200 Subject: [PATCH] fix in MonitorPublisher --- MonitorPublisher.py | 64 +++++++++++++++++++++++++-------------------- mqtt2mongo.py | 6 +++++ 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/MonitorPublisher.py b/MonitorPublisher.py index f57d857..b1db483 100644 --- a/MonitorPublisher.py +++ b/MonitorPublisher.py @@ -1,18 +1,10 @@ import threading import paho.mqtt.client as mqtt from logger import Logger -import json -import datetime -from time import mktime, localtime -class MyEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, datetime.datetime): - return str(obj) - return json.JSONEncoder.default(self, obj) class MonitorPublisher(threading.Thread): def __init__(self, queue, broker): @@ -21,35 +13,49 @@ class MonitorPublisher(threading.Thread): self.broker = broker self.setDaemon(True) + self.nameMap = { + 'light': { 'oldbody':'', 'body':'', 'slot': 1, 'header': 'Licht', 'cnt': 0 }, + 'computer': { 'oldbody':'', 'body':'', 'slot': 2, 'header': 'Computer', 'cnt': 0 }, + 'laundry': { 'oldbody':'', 'body':'', 'slot': 3, 'header': 'Waschm.', 'cnt': 0 }, + 'dryer': { 'oldbody':'', 'body':'', 'slot': 4, 'header': 'Trockner', 'cnt': 0 }, + 'dishwasher': { 'oldbody':'', 'body':'', 'slot': 5, 'header': 'Spuelm.', 'cnt': 0 }, + 'freezer': { 'oldbody':'', 'body':'', 'slot': 6, 'header': 'Gefrier.', 'cnt': 0 }, + 'electricity': { 'oldbody':'', 'body':'', 'slot': 7, 'header': 'Strom', 'cnt': 0 }, + 'thermom.': { 'oldbody':'', 'body':'', 'slot': 8, 'header': 'Temp.', 'cnt': 0 }, + 'fridge_x': { 'oldbody':'', 'body':'', 'slot': 9, 'header': 'Gefr.T.', 'cnt': 0 }, + 'os_x': { 'oldbody':'', 'body':'', 'slot': 10, 'header': 'Server', 'cnt': 0 }, + } + def run(self): client = mqtt.Client() client.connect(self.broker, 1883, 60) client.loop_start() - lastEvent = 0.0 while True: try: msg = self.queue.get() + dataBlock = msg['data'] + metadataBlock = msg['metadata'] + if 'Slave' in metadataBlock: + if metadataBlock['Slave'] == 'Thermometer' and metadataBlock['device'] == 'ModbusHub': + metadataBlock['name'] = 'fridge' + name = metadataBlock['name'] + Logger.log("Name: " + name) + if name in self.nameMap: + if name in ['light', 'computer', 'laundry', 'dryer', 'dishwasher', 'freezer', 'electricity']: + self.nameMap[name]['body'] = str(dataBlock['power']) + elif name == 'thermom.': + self.nameMap[name]['body'] = "{0:.2f}".format(dataBlock['temperature2']) - monitorMessages = [ - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'], - ['header1', 'body1'] - ] - currentTime = mktime(localtime()) - if (currentTime > lastEvent + 60): - lastEvent = currentTime - client.publish("IoT/Message/Monitor", monitorMsg) - Logger.log("MonitorPublisher has sent data") + if (self.nameMap[name]['oldbody'] != self.nameMap[name]['body']) or (self.nameMap[name]['cnt'] == 10): + self.nameMap[name]['cnt'] = 0; + self.nameMap[name]['oldbody'] = self.nameMap[name]['body'] + message = str(self.nameMap[name]['slot']) + " " + self.nameMap[name]['header'] + " " + self.nameMap[name]['body'] + client.publish("IoT/Monitor/Message", message) + Logger.log("MonitorPublisher has sent: " + message) + else: + self.nameMap[name]['cnt'] += 1 + else: + Logger.log("unknown name: " + name) except Exception, e: Logger.log("Unexcepted exception %s in MonitorPublisher: %s" % (e.__class__.__name__, str(e))) diff --git a/mqtt2mongo.py b/mqtt2mongo.py index 6b608b9..a28973e 100644 --- a/mqtt2mongo.py +++ b/mqtt2mongo.py @@ -8,6 +8,7 @@ from MqttReceiver import MqttReceiver from MongoWriter import MongoWriter from MeterBusDecoder import MeterBusDecoder from RePublisher import RePublisher +from MonitorPublisher import MonitorPublisher from Broker import Broker import Queue import os @@ -73,6 +74,11 @@ try: republisher.start() Logger.log("RePublisher started ...") + queue5 = broker.subscribe('monitorpublisher') + monitorpublisher = MonitorPublisher(queue5, MQTT_BROKER) + monitorpublisher.start() + Logger.log("MonitorPublisher started ...") + Logger.log("mqtt2mongo running")