13 Commits

Author SHA1 Message Date
9ae09494d2 deployment fixed 2023-11-09 21:32:29 +01:00
58f17be962 fix 2023-11-09 18:04:05 +01:00
0eb133cede fix 2023-11-09 17:57:55 +01:00
ef84704340 push to docker hub too 2023-11-09 17:40:49 +01:00
46d840e9de adjust units and forgotten variable 2023-05-24 16:35:44 +02:00
68f98d7668 add view for measurement 2023-05-23 16:06:57 +02:00
12b204d206 fix typo 2023-05-23 15:59:41 +02:00
2d9372ea38 change schema 2023-05-23 15:57:50 +02:00
863644fa56 casting 2023-05-23 13:36:45 +02:00
41ea343be2 another fix 2023-05-23 13:28:14 +02:00
f8f9449c99 another fix 2023-05-23 13:26:48 +02:00
ab35b815fb fix 2023-05-23 13:19:49 +02:00
ec3fb5c72c fix 2023-05-23 13:17:12 +02:00
8 changed files with 160 additions and 61 deletions

View File

@ -4,6 +4,8 @@ stages:
variables:
IMAGE_NAME: $CI_REGISTRY/$CI_PROJECT_PATH
HUB_IMAGE_NAME: $DOCKER_HUB_LOGIN/$CI_PROJECT_NAME
dockerize:
image: registry.hottis.de/dockerized/docker-bash:latest
@ -19,6 +21,12 @@ dockerize:
- if [ "$CI_COMMIT_TAG" != "" ]; then
docker tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} $IMAGE_NAME:${CI_COMMIT_TAG};
docker push $IMAGE_NAME:${CI_COMMIT_TAG};
docker login -u $DOCKER_HUB_LOGIN -p $DOCKER_HUB_PASSWORD;
docker tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} $HUB_IMAGE_NAME:${CI_COMMIT_TAG};
docker tag $IMAGE_NAME:${CI_COMMIT_SHORT_SHA} $HUB_IMAGE_NAME:latest;
docker push $HUB_IMAGE_NAME:${CI_COMMIT_TAG};
docker push $HUB_IMAGE_NAME:latest;
fi
.deploy:
@ -49,9 +57,10 @@ dockerize:
- docker network connect external-network $CONTAINER_NAME
deploy-berresheim:
extends: .deploy
tags:
- berresheim-deployment-only
environment:
name: berresheim-production
# deploy-berresheim:
# extends: .deploy
# tags:
# - berresheim-deployment-only
# environment:
# name: berresheim-production
#

View File

@ -3,7 +3,7 @@ FROM python:3.10-bullseye
ENV MQTT_LOGIN "-"
ENV MQTT_PASSWORD "-"
ENV MQTT_BROKER "-"
ENV MQTT_PORT "8883"
ENV MQTT_PORT "1883"
ENV MQTT_CA ""
ENV PGHOST ""

11
deployment/install.sh Executable file
View File

@ -0,0 +1,11 @@
#!/bin/bash
NAMESPACE=$(cat namespace)
~/Workspace/MyKubernetesEnv/tools/create-namespace.sh $NAMESPACE
./roll-db-credential.sh
kubectl -f install.yml -n $NAMESPACE apply

36
deployment/install.yml Normal file
View File

@ -0,0 +1,36 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: cemmetering
data:
MQTT_BROKER: "emqx01-anonymous-cluster-internal.broker.svc.cluster.local"
MQTT_PORT: "1883"
PGDATABASE: "cem_monitoring_berresheim"
PGHOST: "timescaledb.database.svc.cluster.local"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: cemmetering
labels:
app: cemmetering
spec:
replicas: 1
selector:
matchLabels:
app: cemmetering
template:
metadata:
labels:
app: cemmetering
spec:
containers:
- name: cemmetering
image: wollud1969/cemmetering-preprocessor:1.0.2
envFrom:
- configMapRef:
name: cemmetering
- secretRef:
name: cemmetering-db-cred

