mqtt-tls connection and database added

This commit is contained in:
Wolfgang Hottgenroth 2017-11-13 15:07:52 +01:00
parent eea141df19
commit e767b9f8a6
11 changed files with 164 additions and 46 deletions

View File

@ -1,9 +1,23 @@
from logger import Logger from logger import Logger
class AbstractDatabasePreparer(object): class AbstractDatabasePreparer(object):
def __init__(self, tablename): def __init__(self, tablename, itemName, timestamp):
self.tablename = tablename self.tablename = tablename
self.itemName = itemName
self.timestamp = timestamp
self.values = {} self.values = {}
def getInsertStatement(self): def getItemName(self):
return "table: %s " % (self.tablename) + str(self.values) return self.itemName
def getTableName(self):
return self.tablename
def getTimestamp(self):
return self.timestamp
def getValues(self):
return self.values
def getData(self):
return "table: %s, %s, %s: " % (self.tablename, self.itemName, self.timestamp) + str(self.values)

View File

@ -1,19 +1,21 @@
import threading import threading
from logger import Logger from logger import Logger
import MySQLdb
class DatabaseEngine(threading.Thread): class DatabaseEngine(threading.Thread):
singleton = None singleton = None
@classmethod @classmethod
def create(cls, queue, period): def create(cls, queue, config):
if cls.singleton is None: if cls.singleton is None:
cls.singleton = DatabaseEngine(queue, period) cls.singleton = DatabaseEngine(queue, config)
return cls.singleton return cls.singleton
def __init__(self, queue, period): def __init__(self, queue, config):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.event = threading.Event() self.event = threading.Event()
self.period = period self.config = config
self.queue = queue self.queue = queue
self.startTimer() self.startTimer()
self.setDaemon(True) self.setDaemon(True)
@ -22,7 +24,7 @@ class DatabaseEngine(threading.Thread):
self.event.set() self.event.set()
def startTimer(self): def startTimer(self):
self.timer = threading.Timer(self.period, self.event.set) self.timer = threading.Timer(self.config['period'], self.event.set)
self.timer.start() self.timer.start()
def run(self): def run(self):
@ -34,8 +36,31 @@ class DatabaseEngine(threading.Thread):
data = self.queue.get() data = self.queue.get()
if (type(data) != list): if (type(data) != list):
data = [data] data = [data]
db = MySQLdb.connect(host=self.config['host'], user=self.config['user'],
passwd=self.config['password'], db=self.config['db'])
cursor = db.cursor()
for dbo in data: for dbo in data:
Logger.log("DatabaseEngine receives: %s" % (str(dbo.getInsertStatement()))) Logger.log("DatabaseEngine receives: %s" % (str(dbo.getData())))
columns = dbo.getValues()
columns['name'] = dbo.getItemName()
columns['ts'] = dbo.getTimestamp().isoformat()
columnNames = '(' + ','.join(columns.keys()) + ')'
valuePlaceholders = '(' + ','.join(['%s' for v in columns.values()]) + ')'
#Logger.log("columns: %s" % columnNames)
#Logger.log("placeholders: %s" % valuePlaceholders)
#Logger.log("values: %s" % columns.values())
stmt = 'INSERT INTO {0} {1} VALUES {2}'.format(dbo.getTableName(), columnNames, valuePlaceholders)
#Logger.log("stmt: %s" % (stmt))
cursor.execute(stmt, columns.values())
Logger.log("stmt executed")
db.commit()
db.close()
else: else:
Logger.log("DatabaseEngine: no more data to handle") Logger.log("DatabaseEngine: no more data to handle")

View File

@ -1,7 +1,7 @@
from AbstractDatabasePreparer import AbstractDatabasePreparer from AbstractDatabasePreparer import AbstractDatabasePreparer
class EnergyDatabasePreparer(AbstractDatabasePreparer): class EnergyDatabasePreparer(AbstractDatabasePreparer):
def __init__(self, values): def __init__(self, itemName, timestamp, values):
super(EnergyDatabasePreparer, self).__init__('ElectricEnergy') super(EnergyDatabasePreparer, self).__init__('ElectricEnergy', itemName, timestamp)
self.values = values self.values = values

