diff --git a/AbstractDatabasePreparer.py b/AbstractDatabasePreparer.py index c737f69..f115b48 100644 --- a/AbstractDatabasePreparer.py +++ b/AbstractDatabasePreparer.py @@ -1,9 +1,23 @@ from logger import Logger class AbstractDatabasePreparer(object): - def __init__(self, tablename): + def __init__(self, tablename, itemName, timestamp): self.tablename = tablename + self.itemName = itemName + self.timestamp = timestamp self.values = {} - def getInsertStatement(self): - return "table: %s " % (self.tablename) + str(self.values) \ No newline at end of file + def getItemName(self): + return self.itemName + + def getTableName(self): + return self.tablename + + def getTimestamp(self): + return self.timestamp + + def getValues(self): + return self.values + + def getData(self): + return "table: %s, %s, %s: " % (self.tablename, self.itemName, self.timestamp) + str(self.values) \ No newline at end of file diff --git a/DatabaseEngine.py b/DatabaseEngine.py index ceab13a..a9315f5 100644 --- a/DatabaseEngine.py +++ b/DatabaseEngine.py @@ -1,19 +1,21 @@ import threading from logger import Logger +import MySQLdb + class DatabaseEngine(threading.Thread): singleton = None @classmethod - def create(cls, queue, period): + def create(cls, queue, config): if cls.singleton is None: - cls.singleton = DatabaseEngine(queue, period) + cls.singleton = DatabaseEngine(queue, config) return cls.singleton - def __init__(self, queue, period): + def __init__(self, queue, config): threading.Thread.__init__(self) self.event = threading.Event() - self.period = period + self.config = config self.queue = queue self.startTimer() self.setDaemon(True) @@ -22,7 +24,7 @@ class DatabaseEngine(threading.Thread): self.event.set() def startTimer(self): - self.timer = threading.Timer(self.period, self.event.set) + self.timer = threading.Timer(self.config['period'], self.event.set) self.timer.start() def run(self): @@ -34,8 +36,31 @@ class DatabaseEngine(threading.Thread): data = self.queue.get() if (type(data) != list): data = [data] + db = MySQLdb.connect(host=self.config['host'], user=self.config['user'], + passwd=self.config['password'], db=self.config['db']) + cursor = db.cursor() for dbo in data: - Logger.log("DatabaseEngine receives: %s" % (str(dbo.getInsertStatement()))) + Logger.log("DatabaseEngine receives: %s" % (str(dbo.getData()))) + columns = dbo.getValues() + columns['name'] = dbo.getItemName() + columns['ts'] = dbo.getTimestamp().isoformat() + + columnNames = '(' + ','.join(columns.keys()) + ')' + valuePlaceholders = '(' + ','.join(['%s' for v in columns.values()]) + ')' + + #Logger.log("columns: %s" % columnNames) + #Logger.log("placeholders: %s" % valuePlaceholders) + #Logger.log("values: %s" % columns.values()) + + stmt = 'INSERT INTO {0} {1} VALUES {2}'.format(dbo.getTableName(), columnNames, valuePlaceholders) + #Logger.log("stmt: %s" % (stmt)) + + cursor.execute(stmt, columns.values()) + Logger.log("stmt executed") + + db.commit() + db.close() + else: Logger.log("DatabaseEngine: no more data to handle") diff --git a/EnergyDatabasePreparer.py b/EnergyDatabasePreparer.py index c1a3814..f862f48 100644 --- a/EnergyDatabasePreparer.py +++ b/EnergyDatabasePreparer.py @@ -1,7 +1,7 @@ from AbstractDatabasePreparer import AbstractDatabasePreparer class EnergyDatabasePreparer(AbstractDatabasePreparer): - def __init__(self, values): - super(EnergyDatabasePreparer, self).__init__('ElectricEnergy') + def __init__(self, itemName, timestamp, values): + super(EnergyDatabasePreparer, self).__init__('ElectricEnergy', itemName, timestamp) self.values = values \ No newline at end of file diff --git a/MBusParser.py b/MBusParser.py index 3638ecb..5c78843 100644 --- a/MBusParser.py +++ b/MBusParser.py @@ -23,7 +23,7 @@ device_dryer_electric = MeterbusLib.Device(81, OnePhaseElectric_Finder, "Dryer", devices.append(device_dryer_electric) device_light_electric = MeterbusLib.Device(84, OnePhaseElectric_Finder, "Light", ["Energy total", "Energy partial", "Voltage", "Current", "Power", "img. Power"]) devices.append(device_light_electric) -device_3phase_electric = MeterbusLib.Device(0x50, ThreePhaseElectric_Finder, "3 Phase Electric", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial", +device_3phase_electric = MeterbusLib.Device(0x50, ThreePhaseElectric_Finder, "Total", ["Energy T1 total", "Energy T1 partial", "Energy T2 total", "Energy T2 partial", "Voltage phase 1", "Current phase 1", "Power phase 1", "img. Power phase 1", "Voltage phase 2", "Current phase 2", "Power phase 2", "img. Power phase 2", "Voltage phase 3", "Current phase 3", "Power phase 3", "img. Power phase 3", @@ -46,19 +46,18 @@ class MBusParser(AbstractParser, AbstractNextStage): def execute(self, data): # Logger.log("MBusParser %s" % (str(data))) - self.executeNextStage(data) try: j = json.loads(data['payload']) name = j['metadata']['name'] - Logger.log("name: %s" % (name)) + #Logger.log("name: %s" % (name)) telegram = j['data']['telegram'] if telegram[-1] == ' ': telegram = telegram[:-1] - Logger.log("telegram: <%s>" % (telegram)) + #Logger.log("telegram: <%s>" % (telegram)) mbusTelegram = MeterbusLib.Telegram(devices) mbusTelegram.fromHexString(telegram) @@ -66,29 +65,37 @@ class MBusParser(AbstractParser, AbstractNextStage): # Logger.log(json.dumps(mbusTelegram.getJSON(), indent=2)) - + v = None if mbusTelegram.frame.category == OnePhaseElectric_Finder: # one-phase electricity - v = EnergyDatabasePreparer({'name': mbusTelegram.frame.comment, - 'power': mbusTelegram.frame.dib[4].value, - 'energy': mbusTelegram.frame.dib[0].value, - 'timestamp':data['timestamp']}) + v = EnergyDatabasePreparer(mbusTelegram.frame.comment, + data['timestamp'], + { 'power': mbusTelegram.frame.dib[4].value, + 'energy': mbusTelegram.frame.dib[0].value}) elif mbusTelegram.frame.category == ThreePhaseElectric_Finder: # three-phases electricity - v = EnergyDatabasePreparer({'name': mbusTelegram.frame.comment, - 'power': mbusTelegram.frame.dib[17].value, - 'energy': mbusTelegram.frame.dib[0], - 'timestamp':data['timestamp']}) + v = EnergyDatabasePreparer(mbusTelegram.frame.comment, + data['timestamp'], + {'power': mbusTelegram.frame.dib[17].value, + 'energy': mbusTelegram.frame.dib[0].value}) elif mbusTelegram.frame.category == Thermometer_Hottis: # thermometer v = [] - vv = TemperatureDatabasePreparer({'name': 'Hedge', - 'temperature': mbusTelegram.frame.dib[4].value, - 'timestamp':data['timestamp']}) + vv = TemperatureDatabasePreparer('Zero-Reference', + data['timestamp'], + {'temperature': mbusTelegram.frame.dib[4].value}) v.append(vv) - vv = TemperatureDatabasePreparer({'name': 'House', - 'temperature': mbusTelegram.frame.dib[6].value, - 'timestamp':data['timestamp']}) + vv = TemperatureDatabasePreparer('Hedge', + data['timestamp'], + {'temperature': mbusTelegram.frame.dib[5].value}) + v.append(vv) + vv = TemperatureDatabasePreparer('Indoor Basement', + data['timestamp'], + {'temperature': mbusTelegram.frame.dib[6].value}) + v.append(vv) + vv = TemperatureDatabasePreparer('House', + data['timestamp'], + {'temperature': mbusTelegram.frame.dib[7].value}) v.append(vv) else: raise Exception('illegal name in mbus message found') diff --git a/ModbusParser.py b/ModbusParser.py index 0606018..34189f5 100644 --- a/ModbusParser.py +++ b/ModbusParser.py @@ -17,13 +17,11 @@ class ModbusParser(AbstractParser, AbstractNextStage): if j['metadata']['Slave'] == 'Thermometer': vv = [] - v = TemperatureDatabasePreparer({'name':'Fridge', - 'temperature':j['data']['t1'], - 'timestamp':data['timestamp']}) + v = TemperatureDatabasePreparer('Fridge', data['timestamp'], + {'temperature':j['data']['t1']}) vv.append(v) - v = TemperatureDatabasePreparer({'name':'Freezer', - 'temperature':j['data']['t2'], - 'timestamp':data['timestamp']}) + v = TemperatureDatabasePreparer('Freezer', data['timestamp'], + {'temperature':j['data']['t2']}) vv.append(v) self.executeNextStage(vv) except Exception, e: diff --git a/MqttDispatcher.py b/MqttDispatcher.py index f196c61..6569f3f 100644 --- a/MqttDispatcher.py +++ b/MqttDispatcher.py @@ -17,7 +17,21 @@ DEBUG_TO_STDOUT = True BACKGROUND = False PID_FILE = "/tmp/MqttDispatcher.pid" LOG_FILE = "/tmp/MqttDispatcher.log" -MQTT_BROKER = "172.16.2.15" +MQTT_CONFIG = { + 'host':'eupenstrasse20.dynamic.hottis.de', + 'port':8883, + 'tls':True, + 'ca':'ca.crt', + 'user':'tron', + 'password':'geheim123' +} +DATABASE_CONFIG = { + 'period':300, + 'host':'localhost', + 'user':'smarthome', + 'password':'smarthome123', + 'db':'smarthome' +} if BACKGROUND: @@ -45,7 +59,7 @@ Logger.log("MqttDispatcher starting") try: - mqttReader = MqttReceiver.create(MQTT_BROKER) + mqttReader = MqttReceiver.create(MQTT_CONFIG) mqttReader.start() Logger.log("MqttReader started ...") @@ -62,7 +76,7 @@ try: modbusParser.setNextStage(persistentQueue) Logger.log("PersistentQueue instantiated ...") - databaseEngine = DatabaseEngine(persistentQueue, 60.0) + databaseEngine = DatabaseEngine(persistentQueue, DATABASE_CONFIG) databaseEngine.start() Logger.log("DatabaseEngine started ...") diff --git a/MqttReceiver.py b/MqttReceiver.py index 576da47..dc7ce98 100644 --- a/MqttReceiver.py +++ b/MqttReceiver.py @@ -3,6 +3,7 @@ from logger import Logger import paho.mqtt.client as mqtt import json import datetime +import ssl _parsers = {} _seenTopics = {} @@ -26,16 +27,20 @@ class MqttReceiver(threading.Thread): singleton = None @classmethod - def create(cls, broker): + def create(cls, config): if cls.singleton is None: - cls.singleton = MqttReceiver(broker) + cls.singleton = MqttReceiver(config) return cls.singleton - def __init__(self, broker): + def __init__(self, config): threading.Thread.__init__(self) self.client = mqtt.Client() self.client.on_message = on_message - self.client.connect(broker, 1883, 60) + if config['tls']: + self.client.tls_set(config['ca'], tls_version=ssl.PROTOCOL_TLSv1) + if config['user']: + self.client.username_pw_set(config['user'], config['password']) + self.client.connect(config['host'], config['port'], 60) self.setDaemon(True) def registerParser(self, parser): diff --git a/PersistentQueue.py b/PersistentQueue.py index 28ce733..b7f1f1d 100644 --- a/PersistentQueue.py +++ b/PersistentQueue.py @@ -7,7 +7,7 @@ class PersistentQueue(object): self.queue = Queue.Queue() def execute(self, data): - # Logger.log("PersistentQueue %s" % (str(data))) + #Logger.log("PersistentQueue %s" % (str(data))) self.queue.put_nowait(data) def empty(self): diff --git a/TemperatureDatabasePreparer.py b/TemperatureDatabasePreparer.py index 06420f7..c84b43d 100644 --- a/TemperatureDatabasePreparer.py +++ b/TemperatureDatabasePreparer.py @@ -1,7 +1,7 @@ from AbstractDatabasePreparer import AbstractDatabasePreparer class TemperatureDatabasePreparer(AbstractDatabasePreparer): - def __init__(self, values): - super(TemperatureDatabasePreparer, self).__init__('Temperature') + def __init__(self, itemName, timestamp, values): + super(TemperatureDatabasePreparer, self).__init__('Temperature', itemName, timestamp) self.values = values \ No newline at end of file diff --git a/ca.crt b/ca.crt new file mode 100644 index 0000000..9815290 --- /dev/null +++ b/ca.crt @@ -0,0 +1,38 @@ +-----BEGIN CERTIFICATE----- +MIIGuTCCBKGgAwIBAgIJAOh+htQGJjvkMA0GCSqGSIb3DQEBCwUAMIGYMQswCQYD +VQQGEwJERTEMMAoGA1UECBMDTlJXMQ4wDAYDVQQHEwVFc3NlbjESMBAGA1UEChMJ +aG90dGlzLmRlMQ8wDQYDVQQLEwZzZXJ2ZXIxFTATBgNVBAMTDGhvdHRpcy5kZSBD +QTEQMA4GA1UEKRMHRWFzeVJTQTEdMBsGCSqGSIb3DQEJARYOd29ob0Bob3R0aXMu +ZGUwIBcNMTcwNzEzMDgzMjA3WhgPMjExNzA2MTkwODMyMDdaMIGYMQswCQYDVQQG +EwJERTEMMAoGA1UECBMDTlJXMQ4wDAYDVQQHEwVFc3NlbjESMBAGA1UEChMJaG90 +dGlzLmRlMQ8wDQYDVQQLEwZzZXJ2ZXIxFTATBgNVBAMTDGhvdHRpcy5kZSBDQTEQ +MA4GA1UEKRMHRWFzeVJTQTEdMBsGCSqGSIb3DQEJARYOd29ob0Bob3R0aXMuZGUw +ggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDK36TtgOg/aS8y6Ou4OsI8 +gWHqvJooCvMc16eFU1+6xayMd/V55+ttACK05aSkur2rW59+gbM2flTFHeCO/hFb +rUGplqQCzn4pKsUyIkhjwuF5oSdNcgqr7BPwZg7vw6TLeT0IrTBPWuODrdqvz6nY +Qo1Gl5+iXB2bdtc1C8rfKVcBl0azo2ISDLxTlfYzPSCDBEXBbEUEUreGpSgYwVZU +yL1H86qKNiJuPyvd9mIo9toRqZ4sfmcTs1kcyJ+ge1N9iOIxkc2fdBwL3Pw3jAaY +p7r3aNIWlz9h6UTCJYvt20yyVUHipo2rI1Fiy6K9RV252vmNpSRdJSMvIAiOlhTz +aXF6Su8ESFk+qaBdGz/dNPPKxzSo1vm3IsUFq2G9ph4I3ZPyCE4JEU68la00worI +S/DvFitEn+EEXlNt05L53wW+GCUQQ31/x5/dceVnnkgr+hY1d28r0skQaN3YY612 +17INe8nsG/OwgGAsqs/PVeXz4nMVVNgVpudbdvU1OlalBJtpQ8xNljukKjzGM2Tz +5YkypVEVUvVTEGBUr8wk7E9B1m9bhV4FSiPzC4GV9Dl41INEcDELaSdnqufWFLkr +qy/8Vqlb+FJzONE8iBAE/KCgaL/OzqI4mnp2tI5Ab82+UrlqI3BhoThLGdyIghtb +tzX1RahEkIE/CdWEWSTRkwIDAQABo4IBADCB/TAdBgNVHQ4EFgQUqvzXx0ltDqeS +eSwT7lw/0/PsPbkwgc0GA1UdIwSBxTCBwoAUqvzXx0ltDqeSeSwT7lw/0/PsPbmh +gZ6kgZswgZgxCzAJBgNVBAYTAkRFMQwwCgYDVQQIEwNOUlcxDjAMBgNVBAcTBUVz +c2VuMRIwEAYDVQQKEwlob3R0aXMuZGUxDzANBgNVBAsTBnNlcnZlcjEVMBMGA1UE +AxMMaG90dGlzLmRlIENBMRAwDgYDVQQpEwdFYXN5UlNBMR0wGwYJKoZIhvcNAQkB +Fg53b2hvQGhvdHRpcy5kZYIJAOh+htQGJjvkMAwGA1UdEwQFMAMBAf8wDQYJKoZI +hvcNAQELBQADggIBAHnFNC8UeJjizpEdszTxAp6QpHO4nq+Wwb37rp6JA7Hs/jAB +ddeWTPHGNVOFDV45CF8f39INhmvr7xIfMHBwwuQz/UetXH2ojVI06CWHFp/N7YkF +ZWUDY+GkbxnQ+F7xqahqHSEzv2hcjAj6xaBD/hMUITW6YP0JDL0C45pMxZKsYCLA +8+5YxwM2Mzs24YtTlaOdlqg+iKXKSn4FTrbDhsb/MQhf/EvVLncdL0Fiq1voUzx/ +6HoIsYWyVwT62WdUF42Aoaxdvp4DLinmk9ANnVF/nfpzRz69saQboq4k2mDUCEoj +jwXBeobkAxrYasepIkKbK5PTLuC7M0t41++uV5j0YobrLCjvn7unVJgysDcRihuN +PuT3Yohi4dRjo8ZKYraWjSgCTTWNuNHjxMSBgfUYDnsh7wF0VVhiiHlybhyuvtJj +Szn/YZRhsD74//m1DdAtXtKCptzssxTrDhSXiGrrP4+a5mAqPowrzA6C8fTlwy50 +qLEXvm2CoZHEmEj6n61LKILQa9o9KLT6TIX2925Yzk9BLdbFprrzB7gpNzsPJ4AU +sDQln+FuV4resP5lYb0Cc9x3OjAACkAv2bHNOI2AXtD3xXRmVPNmthGeqABY8e9a +Alw3TtZnQpaz9SE7Ta699rf5Qb7hq4ERD2is8ovlXJ1lWLR4MQk3lfxZeEfb +-----END CERTIFICATE----- diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..831b389 --- /dev/null +++ b/schema.sql @@ -0,0 +1,17 @@ +create table ElectricEnergy ( + id int(10) unsigned not null auto_increment, + name varchar(64) not null, + ts timestamp not null, + energy int(10) unsigned not null, + power int(10) unsigned not null, + primary key (id) +) engine=InnoDB; + +create table Temperature ( + id int(10) unsigned not null auto_increment, + name varchar(64) not null, + ts timestamp not null, + temperature double not null, + primary key (id) +) engine=InnoDB; +