1
deployment/namespace Normal file
View File

@ -0,0 +1 @@
berresheim

View File

@ -0,0 +1,33 @@
#!/bin/bash
. ~/Workspace/MyKubernetesEnv/ENVDB
DATABASE=cem_monitoring_berresheim
LOGIN=cem_preprocessor
PASSWORD=`openssl rand -base64 24`
NAMESPACE=`cat namespace`
psql <<EOF
do
\$\$
begin
if not exists (SELECT * FROM pg_user WHERE usename = '$LOGIN') then
CREATE USER $LOGIN WITH PASSWORD '$PASSWORD';
else
ALTER USER $LOGIN WITH PASSWORD '$PASSWORD';
end if;
end
\$\$
;
commit;
EOF
kubectl create secret generic cemmetering-db-cred \
--dry-run=client \
-o yaml \
--save-config \
--from-literal=PGUSER="$LOGIN" \
--from-literal=PGPASSWORD="$PASSWORD" | \
kubectl apply -f - -n $NAMESPACE

View File

@ -1,5 +1,5 @@
create database cem_monitoring;
-- create extension timescaledb;
create extension timescaledb;
@ -17,29 +17,40 @@ create table variable_t (
variable_id varchar(16) not null,
label varchar(128) not null unique,
unit varchar(16),
quantity varchar(32),
unique (app, converter_id, device_id, variable_id)
);
create table measurement_t (
time timestamp without time zone not null,
application varchar(128) not null,
variable varchar(128) not null,
unit varchar(16),
time timestamptz not null,
variable integer references variable_t (id),
value float
);
select create_hypertable('measurement_t', 'time');
insert into application_t (app_id, label) values('ku226', 'Kupferdreher Str. 226, Essen');
insert into variable_t (app, converter_id, device_id, variable_id, label, unit)
values ((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'u', 'BHKW Spannung', 'V'),
((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'p', 'BHKW Leistung', 'W'),
((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'import', 'BHKW Energie Export', 'Wh'),
((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'export', 'BHKW Energie Import', 'Wh'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'u', 'PV Spannung', 'V'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'p', 'PV Leistung', 'W'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'import', 'PV Energie Export', 'Wh'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'export', 'PV Energie Import', 'Wh')
insert into variable_t (app, converter_id, device_id, variable_id, label, unit, quantity)
values ((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'u', 'BHKW Spannung', 'V', 'voltage'),
((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'p', 'BHKW Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'import', 'BHKW Energie Export', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'bhkw', 'export', 'BHKW Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'u', 'PV Spannung', 'V', 'voltage'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'p', 'PV Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'import', 'PV Energie Export', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'pv', 'export', 'PV Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '3', 'mains', 'import', 'Netz Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '3', 'mains', 'p', 'Netz Leistung', 'W', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'w1', 'p', 'Wohnung 1 Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'w1', 'import', 'Wohnung 1 Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'w2', 'p', 'Wohnung 2 Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'w2', 'import', 'Wohnung 2 Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'w3', 'p', 'Wohnung 3 Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'w3', 'import', 'Wohnung 3 Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'w4', 'p', 'Wohnung 4 Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'w4', 'import', 'Wohnung 4 Energie Import', 'Wh', 'energy'),
((select id from application_t where app_id = 'ku226'), '1', 'w5', 'p', 'Wohnung 5 Leistung', 'kW', 'power'),
((select id from application_t where app_id = 'ku226'), '1', 'w5', 'import', 'Wohnung 5 Energie Import', 'Wh', 'energy')
;
@ -49,4 +60,22 @@ insert into variable_t (app, converter_id, device_id, variable_id, label, unit)
grant select on application_t to preprocessor;
grant select on variable_t to preprocessor;
grant insert on measurement_t to preprocessor;
grant select on measurement_t to grafana;
create view measurement_v as
select m.time as time,
m.value as value,
v.quantity as quantity,
v.unit as unit,
v.label as variable,
a.label as application
from measurement_t m,
application_t a,
variable_t v
where m.variable = v.id and
v.app = a.id;
grant select on measurement_v to grafana;

View File

@ -29,7 +29,7 @@ class JustIgnoreMessage (Exception):
self.message = message
class DbOp(object):
def __init__(self, config):
def __init__(self):
self.conn = None
def __getConn(self):
@ -46,7 +46,9 @@ class DbOp(object):
select
a.label as application,
v.label as variable,
v.unit as unit
v.quantity as quantity,
v.unit as unit,
v.id as vid
from application_t a, variable_t v
where a.app_id = %(appId)s and
v.app = a.id and
@ -60,10 +62,12 @@ class DbOp(object):
variable = {
'application': res[0],
'variable': res[1],
'unit': res[2]
'quantity': res[2],
'unit': res[3],
'vid': res[4]
}
logger.debug(f"{variable=}")
return application
return variable
except Exception as e:
logger.error(f"Error getting variable: {e}")
raise VariableNotFoundException(appId, converterId, deviceId, variableId)
@ -78,14 +82,12 @@ class DbOp(object):
with conn:
with conn.cursor() as cur:
cur.execute("""
insert into measurement_t (time, application, variable, value, unit)
values (now(), %(application)s, %(variable)s, %(value)s, %(unit)s
insert into measurement_t (time, variable, value)
values (now(), %(vid)s, %(value)s)
""",
{
'application': measurement['application'],
'variable': measurement['variable'],
'vid': measurement['vid'],
'value': measurement['value'],
'unit': measurement['unit']
}
)
except Exception as e:
@ -100,14 +102,14 @@ def splitTopic(topic):
except ValueError:
raise InvalidTopicException(topic)
r = rest.split('/')
if (len(r) == 1:
if (len(r) == 1):
deviceId = 'mains',
variableId = r[0]
elif (len(r) == 2:
(deviceId, variableid) = r
elif (len(r) == 2):
(deviceId, variableId) = r
else:
raise InvalidTopicException(topic)
return (appId, converterId, deviceId, variableid)
return (appId, converterId, deviceId, variableId)
def mqttOnConnectCallback(client, userdata, flags, rc):
logger.info("mqtt connected")
@ -123,13 +125,11 @@ def mqttOnMessageCallback(client, userdata, message):
(appId, converterId, deviceId, variableId) = splitTopic(topic)
dbh = DbOp(config)
variable = dbh.getVariable(appId, converterId, deviceId, variableid)
dbh = DbOp()
variable = dbh.getVariable(appId, converterId, deviceId, variableId)
measurement = {
"application": variable["application"],
"variable": variable["variable"],
"unit": variable["unit"],
"value": payload
"vid": variable["vid"],
"value": float(payload)
}
logger.debug(f"{measurement=}")
@ -150,21 +150,6 @@ logger.info("preprocess starting")
REQUIRED_CONFIG_OPTIONS = [
'MQTT_LOGIN',
'MQTT_PASSWORD',
'MQTT_BROKER',
'MQTT_PORT',
'MQTT_CA'
]
config = {}
for rco in REQUIRED_CONFIG_OPTIONS:
try:
config[rco] = os.environ[rco]
except KeyError:
logger.error(f"{rco} is a required config option, not available in environment")
sys.exit(-1)
@ -172,12 +157,7 @@ client = mqtt.Client()
client.on_message = mqttOnMessageCallback
client.on_connect = mqttOnConnectCallback
client.on_disconnect = mqttOnDisconnectCallback
client.username_pw_set(config['MQTT_LOGIN'], config['MQTT_PASSWORD'])
client.tls_set(
cert_reqs=ssl.CERT_REQUIRED,
ciphers=None
)
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
client.connect(os.environ["MQTT_BROKER"], int(os.environ["MQTT_PORT"]))
#client.connect('172.16.2.16', 1883)
logger.info("mqtt loop starting")
client.loop_forever()