add AwsPublish

This commit is contained in:
Wolfgang Hottgenroth 2017-07-18 21:23:19 +02:00
parent 6884d44bb8
commit d9064f3527
3 changed files with 69 additions and 10 deletions

53
AwsPublish.py Normal file
View File

@ -0,0 +1,53 @@
import threading
import paho.mqtt.client as mqtt
from logger import Logger
import json
import datetime
from time import mktime
CA_CERTS = "VeriSign-Class 3-Public-Primary-Certification-Authority-G5.pem"
CERT_FILE = "aws-certificate.pem.crt"
KEY_FILE = "aws-private.pem.key"
MQTT_HOST = "a86hx9xnv9yty.iot.us-west-2.amazonaws.com"
MQTT_PORT = 8883
CLIENT_ID = "d0bf9c206e14x"
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
class AwsPublish(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.setDaemon(True)
def run(self):
client = mqtt.Client()
client.tls_set(ca_certs=CA_CERTS, certfile=CERT_FILE, keyfile=KEY_FILE)
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_start()
while True:
try:
msg = self.queue.get()
dataBlock = msg['data']
if 'decodedTelegram' in dataBlock:
del dataBlock['decodedTelegram']
if 'telegram' in dataBlock:
del dataBlock['telegram']
metadataBlock = msg['metadata']
if 'Slave' in metadataBlock:
if metadataBlock['Slave'] == 'Thermometer' and metadataBlock['device'] == 'ModbusHub':
metadataBlock['name'] = 'FridgeThermometer'
topic = "IoT/ParsedData/%s" % metadataBlock['name']
client.publish(topic, json.dumps(msg, cls=MyEncoder))
Logger.log("RePublisher has sent data")
except Exception, e:
Logger.log("Unexcepted exception %s in RePublisher: %s" % (e.__class__.__name__, str(e)))

View File

@ -10,6 +10,7 @@ from MongoWriter import MongoWriter
from MeterBusDecoder import MeterBusDecoder from MeterBusDecoder import MeterBusDecoder
from RePublisher import RePublisher from RePublisher import RePublisher
from MonitorPublisher import MonitorPublisher from MonitorPublisher import MonitorPublisher
from AwsPublish import AwsPublish
from Broker import Broker from Broker import Broker
import Queue import Queue
import os import os
@ -85,6 +86,11 @@ try:
monitorpublisher.start() monitorpublisher.start()
Logger.log("MonitorPublisher started ...") Logger.log("MonitorPublisher started ...")
queue6 = broker.subscribe('awspublish')
awspublish = RePublisher(queue6)
awspublish.start()
Logger.log("AwsPublish started ...")
Logger.log("mqtt2mongo running") Logger.log("mqtt2mongo running")