This commit is contained in:
Wolfgang Hottgenroth 2023-05-22 15:57:08 +02:00
commit 07f80924fd
5 changed files with 295 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
ENV
*.pyc
.venv/*
src/.venv/*

64
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,64 @@
stages:
- dockerize
- deploy
variables:
IMAGE_NAME: $CI_REGISTRY/$CI_PROJECT_PATH
dockerize:
image: registry.hottis.de/dockerized/docker-bash:latest
stage: dockerize
tags:
- hottis
- linux
- docker
script:
- docker build --tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} .
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY;
- docker push $IMAGE_NAME:${CI_COMMIT_SHORT_SHA}
- if [ "$CI_COMMIT_TAG" != "" ]; then
docker tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} $IMAGE_NAME:${CI_COMMIT_TAG};
docker push $IMAGE_NAME:${CI_COMMIT_TAG};
fi
.deploy:
image: registry.hottis.de/dockerized/docker-bash:latest
stage: deploy
only:
- tags
variables:
GIT_STRATEGY: none
CONTAINER_NAME: level-preprocessor
script:
- docker stop $CONTAINER_NAME || echo "container not running, never mind"
- docker rm $CONTAINER_NAME || echo "container not existing, never mind"
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
- docker run
-d
--name $CONTAINER_NAME
--restart always
--network internal-network
-e "APPLICATION_TENANT=$APPLICATION_TENANT"
-e "MQTT_LOGIN=$MQTT_LOGIN"
-e "MQTT_PASSWORD=$MQTT_PASSWORD"
-e "MQTT_BROKER=$MQTT_BROKER"
-e "PGHOST=timescaledb"
-e "PGUSER=$PGUSER"
-e "PGPASSWORD=$PGPASSWORD"
-e "PGDATABASE=$PGDATABASE"
$IMAGE_NAME:$CI_COMMIT_TAG
- docker network connect external-network $CONTAINER_NAME
deploy-saerbeck:
extends: .deploy
tags:
- saerbeck-deployment-only
environment:
name: saerbeck-production
deploy-berresheim:
extends: .deploy
tags:
- berresheim-deployment-only
environment:
name: berresheim-production

35
Dockerfile Normal file
View File

@ -0,0 +1,35 @@
FROM python:3.10-bullseye
ENV MQTT_LOGIN "-"
ENV MQTT_PASSWORD "-"
ENV MQTT_BROKER "-"
ENV MQTT_PORT "8883"
ENV MQTT_CA ""
ENV PGHOST ""
ENV PGPORT "5432"
ENV PGUSER "-"
ENV PGPASSWORD "-"
ENV PGSSLMODE "disable"
ENV PGDATABASE "-"
ARG APP_DIR="/opt/app"
RUN \
apt update && \
apt upgrade -y && \
apt install -y libpq-dev && \
mkdir -p ${APP_DIR}
COPY src/requirements.txt ${APP_DIR}/
COPY src/preprocess.py ${APP_DIR}/
WORKDIR ${APP_DIR}
RUN \
pip install -r requirements.txt
USER nobody
CMD [ "python", "preprocess.py" ]

188
src/preprocess.py Normal file
View File

@ -0,0 +1,188 @@
from loguru import logger
import paho.mqtt.client as mqtt
import os
import sys
import ssl
import json
import base64
import struct
import psycopg2
import psycopg2.extras
class VariableNotFoundException (Exception):
def __init__(self, appId, converterId, deviceId, variableId):
self.appId = appId
self.converterId = converterId
self.deviceId = deviceId
self.variableId = variableId
class InvalidTopicException (Exception):
def __init__(self, topic):
self.topic = topic
class JustIgnoreMessage (Exception):
def __init__(self, application, device, message):
self.application = application
self.device = device
self.message = message
class DbOp(object):
def __init__(self, config):
self.conn = None
def __getConn(self):
conn = psycopg2.connect()
conn.autocommit = False
return conn
def getVariable(self, appId, converterId, deviceId, variableId):
try:
conn = self.__getConn()
with conn:
with conn.cursor() as cur:
cur.execute("""
select
a.label as application,
v.label as variable,
v.unit as unit
from application_t a, variable_t v
where a.app_id = %(appId)s and
v.app = a.id and
v.converter_id = %(converterId)s and
v.device_id = %(deviceId)s and
v.variable_id = %(variableId)s
""", { 'appId': appId, 'converterId': converterId, 'deviceId': deviceId, 'variableId': variableId })
res = cur.fetchone()
if res is None:
raise VariableNotFoundException(appId, converterId, deviceId, variableId)
variable = {
'application': res[0],
'variable': res[1],
'unit': res[2]
}
logger.debug(f"{variable=}")
return application
except Exception as e:
logger.error(f"Error getting variable: {e}")
raise VariableNotFoundException(appId)
finally:
if conn:
conn.close()
def storeMeasurement(self, measurement):
try:
logger.info(f"About to store {measurement}")
conn = self.__getConn()
with conn:
with conn.cursor() as cur:
cur.execute("""
insert into measurement_t (time, application, variable, value, unit)
values (now(), %(application)s, %(variable)s, %(value)s, %(unit)s
""",
{
'application': measurement['application'],
'variable': measurement['variable'],
'value': measurement['value'],
'unit': measurement['unit']
}
)
except Exception as e:
logger.error(f"Error storing measurement: {e}")
finally:
if conn:
conn.close()
def splitTopic(topic):
try:
(_, appId, converterId, rest) = topic.split('/', 3)
except ValueError:
raise InvalidTopicException(topic)
r = rest.split('/')
if (len(r) == 1:
deviceId = 'mains',
variableId = r[0]
elif (len(r) == 2:
(deviceId, variableid) = r
else:
raise InvalidTopicException(topic)
return (appId, converterId, deviceId, variableid)
def mqttOnConnectCallback(client, userdata, flags, rc):
logger.info("mqtt connected")
mqtt_in_topic = f"cem/#"
client.subscribe(mqtt_in_topic)
logger.info(f"subscribed to {mqtt_in_topic}")
def mqttOnMessageCallback(client, userdata, message):
try:
topic = message.topic
payload = message.payload
logger.debug(f"mqtt message received: {topic} -> {payload}")
(appId, converterId, deviceId, variableId) = splitTopic(topic)
dbh = DbOp(config)
variable = dbh.getVariable(appId, converterId, deviceId, variableid)
measurement = {
"application": variable["application"],
"variable": variable["variable"],
"unit": variable["unit"],
"value": payload
}
logger.debug(f"{measurement=}")
dbh.storeMeasurement(measurement)
except JustIgnoreMessage as e:
logger.info(f"JustIgnoreMessage: {e.application}, {e.device}, {e.message}")
except VariableNotFoundException as e:
logger.error(f"message from unknown topic {e.appId, e.converterId, e.deviceId, e.variableId}")
except InvalidTopicException as e:
logger.error(f"invalid topic {e.topic}")
except Exception as e:
logger.error(f"unable to parse message {payload}, {e}")
def mqttOnDisconnectCallback(client, userdata, rc):
pass
logger.info("preprocess starting")
REQUIRED_CONFIG_OPTIONS = [
'MQTT_LOGIN',
'MQTT_PASSWORD',
'MQTT_BROKER',
'MQTT_PORT',
'MQTT_CA',
'APPLICATION_TENANT'
]
config = {}
for rco in REQUIRED_CONFIG_OPTIONS:
try:
config[rco] = os.environ[rco]
except KeyError:
logger.error(f"{rco} is a required config option, not available in environment")
sys.exit(-1)
client = mqtt.Client()
client.on_message = mqttOnMessageCallback
client.on_connect = mqttOnConnectCallback
client.on_disconnect = mqttOnDisconnectCallback
client.username_pw_set(config['MQTT_LOGIN'], config['MQTT_PASSWORD'])
client.tls_set(
cert_reqs=ssl.CERT_REQUIRED,
ciphers=None
)
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
#client.connect('172.16.2.16', 1883)
logger.info("mqtt loop starting")
client.loop_forever()

3
src/requirements.txt Normal file
View File

@ -0,0 +1,3 @@
loguru==0.6.0
paho-mqtt==1.6.1
psycopg2==2.9.5