initial
This commit is contained in:
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
ENV
|
||||||
|
*.pyc
|
||||||
|
src/.venv/*
|
||||||
|
|
30
schema/extend-schema.sql
Normal file
30
schema/extend-schema.sql
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
-- extend the schema monitoring from badesee application
|
||||||
|
|
||||||
|
|
||||||
|
create table level_sensor_t (
|
||||||
|
id serial not null primary key,
|
||||||
|
sensor_type varchar(16) not null,
|
||||||
|
ground_level numeric(10, 0) not null,
|
||||||
|
device integer not null references device_id(id),
|
||||||
|
unique(device)
|
||||||
|
);
|
||||||
|
|
||||||
|
create table level_measurement_t (
|
||||||
|
time timestamp without time zone not null,
|
||||||
|
device_name varchar(16) not null,
|
||||||
|
raw_level numeric(10, 0),
|
||||||
|
level numeric(10, 0),
|
||||||
|
status varchar(16),
|
||||||
|
battery float
|
||||||
|
);
|
||||||
|
|
||||||
|
select create_hypertable('level_measurement_t', 'time');
|
||||||
|
|
||||||
|
insert into device_t (device_id, label) values('eui-a84041a2c18341d6', 'deilbach');
|
||||||
|
insert into level_sensor_t (sensor_type, ground_level, device)
|
||||||
|
values('LDDS75', 300, (select id from device_t where device_id='eui-a84041a2c18341d6'));
|
||||||
|
|
||||||
|
create user level_preprocessor password 'geheim';
|
||||||
|
grant select on device_t, level_sensor_t to level_preprocessor;
|
||||||
|
grant insert on level_measurement_t to level_preprocessor;
|
||||||
|
|
214
src/preprocess.py
Normal file
214
src/preprocess.py
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
from loguru import logger
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import ssl
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
import struct
|
||||||
|
import psycopg2
|
||||||
|
import psycopg2.extras
|
||||||
|
|
||||||
|
|
||||||
|
oldDevice = {}
|
||||||
|
|
||||||
|
class DeviceNotFoundException (Exception):
|
||||||
|
def __init__(self, deviceId):
|
||||||
|
self.deviceId = deviceId
|
||||||
|
|
||||||
|
class UnknownSensorException (Exception):
|
||||||
|
def __init__(self, deviceId):
|
||||||
|
self.deviceId = deviceId
|
||||||
|
|
||||||
|
class DbOp(object):
|
||||||
|
def __init__(self, config):
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
|
def __getConn(self):
|
||||||
|
conn = psycopg2.connect()
|
||||||
|
conn.autocommit = False
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def getDevice(self, deviceId):
|
||||||
|
try:
|
||||||
|
conn = self.__getConn()
|
||||||
|
with conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("select id, label from device_t where device_id = %(deviceId)s", { 'deviceId': deviceId })
|
||||||
|
res = cur.fetchone()
|
||||||
|
if res is None:
|
||||||
|
raise DeviceNotFoundException(deviceId)
|
||||||
|
device_label = res[1]
|
||||||
|
id = res[0]
|
||||||
|
logger.debug(f"{device_label=}")
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("select sensor_type, ground_level from level_sensor_t where device = %(id)s", { 'id': id })
|
||||||
|
res = cur.fetchone()
|
||||||
|
if res is None:
|
||||||
|
raise UnknownSensorException(deviceId)
|
||||||
|
sensor_type = res[0]
|
||||||
|
ground_level = res[1]
|
||||||
|
logger.debug(f"{sensor_type=}, {ground_level=}")
|
||||||
|
device = {
|
||||||
|
'label': device_label,
|
||||||
|
'sensor_type': sensor_type,
|
||||||
|
'ground_level': ground_level
|
||||||
|
}
|
||||||
|
return device
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting device: {e}")
|
||||||
|
raise DeviceNotFoundException(deviceId)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def storeMeasurement(self, measurement):
|
||||||
|
try:
|
||||||
|
logger.info(f"About to store {measurement}")
|
||||||
|
conn = self.__getConn()
|
||||||
|
with conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
for entry in measurement['measurements']:
|
||||||
|
cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (now(), %(dname)s, %(sname)s, %(tempc)s)",
|
||||||
|
{ 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] })
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error storing measurement: {e}")
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||||
|
logger.info("mqtt connected")
|
||||||
|
mqtt_in_topic = f"v3/{config['APPLICATION_TENANT']}/devices/+/up"
|
||||||
|
client.subscribe(mqtt_in_topic)
|
||||||
|
logger.info(f"subscribed to {mqtt_in_topic}")
|
||||||
|
|
||||||
|
def mqttOnMessageCallback(client, userdata, message):
|
||||||
|
try:
|
||||||
|
topic = message.topic
|
||||||
|
payload = message.payload
|
||||||
|
logger.debug(f"mqtt message received: {topic} -> {payload}")
|
||||||
|
parse_payload = json.loads(payload)
|
||||||
|
|
||||||
|
sendSetupMessage = False
|
||||||
|
device_id = parse_payload['end_device_ids']['device_id']
|
||||||
|
|
||||||
|
dbh = DbOp(config)
|
||||||
|
device = dbh.getDevice(device_id)
|
||||||
|
|
||||||
|
global oldDevice
|
||||||
|
if (device != oldDevice):
|
||||||
|
logger.info("device configuration in database has changed")
|
||||||
|
oldDevice = device
|
||||||
|
sendSetupMessage = True
|
||||||
|
num_of_sensors = len(device['sensors'])
|
||||||
|
sensors = { x['address']:[x['index'], x['label']] for x in device['sensors']}
|
||||||
|
|
||||||
|
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
|
||||||
|
status = struct.unpack('<H', frame[0:2])[0]
|
||||||
|
logger.debug(f"{frame=}, {status=}")
|
||||||
|
|
||||||
|
if (status == 4):
|
||||||
|
logger.info(f"Start up message received from {device_id}, {device['label']}")
|
||||||
|
for i in range(0, num_of_sensors):
|
||||||
|
start_index = 2 + (i * 8)
|
||||||
|
end_index = start_index + 8
|
||||||
|
received_sensor_address = struct.unpack('<Q', frame[start_index:end_index])[0]
|
||||||
|
logger.debug(f"sensor {i}: 0x{received_sensor_address:016x}")
|
||||||
|
if (received_sensor_address not in sensors):
|
||||||
|
raise UnknownSensorException(received_sensor_address)
|
||||||
|
sendSetupMessage = True
|
||||||
|
else:
|
||||||
|
logger.info(f"Regular message received from {device_id}, {device['label']}")
|
||||||
|
measurement = {
|
||||||
|
'label': device['label'],
|
||||||
|
'time': parse_payload['received_at'],
|
||||||
|
'measurements': []
|
||||||
|
}
|
||||||
|
for i in range(0, num_of_sensors):
|
||||||
|
addr_start_index = 2 + (i * (8 + 4))
|
||||||
|
addr_end_index = addr_start_index + 8
|
||||||
|
received_sensor_address = struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0]
|
||||||
|
if (received_sensor_address not in sensors):
|
||||||
|
raise UnknownSensorException(received_sensor_address)
|
||||||
|
value_start_index = addr_end_index
|
||||||
|
value_end_index = value_start_index + 4
|
||||||
|
value = struct.unpack('<i', frame[value_start_index:value_end_index])[0] / 128
|
||||||
|
logger.debug(f"sensor {i}: 0x{received_sensor_address:016x} = {value:.2f} °C")
|
||||||
|
measurement['measurements'].append({
|
||||||
|
"address": received_sensor_address,
|
||||||
|
"value": value,
|
||||||
|
"label": sensors[received_sensor_address][1]
|
||||||
|
})
|
||||||
|
dbh.storeMeasurement(measurement)
|
||||||
|
|
||||||
|
if (sendSetupMessage):
|
||||||
|
sendSetupMessage = False
|
||||||
|
setupMessage = bytes()
|
||||||
|
null = 0
|
||||||
|
null = null.to_bytes(1, byteorder='big')
|
||||||
|
for sk, sv in sensors.items():
|
||||||
|
logger.debug(f"{sk=}, {sv=}")
|
||||||
|
setupMessage += struct.pack('<Q', sk) + sv[0].to_bytes(1, byteorder='big') + bytes(sv[1], 'ASCII') + null
|
||||||
|
setupMessage = base64.b64encode(setupMessage).decode('ASCII')
|
||||||
|
logger.debug(f"about to send setup message {setupMessage}")
|
||||||
|
setupTopic = f"v3/{config['APPLICATION_TENANT']}/devices/{device_id}/down/push"
|
||||||
|
client.publish(setupTopic, json.dumps({
|
||||||
|
"downlinks": [
|
||||||
|
{
|
||||||
|
"f_port": 1,
|
||||||
|
"frm_payload": setupMessage,
|
||||||
|
"priority": "NORMAL"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}))
|
||||||
|
|
||||||
|
except UnknownSensorException as e:
|
||||||
|
logger.error(f"unknown sensor in message {e.sensorAddress}")
|
||||||
|
except DeviceNotFoundException as e:
|
||||||
|
logger.error(f"message from unknown device {e.deviceId}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"unable to parse message {payload}, {e}")
|
||||||
|
|
||||||
|
def mqttOnDisconnectCallback(client, userdata, rc):
|
||||||
|
pass
|
||||||
|
|
||||||
|
logger.info("preprocess starting")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
REQUIRED_CONFIG_OPTIONS = [
|
||||||
|
'MQTT_LOGIN',
|
||||||
|
'MQTT_PASSWORD',
|
||||||
|
'MQTT_BROKER',
|
||||||
|
'MQTT_PORT',
|
||||||
|
'MQTT_CA',
|
||||||
|
'APPLICATION_TENANT'
|
||||||
|
]
|
||||||
|
|
||||||
|
config = {}
|
||||||
|
for rco in REQUIRED_CONFIG_OPTIONS:
|
||||||
|
try:
|
||||||
|
config[rco] = os.environ[rco]
|
||||||
|
except KeyError:
|
||||||
|
logger.error(f"{rco} is a required config option, not available in environment")
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
client = mqtt.Client()
|
||||||
|
client.on_message = mqttOnMessageCallback
|
||||||
|
client.on_connect = mqttOnConnectCallback
|
||||||
|
client.on_disconnect = mqttOnDisconnectCallback
|
||||||
|
client.username_pw_set(config['MQTT_LOGIN'], config['MQTT_PASSWORD'])
|
||||||
|
client.tls_set(
|
||||||
|
cert_reqs=ssl.CERT_REQUIRED,
|
||||||
|
ciphers=None
|
||||||
|
)
|
||||||
|
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
|
||||||
|
logger.info("mqtt loop starting")
|
||||||
|
client.loop_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
3
src/requirements.txt
Normal file
3
src/requirements.txt
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
loguru==0.6.0
|
||||||
|
paho-mqtt==1.6.1
|
||||||
|
psycopg2==2.9.5
|
Reference in New Issue
Block a user