View File

@ -23,7 +23,7 @@ device_dryer_electric = MeterbusLib.Device(81, OnePhaseElectric_Finder, "Dryer",
devices.append(device_dryer_electric) devices.append(device_dryer_electric)
device_light_electric = MeterbusLib.Device(84, OnePhaseElectric_Finder, "Light", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"]) device_light_electric = MeterbusLib.Device(84, OnePhaseElectric_Finder, "Light", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"])
devices.append(device_light_electric) devices.append(device_light_electric)
device_3phase_electric = MeterbusLib.Device(0x50, ThreePhaseElectric_Finder, "3 Phase Electric", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial", device_3phase_electric = MeterbusLib.Device(0x50, ThreePhaseElectric_Finder, "Total", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial",
"Voltage phase 1", "Current phase 1", "Power phase 1", "img. Power phase 1", "Voltage phase 1", "Current phase 1", "Power phase 1", "img. Power phase 1",
"Voltage phase 2", "Current phase 2", "Power phase 2", "img. Power phase 2", "Voltage phase 2", "Current phase 2", "Power phase 2", "img. Power phase 2",
"Voltage phase 3", "Current phase 3", "Power phase 3", "img. Power phase 3", "Voltage phase 3", "Current phase 3", "Power phase 3", "img. Power phase 3",
@ -46,19 +46,18 @@ class MBusParser(AbstractParser, AbstractNextStage):
def execute(self, data): def execute(self, data):
# Logger.log("MBusParser %s" % (str(data))) # Logger.log("MBusParser %s" % (str(data)))
self.executeNextStage(data)
try: try:
j = json.loads(data['payload']) j = json.loads(data['payload'])
name = j['metadata']['name'] name = j['metadata']['name']
Logger.log("name: %s" % (name)) #Logger.log("name: %s" % (name))
telegram = j['data']['telegram'] telegram = j['data']['telegram']
if telegram[-1] == ' ': if telegram[-1] == ' ':
telegram = telegram[:-1] telegram = telegram[:-1]
Logger.log("telegram: <%s>" % (telegram)) #Logger.log("telegram: <%s>" % (telegram))
mbusTelegram = MeterbusLib.Telegram(devices) mbusTelegram = MeterbusLib.Telegram(devices)
mbusTelegram.fromHexString(telegram) mbusTelegram.fromHexString(telegram)
@ -66,29 +65,37 @@ class MBusParser(AbstractParser, AbstractNextStage):
# Logger.log(json.dumps(mbusTelegram.getJSON(), indent=2)) # Logger.log(json.dumps(mbusTelegram.getJSON(), indent=2))
v = None
if mbusTelegram.frame.category == OnePhaseElectric_Finder: if mbusTelegram.frame.category == OnePhaseElectric_Finder:
# one-phase electricity # one-phase electricity
v = EnergyDatabasePreparer({'name': mbusTelegram.frame.comment, v = EnergyDatabasePreparer(mbusTelegram.frame.comment,
'power': mbusTelegram.frame.dib[4].value, data['timestamp'],
'energy': mbusTelegram.frame.dib[0].value, { 'power': mbusTelegram.frame.dib[4].value,
'timestamp':data['timestamp']}) 'energy': mbusTelegram.frame.dib[0].value})
elif mbusTelegram.frame.category == ThreePhaseElectric_Finder: elif mbusTelegram.frame.category == ThreePhaseElectric_Finder:
# three-phases electricity # three-phases electricity
v = EnergyDatabasePreparer({'name': mbusTelegram.frame.comment, v = EnergyDatabasePreparer(mbusTelegram.frame.comment,
'power': mbusTelegram.frame.dib[17].value, data['timestamp'],
'energy': mbusTelegram.frame.dib[0], {'power': mbusTelegram.frame.dib[17].value,
'timestamp':data['timestamp']}) 'energy': mbusTelegram.frame.dib[0].value})
elif mbusTelegram.frame.category == Thermometer_Hottis: elif mbusTelegram.frame.category == Thermometer_Hottis:
# thermometer # thermometer
v = [] v = []
vv = TemperatureDatabasePreparer({'name': 'Hedge', vv = TemperatureDatabasePreparer('Zero-Reference',
'temperature': mbusTelegram.frame.dib[4].value, data['timestamp'],
'timestamp':data['timestamp']}) {'temperature': mbusTelegram.frame.dib[4].value})
v.append(vv) v.append(vv)
vv = TemperatureDatabasePreparer({'name': 'House', vv = TemperatureDatabasePreparer('Hedge',
'temperature': mbusTelegram.frame.dib[6].value, data['timestamp'],
'timestamp':data['timestamp']}) {'temperature': mbusTelegram.frame.dib[5].value})
v.append(vv)
vv = TemperatureDatabasePreparer('Indoor Basement',
data['timestamp'],
{'temperature': mbusTelegram.frame.dib[6].value})
v.append(vv)
vv = TemperatureDatabasePreparer('House',
data['timestamp'],
{'temperature': mbusTelegram.frame.dib[7].value})
v.append(vv) v.append(vv)
else: else:
raise Exception('illegal name in mbus message found') raise Exception('illegal name in mbus message found')

View File

@ -17,13 +17,11 @@ class ModbusParser(AbstractParser, AbstractNextStage):
if j['metadata']['Slave'] == 'Thermometer': if j['metadata']['Slave'] == 'Thermometer':
vv = [] vv = []
v = TemperatureDatabasePreparer({'name':'Fridge', v = TemperatureDatabasePreparer('Fridge', data['timestamp'],
'temperature':j['data']['t1'], {'temperature':j['data']['t1']})
'timestamp':data['timestamp']})
vv.append(v) vv.append(v)
v = TemperatureDatabasePreparer({'name':'Freezer', v = TemperatureDatabasePreparer('Freezer', data['timestamp'],
'temperature':j['data']['t2'], {'temperature':j['data']['t2']})
'timestamp':data['timestamp']})
vv.append(v) vv.append(v)
self.executeNextStage(vv) self.executeNextStage(vv)
except Exception, e: except Exception, e:

View File

@ -17,7 +17,21 @@ DEBUG_TO_STDOUT = True
BACKGROUND = False BACKGROUND = False
PID_FILE = "/tmp/MqttDispatcher.pid" PID_FILE = "/tmp/MqttDispatcher.pid"
LOG_FILE = "/tmp/MqttDispatcher.log" LOG_FILE = "/tmp/MqttDispatcher.log"
MQTT_BROKER = "172.16.2.15" MQTT_CONFIG = {
'host':'eupenstrasse20.dynamic.hottis.de',
'port':8883,
'tls':True,
'ca':'ca.crt',
'user':'tron',
'password':'geheim123'
}
DATABASE_CONFIG = {
'period':300,
'host':'localhost',
'user':'smarthome',
'password':'smarthome123',
'db':'smarthome'
}
if BACKGROUND: if BACKGROUND:
@ -45,7 +59,7 @@ Logger.log("MqttDispatcher starting")
try: try:
mqttReader = MqttReceiver.create(MQTT_BROKER) mqttReader = MqttReceiver.create(MQTT_CONFIG)
mqttReader.start() mqttReader.start()
Logger.log("MqttReader started ...") Logger.log("MqttReader started ...")
@ -62,7 +76,7 @@ try:
modbusParser.setNextStage(persistentQueue) modbusParser.setNextStage(persistentQueue)
Logger.log("PersistentQueue instantiated ...") Logger.log("PersistentQueue instantiated ...")
databaseEngine = DatabaseEngine(persistentQueue, 60.0) databaseEngine = DatabaseEngine(persistentQueue, DATABASE_CONFIG)
databaseEngine.start() databaseEngine.start()
Logger.log("DatabaseEngine started ...") Logger.log("DatabaseEngine started ...")

View File

@ -3,6 +3,7 @@ from logger import Logger
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
import json import json
import datetime import datetime
import ssl
_parsers = {} _parsers = {}
_seenTopics = {} _seenTopics = {}
@ -26,16 +27,20 @@ class MqttReceiver(threading.Thread):
singleton = None singleton = None
@classmethod @classmethod
def create(cls, broker): def create(cls, config):
if cls.singleton is None: if cls.singleton is None:
cls.singleton = MqttReceiver(broker) cls.singleton = MqttReceiver(config)
return cls.singleton return cls.singleton
def __init__(self, broker): def __init__(self, config):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.client = mqtt.Client() self.client = mqtt.Client()
self.client.on_message = on_message self.client.on_message = on_message
self.client.connect(broker, 1883, 60) if config['tls']:
self.client.tls_set(config['ca'], tls_version=ssl.PROTOCOL_TLSv1)
if config['user']:
self.client.username_pw_set(config['user'], config['password'])
self.client.connect(config['host'], config['port'], 60)
self.setDaemon(True) self.setDaemon(True)
def registerParser(self, parser): def registerParser(self, parser):

View File

@ -7,7 +7,7 @@ class PersistentQueue(object):
self.queue = Queue.Queue() self.queue = Queue.Queue()
def execute(self, data): def execute(self, data):
# Logger.log("PersistentQueue %s" % (str(data))) #Logger.log("PersistentQueue %s" % (str(data)))
self.queue.put_nowait(data) self.queue.put_nowait(data)
def empty(self): def empty(self):

View File

@ -1,7 +1,7 @@
from AbstractDatabasePreparer import AbstractDatabasePreparer from AbstractDatabasePreparer import AbstractDatabasePreparer
class TemperatureDatabasePreparer(AbstractDatabasePreparer): class TemperatureDatabasePreparer(AbstractDatabasePreparer):
def __init__(self, values): def __init__(self, itemName, timestamp, values):
super(TemperatureDatabasePreparer, self).__init__('Temperature') super(TemperatureDatabasePreparer, self).__init__('Temperature', itemName, timestamp)
self.values = values self.values = values

38
ca.crt Normal file
View File

@ -0,0 +1,38 @@
-----BEGIN CERTIFICATE-----
MIIGuTCCBKGgAwIBAgIJAOh+htQGJjvkMA0GCSqGSIb3DQEBCwUAMIGYMQswCQYD
VQQGEwJERTEMMAoGA1UECBMDTlJXMQ4wDAYDVQQHEwVFc3NlbjESMBAGA1UEChMJ
aG90dGlzLmRlMQ8wDQYDVQQLEwZzZXJ2ZXIxFTATBgNVBAMTDGhvdHRpcy5kZSBD
QTEQMA4GA1UEKRMHRWFzeVJTQTEdMBsGCSqGSIb3DQEJARYOd29ob0Bob3R0aXMu
ZGUwIBcNMTcwNzEzMDgzMjA3WhgPMjExNzA2MTkwODMyMDdaMIGYMQswCQYDVQQG
EwJERTEMMAoGA1UECBMDTlJXMQ4wDAYDVQQHEwVFc3NlbjESMBAGA1UEChMJaG90
dGlzLmRlMQ8wDQYDVQQLEwZzZXJ2ZXIxFTATBgNVBAMTDGhvdHRpcy5kZSBDQTEQ
MA4GA1UEKRMHRWFzeVJTQTEdMBsGCSqGSIb3DQEJARYOd29ob0Bob3R0aXMuZGUw
ggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDK36TtgOg/aS8y6Ou4OsI8
gWHqvJooCvMc16eFU1+6xayMd/V55+ttACK05aSkur2rW59+gbM2flTFHeCO/hFb
rUGplqQCzn4pKsUyIkhjwuF5oSdNcgqr7BPwZg7vw6TLeT0IrTBPWuODrdqvz6nY
Qo1Gl5+iXB2bdtc1C8rfKVcBl0azo2ISDLxTlfYzPSCDBEXBbEUEUreGpSgYwVZU
yL1H86qKNiJuPyvd9mIo9toRqZ4sfmcTs1kcyJ+ge1N9iOIxkc2fdBwL3Pw3jAaY
p7r3aNIWlz9h6UTCJYvt20yyVUHipo2rI1Fiy6K9RV252vmNpSRdJSMvIAiOlhTz
aXF6Su8ESFk+qaBdGz/dNPPKxzSo1vm3IsUFq2G9ph4I3ZPyCE4JEU68la00worI
S/DvFitEn+EEXlNt05L53wW+GCUQQ31/x5/dceVnnkgr+hY1d28r0skQaN3YY612
17INe8nsG/OwgGAsqs/PVeXz4nMVVNgVpudbdvU1OlalBJtpQ8xNljukKjzGM2Tz
5YkypVEVUvVTEGBUr8wk7E9B1m9bhV4FSiPzC4GV9Dl41INEcDELaSdnqufWFLkr
qy/8Vqlb+FJzONE8iBAE/KCgaL/OzqI4mnp2tI5Ab82+UrlqI3BhoThLGdyIghtb
tzX1RahEkIE/CdWEWSTRkwIDAQABo4IBADCB/TAdBgNVHQ4EFgQUqvzXx0ltDqeS
eSwT7lw/0/PsPbkwgc0GA1UdIwSBxTCBwoAUqvzXx0ltDqeSeSwT7lw/0/PsPbmh
gZ6kgZswgZgxCzAJBgNVBAYTAkRFMQwwCgYDVQQIEwNOUlcxDjAMBgNVBAcTBUVz
c2VuMRIwEAYDVQQKEwlob3R0aXMuZGUxDzANBgNVBAsTBnNlcnZlcjEVMBMGA1UE
AxMMaG90dGlzLmRlIENBMRAwDgYDVQQpEwdFYXN5UlNBMR0wGwYJKoZIhvcNAQkB
Fg53b2hvQGhvdHRpcy5kZYIJAOh+htQGJjvkMAwGA1UdEwQFMAMBAf8wDQYJKoZI
hvcNAQELBQADggIBAHnFNC8UeJjizpEdszTxAp6QpHO4nq+Wwb37rp6JA7Hs/jAB
ddeWTPHGNVOFDV45CF8f39INhmvr7xIfMHBwwuQz/UetXH2ojVI06CWHFp/N7YkF
ZWUDY+GkbxnQ+F7xqahqHSEzv2hcjAj6xaBD/hMUITW6YP0JDL0C45pMxZKsYCLA
8+5YxwM2Mzs24YtTlaOdlqg+iKXKSn4FTrbDhsb/MQhf/EvVLncdL0Fiq1voUzx/
6HoIsYWyVwT62WdUF42Aoaxdvp4DLinmk9ANnVF/nfpzRz69saQboq4k2mDUCEoj
jwXBeobkAxrYasepIkKbK5PTLuC7M0t41++uV5j0YobrLCjvn7unVJgysDcRihuN
PuT3Yohi4dRjo8ZKYraWjSgCTTWNuNHjxMSBgfUYDnsh7wF0VVhiiHlybhyuvtJj
Szn/YZRhsD74//m1DdAtXtKCptzssxTrDhSXiGrrP4+a5mAqPowrzA6C8fTlwy50
qLEXvm2CoZHEmEj6n61LKILQa9o9KLT6TIX2925Yzk9BLdbFprrzB7gpNzsPJ4AU
sDQln+FuV4resP5lYb0Cc9x3OjAACkAv2bHNOI2AXtD3xXRmVPNmthGeqABY8e9a
Alw3TtZnQpaz9SE7Ta699rf5Qb7hq4ERD2is8ovlXJ1lWLR4MQk3lfxZeEfb
-----END CERTIFICATE-----

17
schema.sql Normal file
View File

@ -0,0 +1,17 @@
create table ElectricEnergy (
id int(10) unsigned not null auto_increment,
name varchar(64) not null,
ts timestamp not null,
energy int(10) unsigned not null,
power int(10) unsigned not null,
primary key (id)
) engine=InnoDB;
create table Temperature (
id int(10) unsigned not null auto_increment,
name varchar(64) not null,
ts timestamp not null,
temperature double not null,
primary key (id)
) engine=InnoDB;