commit a9b93ae3e8b767dcbec08ec0d27e4923af29e7ad Author: Wolfgang Hottgenroth Date: Mon Feb 13 23:16:58 2023 +0100 initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5fbeadb --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +ENV +*.pyc +src/.venv/* + diff --git a/schema/extend-schema.sql b/schema/extend-schema.sql new file mode 100644 index 0000000..2d0bf7f --- /dev/null +++ b/schema/extend-schema.sql @@ -0,0 +1,30 @@ +-- extend the schema monitoring from badesee application + + +create table level_sensor_t ( + id serial not null primary key, + sensor_type varchar(16) not null, + ground_level numeric(10, 0) not null, + device integer not null references device_id(id), + unique(device) +); + +create table level_measurement_t ( + time timestamp without time zone not null, + device_name varchar(16) not null, + raw_level numeric(10, 0), + level numeric(10, 0), + status varchar(16), + battery float +); + +select create_hypertable('level_measurement_t', 'time'); + +insert into device_t (device_id, label) values('eui-a84041a2c18341d6', 'deilbach'); +insert into level_sensor_t (sensor_type, ground_level, device) + values('LDDS75', 300, (select id from device_t where device_id='eui-a84041a2c18341d6')); + +create user level_preprocessor password 'geheim'; +grant select on device_t, level_sensor_t to level_preprocessor; +grant insert on level_measurement_t to level_preprocessor; + diff --git a/src/preprocess.py b/src/preprocess.py new file mode 100644 index 0000000..1313bfb --- /dev/null +++ b/src/preprocess.py @@ -0,0 +1,214 @@ +from loguru import logger +import paho.mqtt.client as mqtt +import os +import sys +import ssl +import json +import base64 +import struct +import psycopg2 +import psycopg2.extras + + +oldDevice = {} + +class DeviceNotFoundException (Exception): + def __init__(self, deviceId): + self.deviceId = deviceId + +class UnknownSensorException (Exception): + def __init__(self, deviceId): + self.deviceId = deviceId + +class DbOp(object): + def __init__(self, config): + self.conn = None + + def __getConn(self): + conn = psycopg2.connect() + conn.autocommit = False + return conn + + def getDevice(self, deviceId): + try: + conn = self.__getConn() + with conn: + with conn.cursor() as cur: + cur.execute("select id, label from device_t where device_id = %(deviceId)s", { 'deviceId': deviceId }) + res = cur.fetchone() + if res is None: + raise DeviceNotFoundException(deviceId) + device_label = res[1] + id = res[0] + logger.debug(f"{device_label=}") + with conn.cursor() as cur: + cur.execute("select sensor_type, ground_level from level_sensor_t where device = %(id)s", { 'id': id }) + res = cur.fetchone() + if res is None: + raise UnknownSensorException(deviceId) + sensor_type = res[0] + ground_level = res[1] + logger.debug(f"{sensor_type=}, {ground_level=}") + device = { + 'label': device_label, + 'sensor_type': sensor_type, + 'ground_level': ground_level + } + return device + except Exception as e: + logger.error(f"Error getting device: {e}") + raise DeviceNotFoundException(deviceId) + finally: + if conn: + conn.close() + + def storeMeasurement(self, measurement): + try: + logger.info(f"About to store {measurement}") + conn = self.__getConn() + with conn: + with conn.cursor() as cur: + for entry in measurement['measurements']: + cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (now(), %(dname)s, %(sname)s, %(tempc)s)", + { 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] }) + except Exception as e: + logger.error(f"Error storing measurement: {e}") + finally: + if conn: + conn.close() + +def mqttOnConnectCallback(client, userdata, flags, rc): + logger.info("mqtt connected") + mqtt_in_topic = f"v3/{config['APPLICATION_TENANT']}/devices/+/up" + client.subscribe(mqtt_in_topic) + logger.info(f"subscribed to {mqtt_in_topic}") + +def mqttOnMessageCallback(client, userdata, message): + try: + topic = message.topic + payload = message.payload + logger.debug(f"mqtt message received: {topic} -> {payload}") + parse_payload = json.loads(payload) + + sendSetupMessage = False + device_id = parse_payload['end_device_ids']['device_id'] + + dbh = DbOp(config) + device = dbh.getDevice(device_id) + + global oldDevice + if (device != oldDevice): + logger.info("device configuration in database has changed") + oldDevice = device + sendSetupMessage = True + num_of_sensors = len(device['sensors']) + sensors = { x['address']:[x['index'], x['label']] for x in device['sensors']} + + frame = base64.b64decode(parse_payload['uplink_message']['frm_payload']) + status = struct.unpack('