initial
This commit is contained in:
commit
de1d5be201
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
ENV
|
||||||
|
*.pyc
|
||||||
|
.venv/*
|
||||||
|
src/.venv/*
|
||||||
|
|
64
.gitlab-ci.yml
Normal file
64
.gitlab-ci.yml
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
stages:
|
||||||
|
- dockerize
|
||||||
|
- deploy
|
||||||
|
|
||||||
|
variables:
|
||||||
|
IMAGE_NAME: $CI_REGISTRY/$CI_PROJECT_PATH
|
||||||
|
|
||||||
|
dockerize:
|
||||||
|
image: registry.hottis.de/dockerized/docker-bash:latest
|
||||||
|
stage: dockerize
|
||||||
|
tags:
|
||||||
|
- hottis
|
||||||
|
- linux
|
||||||
|
- docker
|
||||||
|
script:
|
||||||
|
- docker build --tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} .
|
||||||
|
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY;
|
||||||
|
- docker push $IMAGE_NAME:${CI_COMMIT_SHORT_SHA}
|
||||||
|
- if [ "$CI_COMMIT_TAG" != "" ]; then
|
||||||
|
docker tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} $IMAGE_NAME:${CI_COMMIT_TAG};
|
||||||
|
docker push $IMAGE_NAME:${CI_COMMIT_TAG};
|
||||||
|
fi
|
||||||
|
|
||||||
|
.deploy:
|
||||||
|
image: registry.hottis.de/dockerized/docker-bash:latest
|
||||||
|
stage: deploy
|
||||||
|
only:
|
||||||
|
- tags
|
||||||
|
variables:
|
||||||
|
GIT_STRATEGY: none
|
||||||
|
CONTAINER_NAME: soil-preprocessor
|
||||||
|
script:
|
||||||
|
- docker stop $CONTAINER_NAME || echo "container not running, never mind"
|
||||||
|
- docker rm $CONTAINER_NAME || echo "container not existing, never mind"
|
||||||
|
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
|
||||||
|
- docker run
|
||||||
|
-d
|
||||||
|
--name $CONTAINER_NAME
|
||||||
|
--restart always
|
||||||
|
--network internal-network
|
||||||
|
-e "APPLICATION_TENANT=$APPLICATION_TENANT"
|
||||||
|
-e "MQTT_LOGIN=$MQTT_LOGIN"
|
||||||
|
-e "MQTT_PASSWORD=$MQTT_PASSWORD"
|
||||||
|
-e "MQTT_BROKER=$MQTT_BROKER"
|
||||||
|
-e "PGHOST=timescaledb"
|
||||||
|
-e "PGUSER=$PGUSER"
|
||||||
|
-e "PGPASSWORD=$PGPASSWORD"
|
||||||
|
-e "PGDATABASE=$PGDATABASE"
|
||||||
|
$IMAGE_NAME:$CI_COMMIT_TAG
|
||||||
|
- docker network connect external-network $CONTAINER_NAME
|
||||||
|
|
||||||
|
deploy-saerbeck:
|
||||||
|
extends: .deploy
|
||||||
|
tags:
|
||||||
|
- saerbeck-deployment-only
|
||||||
|
environment:
|
||||||
|
name: saerbeck-production
|
||||||
|
|
||||||
|
deploy-berresheim:
|
||||||
|
extends: .deploy
|
||||||
|
tags:
|
||||||
|
- berresheim-deployment-only
|
||||||
|
environment:
|
||||||
|
name: berresheim-production
|
37
Dockerfile
Normal file
37
Dockerfile
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
FROM python:3.10-bullseye
|
||||||
|
|
||||||
|
ENV APPLICATION_TENANT "-"
|
||||||
|
|
||||||
|
ENV MQTT_LOGIN "-"
|
||||||
|
ENV MQTT_PASSWORD "-"
|
||||||
|
ENV MQTT_BROKER "-"
|
||||||
|
ENV MQTT_PORT "8883"
|
||||||
|
ENV MQTT_CA ""
|
||||||
|
|
||||||
|
ENV PGHOST ""
|
||||||
|
ENV PGPORT "5432"
|
||||||
|
ENV PGUSER "-"
|
||||||
|
ENV PGPASSWORD "-"
|
||||||
|
ENV PGSSLMODE "disable"
|
||||||
|
ENV PGDATABASE "-"
|
||||||
|
|
||||||
|
ARG APP_DIR="/opt/app"
|
||||||
|
|
||||||
|
RUN \
|
||||||
|
apt update && \
|
||||||
|
apt upgrade -y && \
|
||||||
|
apt install -y libpq-dev && \
|
||||||
|
mkdir -p ${APP_DIR}
|
||||||
|
|
||||||
|
COPY src/requirements.txt ${APP_DIR}/
|
||||||
|
COPY src/preprocess.py ${APP_DIR}/
|
||||||
|
|
||||||
|
WORKDIR ${APP_DIR}
|
||||||
|
|
||||||
|
RUN \
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
USER nobody
|
||||||
|
|
||||||
|
CMD [ "python", "preprocess.py" ]
|
||||||
|
|
29
docs/decoding.txt
Normal file
29
docs/decoding.txt
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
Soil - LSE01
|
||||||
|
|
||||||
|
http://wiki.dragino.com/xwiki/bin/view/Main/User%20Manual%20for%20LoRaWAN%20End%20Nodes/LSE01-LoRaWAN%20Soil%20Moisture%20%26%20EC%20Sensor%20User%20Manual/
|
||||||
|
|
||||||
|
|
||||||
|
Value, Size (bytes)
|
||||||
|
Battery (mV), 2
|
||||||
|
Moisture (%), 2
|
||||||
|
Temperature, 2
|
||||||
|
Conductivity(uS/cm), 2
|
||||||
|
Status, 1
|
||||||
|
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import struct
|
||||||
|
|
||||||
|
payload = 'DQYCjAAAAAE='
|
||||||
|
frame = base64.b64decode(payload)
|
||||||
|
|
||||||
|
battery = struct.unpack('>H', frame[0:2])[0]
|
||||||
|
moisture = struct.unpack('>H', frame[2:4])[0]
|
||||||
|
temperature = struct.unpack('>H', frame[4:6])[0]
|
||||||
|
conductivity = struct.unpack('>H', frame[6:8])[0]
|
||||||
|
status = struct.unpack('?', frame[8:9])[0]
|
||||||
|
|
||||||
|
print(f"{battery=}, {moisture=}, {temperature=}, {conductivity=}, {status=}")
|
||||||
|
|
||||||
|
|
||||||
|
|
31
schema/create-schema.sql
Normal file
31
schema/create-schema.sql
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
create database soil_monitoring;
|
||||||
|
-- create extension timescaledb;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table application_t (
|
||||||
|
id serial not null primary key,
|
||||||
|
device_id VARCHAR(32) NOT NULL UNIQUE,
|
||||||
|
label varchar(64) not null unique,
|
||||||
|
sensor_type varchar(16) not null
|
||||||
|
);
|
||||||
|
|
||||||
|
create table measurement_t (
|
||||||
|
time timestamp without time zone not null,
|
||||||
|
application_name varchar(64) not null,
|
||||||
|
moisture float,
|
||||||
|
temperature float,
|
||||||
|
conductivity float,
|
||||||
|
status varchar(16),
|
||||||
|
battery float
|
||||||
|
);
|
||||||
|
|
||||||
|
select create_hypertable('measurement_t', 'time');
|
||||||
|
|
||||||
|
insert into application_t (device_id, label, sensor_type)
|
||||||
|
values('eui-a840412ee18341d5', 'saerbeck-bachaue', 'LSE01');
|
||||||
|
|
||||||
|
-- create user preprocessor password 'geheim';
|
||||||
|
grant select on application_t to preprocessor;
|
||||||
|
grant insert on measurement_t to preprocessor;
|
||||||
|
|
184
src/preprocess.py
Normal file
184
src/preprocess.py
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
from loguru import logger
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import ssl
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
import struct
|
||||||
|
import psycopg2
|
||||||
|
import psycopg2.extras
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class ApplicationNotFoundException (Exception):
|
||||||
|
def __init__(self, deviceId):
|
||||||
|
self.deviceId = deviceId
|
||||||
|
|
||||||
|
class SensorTypeNotFoundException (Exception):
|
||||||
|
def __init__(self, sensorType):
|
||||||
|
self.sensorType = sensorType
|
||||||
|
|
||||||
|
class JustIgnoreMessage (Exception):
|
||||||
|
def __init__(self, application, device, message):
|
||||||
|
self.application = application
|
||||||
|
self.device = device
|
||||||
|
self.message = message
|
||||||
|
|
||||||
|
class DbOp(object):
|
||||||
|
def __init__(self, config):
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
|
def __getConn(self):
|
||||||
|
conn = psycopg2.connect()
|
||||||
|
conn.autocommit = False
|
||||||
|
return conn
|
||||||
|
|
||||||
|
def getApplication(self, deviceId):
|
||||||
|
try:
|
||||||
|
conn = self.__getConn()
|
||||||
|
with conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("select label, sensor_type from application_t where device_id = %(deviceId)s", { 'deviceId': deviceId })
|
||||||
|
res = cur.fetchone()
|
||||||
|
if res is None:
|
||||||
|
raise ApplicationNotFoundException(deviceId)
|
||||||
|
application = {
|
||||||
|
'label': res[0],
|
||||||
|
'sensor_type': res[1]
|
||||||
|
}
|
||||||
|
logger.debug(f"{application=}")
|
||||||
|
return application
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting device: {e}")
|
||||||
|
raise ApplicationNotFoundException(deviceId)
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def storeMeasurement(self, measurement):
|
||||||
|
try:
|
||||||
|
logger.info(f"About to store {measurement}")
|
||||||
|
conn = self.__getConn()
|
||||||
|
with conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("""
|
||||||
|
insert into measurement_t (time, application_name, moisture, temperature, conductivity, status, battery)
|
||||||
|
values (now(), %(application_name)s, %(moisture)s, %(temperature)s, %(conductivity), %(status)s, %(battery)s)
|
||||||
|
""",
|
||||||
|
{
|
||||||
|
'application_name': measurement['application_name'],
|
||||||
|
'moisture': measurement['moisture'],
|
||||||
|
'temperature': measurement['temperature'],
|
||||||
|
'conductivity': measurement['conductivity'],
|
||||||
|
'status': measurement['status'],
|
||||||
|
'battery': measurement['battery']
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error storing measurement: {e}")
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def mqttOnConnectCallback(client, userdata, flags, rc):
|
||||||
|
logger.info("mqtt connected")
|
||||||
|
mqtt_in_topic = f"v3/{config['APPLICATION_TENANT']}/devices/+/up"
|
||||||
|
client.subscribe(mqtt_in_topic)
|
||||||
|
logger.info(f"subscribed to {mqtt_in_topic}")
|
||||||
|
|
||||||
|
def mqttOnMessageCallback(client, userdata, message):
|
||||||
|
try:
|
||||||
|
topic = message.topic
|
||||||
|
payload = message.payload
|
||||||
|
logger.debug(f"mqtt message received: {topic} -> {payload}")
|
||||||
|
parse_payload = json.loads(payload)
|
||||||
|
|
||||||
|
sendSetupMessage = False
|
||||||
|
device_id = parse_payload['end_device_ids']['device_id']
|
||||||
|
|
||||||
|
dbh = DbOp(config)
|
||||||
|
application = dbh.getApplication(device_id)
|
||||||
|
|
||||||
|
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
|
||||||
|
f_port = parse_payload['uplink_message']['f_port']
|
||||||
|
measurement = {}
|
||||||
|
|
||||||
|
match application['sensor_type']:
|
||||||
|
case 'LSE01':
|
||||||
|
battery = struct.unpack('>H', frame[0:2])[0]
|
||||||
|
moisture = struct.unpack('>H', frame[2:4])[0]
|
||||||
|
temperature = struct.unpack('>H', frame[4:6])[0]
|
||||||
|
conductivity = struct.unpack('>H', frame[6:8])[0]
|
||||||
|
status = struct.unpack('?', frame[8:9])[0]
|
||||||
|
logger.debug(f"{frame=}, {battery=}, {moisture=}, {temperature=}, {conductivity=}, {status=}")
|
||||||
|
|
||||||
|
statusText = 'Ok'
|
||||||
|
if not status:
|
||||||
|
statusText = 'No sensor'
|
||||||
|
|
||||||
|
measurement = {
|
||||||
|
'application_name': application['label'],
|
||||||
|
'moisture': moisture / 100,
|
||||||
|
'temperature': temperature / 100,
|
||||||
|
'conductivity': conductivity,
|
||||||
|
'status': statusText,
|
||||||
|
'battery': battery / 1000
|
||||||
|
}
|
||||||
|
case _:
|
||||||
|
raise SensorTypeNotFoundException(application['sensor_type'])
|
||||||
|
|
||||||
|
logger.debug(f"{measurement=}")
|
||||||
|
dbh.storeMeasurement(measurement)
|
||||||
|
except JustIgnoreMessage as e:
|
||||||
|
logger.info(f"JustIgnoreMessage: {e.application}, {e.device}, {e.message}")
|
||||||
|
except SensorTypeNotFoundException as e:
|
||||||
|
logger.error(f"application has unknown sensor type {e.sensorType}")
|
||||||
|
except ApplicationNotFoundException as e:
|
||||||
|
logger.error(f"message from unknown device {e.deviceId}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"unable to parse message {payload}, {e}")
|
||||||
|
|
||||||
|
def mqttOnDisconnectCallback(client, userdata, rc):
|
||||||
|
pass
|
||||||
|
|
||||||
|
logger.info("preprocess starting")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
REQUIRED_CONFIG_OPTIONS = [
|
||||||
|
'MQTT_LOGIN',
|
||||||
|
'MQTT_PASSWORD',
|
||||||
|
'MQTT_BROKER',
|
||||||
|
'MQTT_PORT',
|
||||||
|
'MQTT_CA',
|
||||||
|
'APPLICATION_TENANT'
|
||||||
|
]
|
||||||
|
|
||||||
|
config = {}
|
||||||
|
for rco in REQUIRED_CONFIG_OPTIONS:
|
||||||
|
try:
|
||||||
|
config[rco] = os.environ[rco]
|
||||||
|
except KeyError:
|
||||||
|
logger.error(f"{rco} is a required config option, not available in environment")
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
client = mqtt.Client()
|
||||||
|
client.on_message = mqttOnMessageCallback
|
||||||
|
client.on_connect = mqttOnConnectCallback
|
||||||
|
client.on_disconnect = mqttOnDisconnectCallback
|
||||||
|
client.username_pw_set(config['MQTT_LOGIN'], config['MQTT_PASSWORD'])
|
||||||
|
client.tls_set(
|
||||||
|
cert_reqs=ssl.CERT_REQUIRED,
|
||||||
|
ciphers=None
|
||||||
|
)
|
||||||
|
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
|
||||||
|
#client.connect('172.16.2.16', 1883)
|
||||||
|
logger.info("mqtt loop starting")
|
||||||
|
client.loop_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
3
src/requirements.txt
Normal file
3
src/requirements.txt
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
loguru==0.6.0
|
||||||
|
paho-mqtt==1.6.1
|
||||||
|
psycopg2==2.9.5
|
Loading…
x
Reference in New Issue
Block a user