Compare commits
36 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
a78c6952f0
|
|||
|
3b69e1e2af
|
|||
|
2dfca8d70a
|
|||
|
0352b720cd
|
|||
|
95984157e8
|
|||
| 61509c0000 | |||
| 3c09c04066 | |||
| af739c7148 | |||
| 33ff176c79 | |||
| 134e3706cc | |||
| 3a56309b9f | |||
|
084645f002
|
|||
|
9815371199
|
|||
|
4debe45592
|
|||
|
71773968c9
|
|||
|
574e2886f5
|
|||
|
e25693fb84
|
|||
|
ff49d285dc
|
|||
|
8c3977162b
|
|||
|
b99b47ca40
|
|||
|
c40805b4cb
|
|||
|
a2eb38b414
|
|||
|
64cf45e22f
|
|||
|
9310a86687
|
|||
|
f4b404e2b1
|
|||
|
29148a13f4
|
|||
|
0356e9dcee
|
|||
|
a5b981357d
|
|||
|
57bbc6135e
|
|||
|
d704f7ba5e
|
|||
|
d0567a48f1
|
|||
|
ee22996433
|
|||
|
4130befdbf
|
|||
|
77c5df0697
|
|||
|
d4ee4c49de
|
|||
| ae938d10b9 |
@@ -1,32 +1,73 @@
|
|||||||
|
when:
|
||||||
|
- event: tag
|
||||||
|
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
build:
|
build:
|
||||||
|
image: golang:1.22.5-alpine3.20
|
||||||
|
commands:
|
||||||
|
- GOPATH=/woodpecker/go
|
||||||
|
- cd src/udi
|
||||||
|
- go mod tidy
|
||||||
|
- go build -a -installsuffix nocgo -o udi main.go
|
||||||
|
- cp udi ../..
|
||||||
|
|
||||||
|
scan:
|
||||||
|
image: quay.io/wollud1969/woodpecker-helper:0.5.1
|
||||||
|
environment:
|
||||||
|
TRIVY_TOKEN:
|
||||||
|
from_secret: trivy_token
|
||||||
|
TRIVY_URL:
|
||||||
|
from_secret: trivy_url
|
||||||
|
DTRACK_API_KEY:
|
||||||
|
from_secret: dtrack_api_key
|
||||||
|
DTRACK_API_URL:
|
||||||
|
from_secret: dtrack_api_url
|
||||||
|
commands:
|
||||||
|
- export GOPATH=/woodpecker/go # the export is required, otherwise trivy will not consider the variable
|
||||||
|
- HOME=/home/`id -nu`
|
||||||
|
- TAG="${CI_COMMIT_TAG:-$CI_COMMIT_SHA}"
|
||||||
|
- |
|
||||||
|
trivy fs \
|
||||||
|
--server $TRIVY_URL \
|
||||||
|
--token $TRIVY_TOKEN \
|
||||||
|
--format cyclonedx \
|
||||||
|
--scanners license \
|
||||||
|
--output /tmp/sbom.xml \
|
||||||
|
.
|
||||||
|
- cat /tmp/sbom.xml
|
||||||
|
- |
|
||||||
|
curl -X "POST" \
|
||||||
|
-H "Content-Type: multipart/form-data" \
|
||||||
|
-H "X-Api-Key: $DTRACK_API_KEY" \
|
||||||
|
-F "autoCreate=true" \
|
||||||
|
-F "projectName=$CI_REPO" \
|
||||||
|
-F "projectVersion=$TAG" \
|
||||||
|
-F "bom=@/tmp/sbom.xml"\
|
||||||
|
"$DTRACK_API_URL/api/v1/bom"
|
||||||
|
|
||||||
|
dockerize:
|
||||||
image: plugins/kaniko
|
image: plugins/kaniko
|
||||||
settings:
|
settings:
|
||||||
repo: ${FORGE_NAME}/${CI_REPO}
|
repo: ${FORGE_NAME}/${CI_REPO}
|
||||||
registry:
|
registry:
|
||||||
from_secret: container_registry
|
from_secret: local_registry
|
||||||
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG}
|
tags: latest,${CI_COMMIT_TAG}
|
||||||
username:
|
username:
|
||||||
from_secret: container_registry_username
|
from_secret: local_username
|
||||||
password:
|
password:
|
||||||
from_secret: container_registry_password
|
from_secret: local_password
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
when:
|
|
||||||
- event: [push, tag]
|
|
||||||
|
|
||||||
deploy:
|
deploy:
|
||||||
image: portainer/kubectl-shell:latest
|
image: quay.io/wollud1969/k8s-admin-helper:0.4.1
|
||||||
secrets:
|
environment:
|
||||||
- source: kube_config
|
KUBE_CONFIG_CONTENT:
|
||||||
target: KUBE_CONFIG_CONTENT
|
from_secret: kube_config
|
||||||
- source: encryption_key
|
GPG_PASSPHRASE:
|
||||||
target: ENCRYPTION_KEY
|
from_secret: gpg_passphrase
|
||||||
- source: secrets_checksum
|
|
||||||
target: MD5_CHECKSUM
|
|
||||||
commands:
|
commands:
|
||||||
- export IMAGE_TAG=$CI_COMMIT_TAG
|
- export IMAGE_TAG=$CI_COMMIT_TAG
|
||||||
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
|
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
|
||||||
- export KUBECONFIG=/tmp/kubeconfig
|
- export KUBECONFIG=/tmp/kubeconfig
|
||||||
- ./deployment/deploy.sh
|
- ./deployment/deploy.sh
|
||||||
when:
|
|
||||||
- event: tag
|
|
||||||
|
|||||||
11
Dockerfile
11
Dockerfile
@@ -1,15 +1,8 @@
|
|||||||
FROM golang:1.22.5-alpine3.20 as builder
|
|
||||||
|
|
||||||
RUN mkdir -p /go/src
|
|
||||||
COPY ./src/ /go/src
|
|
||||||
WORKDIR /go/src/udi
|
|
||||||
RUN go build -a -installsuffix nocgo -o udi main.go
|
|
||||||
|
|
||||||
|
|
||||||
FROM scratch
|
FROM scratch
|
||||||
|
|
||||||
ENV UDI_CONF ""
|
ENV UDI_CONF ""
|
||||||
|
|
||||||
COPY --from=builder /go/src/udi ./
|
COPY udi ./
|
||||||
|
|
||||||
ENTRYPOINT ["./udi"]
|
ENTRYPOINT ["./udi"]
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ if [ "$MD5_CHECKSUM" = "" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo $ENCRYPTION_KEY
|
|
||||||
|
|
||||||
SECRETS_CIPHERTEXT_FILE=secrets.enc
|
SECRETS_CIPHERTEXT_FILE=secrets.enc
|
||||||
SECRETS_PLAINTEXT_FILE=/tmp/secrets
|
SECRETS_PLAINTEXT_FILE=/tmp/secrets
|
||||||
@@ -39,7 +38,7 @@ if [ "$MD5_CHECKSUM" != "$CALCULATED_CHECKSUM" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# cat $TMP_FILE
|
#cat $TMP_FILE
|
||||||
mv $TMP_FILE $SECRETS_PLAINTEXT_FILE
|
mv $TMP_FILE $SECRETS_PLAINTEXT_FILE
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,11 @@ if [ "$IMAGE_TAG" == "" ]; then
|
|||||||
echo "Make sure IMAGE_TAG is set"
|
echo "Make sure IMAGE_TAG is set"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
if [ "$GPG_PASSPHRASE" == "" ]; then
|
||||||
|
echo "Make sure GPG_PASSPHRASE is set"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
IMAGE_NAME=$FORGE_NAME/$CI_REPO
|
IMAGE_NAME=$FORGE_NAME/$CI_REPO
|
||||||
@@ -15,7 +20,10 @@ DEPLOYMENT_DIR=$PWD/deployment
|
|||||||
INSTANCES_DIR=$DEPLOYMENT_DIR/instances
|
INSTANCES_DIR=$DEPLOYMENT_DIR/instances
|
||||||
|
|
||||||
pushd $DEPLOYMENT_DIR > /dev/null
|
pushd $DEPLOYMENT_DIR > /dev/null
|
||||||
./decrypt-secrets.sh || exit 1
|
# ./decrypt-secrets.sh || exit 1
|
||||||
|
# . /tmp/secrets
|
||||||
|
|
||||||
|
gpg --decrypt --yes --batch --passphrase "$GPG_PASSPHRASE" --homedir /tmp/.gnupg -o /tmp/secrets secrets.asc
|
||||||
. /tmp/secrets
|
. /tmp/secrets
|
||||||
rm /tmp/secrets
|
rm /tmp/secrets
|
||||||
popd > /dev/null
|
popd > /dev/null
|
||||||
@@ -58,7 +66,7 @@ for NAMESPACE_DIR in `find $INSTANCES_DIR -type d -mindepth 1 -maxdepth 1`; do
|
|||||||
NEW_UDI_DB_PASSWORD="${!PASSWORD_VARIABLE}"
|
NEW_UDI_DB_PASSWORD="${!PASSWORD_VARIABLE}"
|
||||||
DATABASE_VARIABLE=$VARIABLE_PREFIX"_PGDATABASE"
|
DATABASE_VARIABLE=$VARIABLE_PREFIX"_PGDATABASE"
|
||||||
NEW_UDI_DB_DATABASE="${!DATABASE_VARIABLE}"
|
NEW_UDI_DB_DATABASE="${!DATABASE_VARIABLE}"
|
||||||
NEW_UDI_DB_HOST=timescaledb.database.svc.cluster.local
|
NEW_UDI_DB_HOST=database.database1.svc.cluster.local
|
||||||
|
|
||||||
kubectl create secret generic $INSTANCE-udi-db-cred \
|
kubectl create secret generic $INSTANCE-udi-db-cred \
|
||||||
--dry-run=client \
|
--dry-run=client \
|
||||||
|
|||||||
@@ -6,13 +6,22 @@
|
|||||||
"topicMappings": [
|
"topicMappings": [
|
||||||
{
|
{
|
||||||
"topics": [ "snmp" ],
|
"topics": [ "snmp" ],
|
||||||
"handler": "SNMP",
|
"handler": "PREP",
|
||||||
"id": "SNMP",
|
"id": "SNMP",
|
||||||
"config": {
|
"config": {
|
||||||
"attributes": {
|
"attributes": {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"topics": [ "tsm" ],
|
||||||
|
"handler": "PREP",
|
||||||
|
"id": "TSM",
|
||||||
|
"config": {
|
||||||
|
"attributes": {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"topics": [ "dt1/ai/periodic/1" ],
|
"topics": [ "dt1/ai/periodic/1" ],
|
||||||
"handler": "DT1T",
|
"handler": "DT1T",
|
||||||
@@ -53,6 +62,16 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"topics": [ "IoT/Car/Values" ],
|
||||||
|
"handler": "Car",
|
||||||
|
"id": "Car",
|
||||||
|
"config": {
|
||||||
|
"databaseConnStr": "",
|
||||||
|
"attributes": {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"topics": [ "locative/event/#" ],
|
"topics": [ "locative/event/#" ],
|
||||||
"handler": "Locative",
|
"handler": "Locative",
|
||||||
|
|||||||
20
deployment/secrets.asc
Normal file
20
deployment/secrets.asc
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
-----BEGIN PGP MESSAGE-----
|
||||||
|
|
||||||
|
jA0ECQMIetkwUzO5q5z+0ukB9IS9Z6rRbXzGHS4dOfwLMbyx2auDu7PYaAgHLHkI
|
||||||
|
25aul+Q2RMUc1WtWgtyIPyVrq84ctjx4AmnXU3Gbsqte6QJIk0RX8w886gTI+NeF
|
||||||
|
RJahlDNeL9LLltQInkFPDXgLC0u/8a+az0aVZUnfAHVhcYkkgQ0JZasc0VQjLqHV
|
||||||
|
dQ3sHmGj1sUC712AujE1f+T5J9CImBdK4r4sZeFq0c3km7hIpOJNkg8LX+0fj9Un
|
||||||
|
/LPPg0Qjq15dnb44UB6g+gsCUCJqTylGAMH4HLvsgRLbs4T4Za0O9lUKZ7UzuvlZ
|
||||||
|
MnbJLIyJt2zxBVQ6NeuchvTmb8Ppkrk2qrO2mXC4hB0YyGtmT3Dy178pNdy7mYR6
|
||||||
|
zLPx7wO+x4MOJOY8uzclHLucAfBhLTgTFCrje+268e99bXuFhAXOA2zqYSyhBI5M
|
||||||
|
XnHUr14XsTACy8CwZRoIpvy9Roe0N3pJq/kdkx/zJ3ieBUWazR5HYFtBeixcELHa
|
||||||
|
EIH9vZCqGi4NfatsZGYyGofVQhV9TnV5yyAruBDsdb0JIYIz1rznzW4megwR6O0K
|
||||||
|
I7W4cTtzT6XdUZQGyWHZO+FSEQ3zoFxLPWeVWj4MJSj0qvd/G2Z07tAw8MJueGRK
|
||||||
|
DsQJOwhrZVH79zuQE+zF+TR3K7A9O3pq0C618gXzXYnVIRoxYqAGo5RyPGlswxRS
|
||||||
|
3sAf8spYQ3iSvo2whpRJ1c7ytmGKPbA3+4YXmJLemCCxiJK3yNJya6xg/hVTeJlL
|
||||||
|
w76+IPGx+VwfURNt+JDf3Xfq2pnppDjW59a69FMraiWCovSl6cUGHr9X+mgQ+C39
|
||||||
|
7OUhKLHQP4s+Yeak5St8/MKKG57bC1k4j07sh9pCXiAPPST3gVhSDmAGPGUDdISd
|
||||||
|
ieOw/ofwzATB+qgEyhPWcRcR/RU0fYwz2q0AxdI6brLgxgGPO6gvQiulY0+MKtuZ
|
||||||
|
kYUnBJiOQ4PBqnPDhtUjGvjT8YAbadWj+pH44bCGd7Q0Kg==
|
||||||
|
=KCiQ
|
||||||
|
-----END PGP MESSAGE-----
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
U2FsdGVkX1+v6L4gc+CbYCZyo/UVN7QfmEntIBpk+GAHGf3d7m/4hfcYd39Eh2td
|
|
||||||
lXSmNdt1cdFw/UfZ1x1OlGm/fqLh/j/rWPgEc6BwEcDFDEXpTucTjUHNDonYNH8j
|
|
||||||
eDWeAGokfguqgQG16CBLHdeyocP0kTPJSrIKQgG1Mzzck/kfB1Z6Ggv4z5KEx2dy
|
|
||||||
2rrnm+BeFT1yITwoxa3iJeudcSQznNIqQa+Mx4fUsPV+yorahp4gs0PVVj9POnAT
|
|
||||||
yRhpQgkaq5oZNVcYrWS5+6mmhbzL5jIAa4wfzVep/69RcfBkV5Oj5JJGaQzH0T74
|
|
||||||
wg8dWz/scdi2kkCn0KroJPrsG/lAsFYhbX4kUJQeRUX1pWr/iwD0i8LRx+f2C82Y
|
|
||||||
HgpsnG6c5nPRy68TltgRgCRAIJj87rR/fATVowcpChfe9sXCwfLEZ5Q2hDK8eAPW
|
|
||||||
VS87axMkProyHJZe1GK0v9CAVWpXlxv6eAr8u2SftGA87Xu3ebQ4SjReXIcAb7M6
|
|
||||||
08UnxW4YcfH+usgU2GUuNlzRctAq334AfBWYQO51l/ELJAzaDi6Ht4Czr6R7Bsfh
|
|
||||||
M3ZcjcgqY7j7ywDFmKq/a8Q0Dsjm2sezNtrrRWusomgSKFEf8WncOdkcWOAiza4T
|
|
||||||
+Qubfr1SuZuWFF+migGtYM3X8YS+VpmMRIpJ1otibMELgjvldWGqHIK1uIThLq7F
|
|
||||||
MvQ0Nog6UNg79/8vrUoEUPPB5fQsXcNC5zcpVMrpJcGogBHhsXk1EPFcB75sx/65
|
|
||||||
bl2BZlCBacH9MNIBPh17dMC46EV1FNaLiO1N3/qJkxrkiG5wBDjDlnyMn/mYc/o1
|
|
||||||
olNuIO0nnn2x8ZU02lRo8RqcqOywseZeBhAzOj+899n5Qa/0YQAnb0Y9WAxqLft/
|
|
||||||
0C45HcK5Kgd3C6wqvVUqcQ/UMxQzv0y1cM8gbfpGjUvJ6gUj7vkW08D55A6gV8Lf
|
|
||||||
SrneWAP/1B1mmV16vHaXwoYpTpQwM7i7fHWBOpH7nq6E+0P3LHyon43dYo4P6KM7
|
|
||||||
He3R6phTFp36WI4ZCUQafTDZS196Ol2ZyEAonVwSOIEIyptXeoAmleolXC/eL84Z
|
|
||||||
bEbhld8g+ulrVSrBXFpCY3jBsqPVBYEpZaGYgevsrHPSbwWa/qQkTKnOO4+oz6Pe
|
|
||||||
9iJ1yJbSWfg6Gkr6iqE41Dp4VGXtwTDHHb9YMd56iWHAkxZLFIWdYUr8XfQS6j70
|
|
||||||
j5kV3jV/w5EHGYruBdtxAWc7YKq3pfqvh9R7dD/8JOFZhA140+zmOCWG4qdDhv+5
|
|
||||||
F9vlawudssa9ZHGi1jBFPCNW13LBhUdyCY3apKF4HHeeuA465uzxIqwtkJSigdun
|
|
||||||
vC9ooYZrJjYOnJSTJnKH0WSD0pPC6CIkge+Fxuksq6cst5Zcysw1xz5zs7UNeAP+
|
|
||||||
kLs1+8Kn2d1hJuzSWdWlj7xGratLEdA6pqcfBKvMYtY0kpPPDrxm+F1FZ7LyV+dc
|
|
||||||
G1vfI6aS2azrFrBNXSeOArJ/erGHIGhWxFY0c3bcGOjXwsLWRjQ03Kdj9ffj6UFL
|
|
||||||
4JJaI0I01RilAo+woaZhNmOHl1VxSsU1lDGF7IvW3t0qKLaSg/Rv3pQqdKyjq8I5
|
|
||||||
IxPlUEMdo1EDZZx4qLmYBM1tWhgMbn4nx6P2BS7obnPdaf3B0RPxI68Z49RYZKvR
|
|
||||||
/wTyr7oWCCRQDwCuVH8t/jUrSWspzEK7ApXHdh7T9JlNurFW7oxc8ylooQrAn3Gn
|
|
||||||
mru7X3cUeVtiosAklZ7w+JNxm44IRmDKNVDeAaat+q35EA8MRFGiuXEOeNw54tWH
|
|
||||||
zNkUyUJ79Ie7BkGrZFUFqkvfY3Q/xLaBGYDQe65S8/rerybL0YI7RmMiz4x7yq8L
|
|
||||||
GoIDwPsn0z/AFefoGTi0tAXZeC+EA62okK1kKR9qrh9gmD59uiMbFX1BHe3rWhgP
|
|
||||||
cCPScYeameXV3K6wwQpX8JTdptqMAH5cpEVoUZ/PZZpkaiCuWcMODVbqTpm4SRPt
|
|
||||||
Q9s5+6/g0TUUqz7Fwi0dlfnMZVuK0a1Uf/SBYR7f/UYVLfF5juTZ+IRJwQWwp6QX
|
|
||||||
CzfYms0W34/srtM72mQOpKTd0o3xuFyVbQtZPOpNghIjArQqwt34nEzXPYHqasDx
|
|
||||||
c/yIPdW+B/YVcFPdRV16Izqmjdlupv6pPjY/T6GdHczQsH9gD28HN9+Ka2Cvficf
|
|
||||||
evO7IXe0RuvodQ3tB4LmeWoJB10G7Sko2EEfpFTDXke9Ak/5cGrpdPMtbXCAIm1o
|
|
||||||
B5UhrqNuUYSWdo0mGttbSjFR7pyLujsxLNnp8teBi33QOUhrSId5+mOvtFDGiZKa
|
|
||||||
QCC+W+BIh6IFIwnxH4dDxjz3M65NXzqNV+6mXEFU77cX+oTF4BRe0R/L4nPoaBAN
|
|
||||||
smRxtqBItpVFUdsOVb6bXg==
|
|
||||||
1
migration/.gitignore
vendored
1
migration/.gitignore
vendored
@@ -1 +0,0 @@
|
|||||||
.venv
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
import psycopg2
|
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
try:
|
|
||||||
srcConn = psycopg2.connect(database="level_monitoring_berresheim")
|
|
||||||
srcConn.autocommit = False
|
|
||||||
destConn = psycopg2.connect(database="udi-berresheim")
|
|
||||||
destConn.autocommit = False
|
|
||||||
|
|
||||||
with srcConn.cursor() as srcCur, destConn.cursor() as destCur:
|
|
||||||
srcCur.execute("select time, application_name, raw_level, level, status, battery from measurement_t")
|
|
||||||
for srcObj in srcCur:
|
|
||||||
timestamp = srcObj[0]
|
|
||||||
deviceName = srcObj[1]
|
|
||||||
rawLevel = srcObj[2]
|
|
||||||
level = srcObj[3]
|
|
||||||
status = srcObj[4]
|
|
||||||
battery = srcObj[5]
|
|
||||||
|
|
||||||
logger.info(f"{timestamp=}, {deviceName=}, {rawLevel=}, {level=}, {status=}, {battery=}")
|
|
||||||
|
|
||||||
destTime = timestamp
|
|
||||||
destApplication = "de-hottis-level-monitoring"
|
|
||||||
destDevice = "eui-a84041a2c18341d6"
|
|
||||||
destAttributes = '{"ApplicationId":"de-hottis-level-monitoring", "DeviceType":"dragino-ldds75", "Status":"' + status + '","Hint": "Migrated"}'
|
|
||||||
destValues = '{"Battery":{"unit":"V","label":"Battery","value":' + str(battery) + ',"variable":"Voltage"}, "Distance":{"unit":"mm","label":"Distance","variable":"Level","value":' + str(rawLevel) + '}, "CorrectedDistance":{"unit":"mm", "label":"CorrectedDistance", "variable":"Level","value":' + str(level) + '}}'
|
|
||||||
logger.info(f"{destTime=}, {destApplication=}, {destDevice=}, {destAttributes=}, {destValues=}")
|
|
||||||
|
|
||||||
destCur.execute("insert into measurements (time, application, device, attributes, values) values(%s, %s, %s, %s, %s)",
|
|
||||||
(destTime, destApplication, destDevice, destAttributes, destValues))
|
|
||||||
destConn.commit()
|
|
||||||
finally:
|
|
||||||
if srcConn:
|
|
||||||
srcConn.close()
|
|
||||||
if destConn:
|
|
||||||
destConn.close()
|
|
||||||
|
|
||||||
|
|
||||||
@@ -1,79 +0,0 @@
|
|||||||
import psycopg2
|
|
||||||
from loguru import logger
|
|
||||||
import os
|
|
||||||
|
|
||||||
srcPgHost = os.environ["SRC_PGHOST"]
|
|
||||||
srcPgUser = os.environ["SRC_PGUSER"]
|
|
||||||
srcPgPassword = os.environ["SRC_PGPASSWORD"]
|
|
||||||
srcPgDatabase = os.environ["SRC_PGDATABASE"]
|
|
||||||
destPgHost = os.environ["DEST_PGHOST"]
|
|
||||||
destPgUser = os.environ["DEST_PGUSER"]
|
|
||||||
destPgPassword = os.environ["DEST_PGPASSWORD"]
|
|
||||||
destPgDatabase = os.environ["DEST_PGDATABASE"]
|
|
||||||
|
|
||||||
try:
|
|
||||||
srcConn = psycopg2.connect(
|
|
||||||
host=srcPgHost,
|
|
||||||
dbname=srcPgDatabase,
|
|
||||||
user=srcPgUser,
|
|
||||||
password=srcPgPassword,
|
|
||||||
sslmode='require'
|
|
||||||
)
|
|
||||||
srcConn.autocommit = False
|
|
||||||
|
|
||||||
destConn = psycopg2.connect(
|
|
||||||
host=destPgHost,
|
|
||||||
dbname=destPgDatabase,
|
|
||||||
user=destPgUser,
|
|
||||||
password=destPgPassword,
|
|
||||||
sslmode='require'
|
|
||||||
)
|
|
||||||
destConn.autocommit = False
|
|
||||||
|
|
||||||
with srcConn.cursor() as srcCur, destConn.cursor() as destCur:
|
|
||||||
srcCur.execute("select time, deviceid, status, state, importenergyactive, importenergyreactive, exportenergyactive, exportenergyreactive, powerapparent, poweractive, powerreactive, powerdemandpositive, powerdemandreverse, factor, angle, voltage, current, powerdemand from pv_power_measurement_t order by time")
|
|
||||||
for srcObj in srcCur:
|
|
||||||
timestamp = srcObj[0]
|
|
||||||
deviceName = srcObj[1]
|
|
||||||
status = srcObj[2]
|
|
||||||
state = srcObj[3]
|
|
||||||
importenergyactive = srcObj[4]
|
|
||||||
importenergyreactive = srcObj[5]
|
|
||||||
exportenergyactive = srcObj[6]
|
|
||||||
exportenergyreactive = srcObj[7]
|
|
||||||
powerapparent = srcObj[8]
|
|
||||||
poweractive = srcObj[9]
|
|
||||||
powerreactive = srcObj[10]
|
|
||||||
powerdemandpositive = srcObj[11]
|
|
||||||
powerdemandreverse = srcObj[12]
|
|
||||||
factor = srcObj[13]
|
|
||||||
angle = srcObj[14]
|
|
||||||
voltage = srcObj[15]
|
|
||||||
current = srcObj[16]
|
|
||||||
powerdemand = srcObj[17]
|
|
||||||
|
|
||||||
|
|
||||||
logger.info(f"{timestamp=}, {deviceName=}")
|
|
||||||
|
|
||||||
destTime = timestamp
|
|
||||||
destApplication = "PV"
|
|
||||||
destDevice = "Powermeter"
|
|
||||||
destAttributes = f"{{\"ApplicationId\":\"PV\", \"Status\":\"{status}\",\"Hint\": \"Migrated\"}}"
|
|
||||||
destValues = f"{{\"Cnt\": {{\"unit\": \"\", \"label\": \"\", \"value\": \"-1\", \"variable\": \"Cnt\"}}, \"Angle\": {{\"unit\": \"degree\", \"label\": \"\", \"value\": \"{angle}\", \"variable\": \"Angle\"}}, \"State\": {{\"unit\": \"\", \"label\": \"\", \"value\": \"{state}\", \"variable\": \"State\"}}, \"Factor\": {{\"unit\": \"\", \"label\": \"\", \"value\": \"{factor}\", \"variable\": \"Factor\"}}, \"Current\": {{\"unit\": \"A\", \"label\": \"\", \"value\": \"{current}\", \"variable\": \"Current\"}}, \"Voltage\": {{\"unit\": \"V\", \"label\": \"\", \"value\": \"{voltage}\", \"variable\": \"Voltage\"}}, \"PowerActive\": {{\"unit\": \"W\", \"label\": \"\", \"value\": \"{poweractive}\", \"variable\": \"PowerActive\"}}, \"PowerApparent\": {{\"unit\": \"VA\", \"label\": \"\", \"value\": \"{powerapparent}\", \"variable\": \"PowerApparent\"}}, \"PowerReactive\": {{\"unit\": \"VA\", \"label\": \"\", \"value\": \"{powerreactive}\", \"variable\": \"PowerReactive\"}}, \"ExportEnergyActive\": {{\"unit\": \"Wh\", \"label\": \"\", \"value\": \"{exportenergyactive}\", \"variable\": \"ExportEnergyActive\"}}, \"ImportEnergyActive\": {{\"unit\": \"Wh\", \"label\": \"\", \"value\": \"{importenergyactive}\", \"variable\": \"ImportEnergyActive\"}}, \"PowerDemandReverse\": {{\"unit\": \"W\", \"label\": \"\", \"value\": \"{powerdemandreverse}\", \"variable\": \"PowerDemandReverse\"}}, \"PowerDemandPositive\": {{\"unit\": \"W\", \"label\": \"\", \"value\": \"{powerdemandpositive}\", \"variable\": \"PowerDemandPositive\"}}, \"ExportEnergyReactive\": {{\"unit\": \"VAh\", \"label\": \"\", \"value\": \"{exportenergyreactive}\", \"variable\": \"ExportEnergyReactive\"}}, \"ImportEnergyReactive\": {{\"unit\": \"VAh\", \"label\": \"\", \"value\": \"{importenergyreactive}\", \"variable\": \"ImportEnergyReactive\"}}}}"
|
|
||||||
logger.info(f"{destTime=}, {destApplication=}, {destDevice=}, {destAttributes=}, {destValues=}")
|
|
||||||
|
|
||||||
|
|
||||||
try:
|
|
||||||
destCur.execute("insert into measurements (time, application, device, attributes, values) values(%s, %s, %s, %s, %s)",
|
|
||||||
(destTime, destApplication, destDevice, destAttributes, destValues))
|
|
||||||
destConn.commit()
|
|
||||||
except Exception as e:
|
|
||||||
destConn.rollback()
|
|
||||||
logger.error(f"Error {e} when inserted time {destTime}")
|
|
||||||
finally:
|
|
||||||
if srcConn:
|
|
||||||
srcConn.close()
|
|
||||||
if destConn:
|
|
||||||
destConn.close()
|
|
||||||
|
|
||||||
|
|
||||||
@@ -1,78 +0,0 @@
|
|||||||
import psycopg2
|
|
||||||
from loguru import logger
|
|
||||||
import os
|
|
||||||
|
|
||||||
srcPgHost = os.environ["SRC_PGHOST"]
|
|
||||||
srcPgUser = os.environ["SRC_PGUSER"]
|
|
||||||
srcPgPassword = os.environ["SRC_PGPASSWORD"]
|
|
||||||
srcPgDatabase = os.environ["SRC_PGDATABASE"]
|
|
||||||
destPgHost = os.environ["DEST_PGHOST"]
|
|
||||||
destPgUser = os.environ["DEST_PGUSER"]
|
|
||||||
destPgPassword = os.environ["DEST_PGPASSWORD"]
|
|
||||||
destPgDatabase = os.environ["DEST_PGDATABASE"]
|
|
||||||
|
|
||||||
try:
|
|
||||||
srcConn = psycopg2.connect(
|
|
||||||
host=srcPgHost,
|
|
||||||
dbname=srcPgDatabase,
|
|
||||||
user=srcPgUser,
|
|
||||||
password=srcPgPassword,
|
|
||||||
sslmode='require'
|
|
||||||
)
|
|
||||||
srcConn.autocommit = False
|
|
||||||
|
|
||||||
destConn = psycopg2.connect(
|
|
||||||
host=destPgHost,
|
|
||||||
dbname=destPgDatabase,
|
|
||||||
user=destPgUser,
|
|
||||||
password=destPgPassword,
|
|
||||||
sslmode='require'
|
|
||||||
)
|
|
||||||
destConn.autocommit = False
|
|
||||||
|
|
||||||
with srcConn.cursor() as srcCur, destConn.cursor() as destCur:
|
|
||||||
srcCur.execute("select time, location, status, temperature, category from room_climate_measurement_t where category = 'heating' and time > '2023-12-19 05:20:00' order by time")
|
|
||||||
for srcObj in srcCur:
|
|
||||||
timestamp = srcObj[0]
|
|
||||||
location = srcObj[1]
|
|
||||||
status = srcObj[2]
|
|
||||||
temperature = srcObj[3]
|
|
||||||
category = srcObj[4]
|
|
||||||
|
|
||||||
logger.info(f"{timestamp=}, {location=}, {status=}, {temperature=}, {category=}")
|
|
||||||
|
|
||||||
destTime = timestamp
|
|
||||||
|
|
||||||
match category:
|
|
||||||
case 'heating':
|
|
||||||
destApplication = 'Temperature Heating'
|
|
||||||
case 'Outdoor':
|
|
||||||
destApplication = 'Temperature Wago'
|
|
||||||
case 'Device':
|
|
||||||
destApplication = 'Temperature Wago'
|
|
||||||
case 'Indoor':
|
|
||||||
destApplication = 'Temperature Multisensor' if location != 'Anna-Koeln-2' else 'Temperature Shelly Plus HT'
|
|
||||||
case 'Special':
|
|
||||||
destApplication = 'Temperature Multisensor'
|
|
||||||
|
|
||||||
destDevice = location
|
|
||||||
destAttributes = '{"ApplicationId":"temperature-imported", "Status":"' + status + '","Location":"' + location + '","Category":"' + category + '","Hint": "Migrated"}'
|
|
||||||
destValues = '{"Value": {"unit": "°C", "label": "", "value": "' + str(temperature) + '", "variable": ""}}'
|
|
||||||
|
|
||||||
logger.info(f"{destTime=}, {destApplication=}, {destDevice=}, {destAttributes=}, {destValues=}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
destCur.execute("insert into measurements (time, application, device, attributes, values) values(%s, %s, %s, %s, %s)",
|
|
||||||
(destTime, destApplication, destDevice, destAttributes, destValues))
|
|
||||||
destConn.commit()
|
|
||||||
except Exception as e:
|
|
||||||
destConn.rollback()
|
|
||||||
logger.error(f"Error {e} when inserted time {destTime}")
|
|
||||||
|
|
||||||
finally:
|
|
||||||
if srcConn:
|
|
||||||
srcConn.close()
|
|
||||||
|
|
||||||
if destConn:
|
|
||||||
destConn.close()
|
|
||||||
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
loguru==0.7.2
|
|
||||||
psycopg2==2.9.9
|
|
||||||
@@ -129,3 +129,54 @@ create or replace view lora_sht21_v as
|
|||||||
where m.application = 'de-hottis-app01' and
|
where m.application = 'de-hottis-app01' and
|
||||||
m.attributes->>'DeviceType' = 'hottis-gy21' and
|
m.attributes->>'DeviceType' = 'hottis-gy21' and
|
||||||
m.device = d.label;
|
m.device = d.label;
|
||||||
|
|
||||||
|
create or replace view ntp_server_snmp_v as
|
||||||
|
select time,
|
||||||
|
device,
|
||||||
|
cast(values->'load1'->>'value' as float) as laLoad1,
|
||||||
|
cast(values->'lan-in'->>'value' as int) as lanInOctetsPerSeconds,
|
||||||
|
cast(values->'lan-out'->>'value' as int) as lanOutOctetsPerSeconds
|
||||||
|
from measurements
|
||||||
|
where application = 'SNMP' and device = '172.16.13.10';
|
||||||
|
|
||||||
|
create or replace view ntp_server_variables_v as
|
||||||
|
select time,
|
||||||
|
device,
|
||||||
|
cast(values->'rootdisp'->>'value' as float) as rootdisp
|
||||||
|
from measurements
|
||||||
|
where application = 'TSM' and device = '172.16.13.10';
|
||||||
|
|
||||||
|
|
||||||
|
-- Status string `unit:"" json:"status"`
|
||||||
|
-- Timestamp string `unit:"" json:"timestamp"`
|
||||||
|
-- VoltageL1 float32 `unit:"V" json:"voltageL1"`
|
||||||
|
-- VoltageL2 float32 `unit:"V" json:"voltageL2"`
|
||||||
|
-- VoltageL3 float32 `unit:"V" json:"voltageL3"`
|
||||||
|
-- CurrentL1 float32 `unit:"A" json:"currentL1"`
|
||||||
|
-- CurrentL2 float32 `unit:"A" json:"currentL2"`
|
||||||
|
-- CurrentL3 float32 `unit:"A" json:"currentL3"`
|
||||||
|
-- PowerL1 float32 `unit:"W" json:"powerL1"`
|
||||||
|
-- PowerL2 float32 `unit:"W" json:"powerL2"`
|
||||||
|
-- PowerL3 float32 `unit:"W" json:"powerL3"`
|
||||||
|
-- TotalImportEnergy float32 `unit:"Wh" json:"totalImportEnergy"`
|
||||||
|
-- TotalExportEnergy float32 `unit:"Wh" json:"totalExportEnergy"`
|
||||||
|
-- Cnt int `unit:"" json:"cnt"`
|
||||||
|
|
||||||
|
|
||||||
|
create or replace view car_values_v as
|
||||||
|
select time,
|
||||||
|
cast(values->'VoltageL1'->>'value' as float) as voltage_l1,
|
||||||
|
cast(values->'VoltageL2'->>'value' as float) as voltage_l2,
|
||||||
|
cast(values->'VoltageL3'->>'value' as float) as voltage_l3,
|
||||||
|
cast(values->'CurrentL1'->>'value' as float) as current_l1,
|
||||||
|
cast(values->'CurrentL2'->>'value' as float) as current_l2,
|
||||||
|
cast(values->'CurrentL3'->>'value' as float) as current_l3,
|
||||||
|
cast(values->'PowerL1'->>'value' as float) as power_l1,
|
||||||
|
cast(values->'PowerL2'->>'value' as float) as power_l2,
|
||||||
|
cast(values->'PowerL3'->>'value' as float) as power_l3,
|
||||||
|
cast(values->'TotalImportEnergy'->>'value' as float) as total_import_energy,
|
||||||
|
cast(values->'TotalExportEnergy'->>'value' as float) as total_export_energy,
|
||||||
|
values->'Status'->>'value' as status,
|
||||||
|
device
|
||||||
|
from measurements
|
||||||
|
where application = 'Car';
|
||||||
@@ -41,3 +41,4 @@ create or replace view cubecell_threeway_battery_v as
|
|||||||
from measurements
|
from measurements
|
||||||
where application = 'de-hottis-saerbeck-monitoring' and
|
where application = 'de-hottis-saerbeck-monitoring' and
|
||||||
device = 'eui-70b3d57ed0068fa4';
|
device = 'eui-70b3d57ed0068fa4';
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ type VariableType struct {
|
|||||||
Variable string `json:"variable"`
|
Variable string `json:"variable"`
|
||||||
Unit string `json:"unit"`
|
Unit string `json:"unit"`
|
||||||
Value interface{} `json:"value,omitempty"`
|
Value interface{} `json:"value,omitempty"`
|
||||||
|
Status string `json:"status,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Measurement struct {
|
type Measurement struct {
|
||||||
|
|||||||
@@ -1,149 +1,153 @@
|
|||||||
package dispatcher
|
package dispatcher
|
||||||
|
|
||||||
import "log"
|
import (
|
||||||
import "time"
|
"fmt"
|
||||||
import "os"
|
"log"
|
||||||
import "fmt"
|
"net/url"
|
||||||
import "net/url"
|
"os"
|
||||||
import "udi/mqtt"
|
"time"
|
||||||
import "udi/config"
|
"udi/config"
|
||||||
import "udi/counter"
|
"udi/counter"
|
||||||
import "udi/handlers/handler"
|
"udi/handlers/car"
|
||||||
import "udi/handlers/ttn"
|
"udi/handlers/dt1t"
|
||||||
import "udi/handlers/iot"
|
"udi/handlers/handler"
|
||||||
import "udi/handlers/pv"
|
"udi/handlers/iot"
|
||||||
import "udi/handlers/mbgw3"
|
"udi/handlers/locative"
|
||||||
import "udi/handlers/sver"
|
"udi/handlers/mbgw3"
|
||||||
import "udi/handlers/svej"
|
"udi/handlers/prepared"
|
||||||
import "udi/handlers/dt1t"
|
"udi/handlers/pv"
|
||||||
import "udi/handlers/locative"
|
"udi/handlers/svej"
|
||||||
import "udi/handlers/snmp"
|
"udi/handlers/sver"
|
||||||
import "udi/handlers/z2m"
|
"udi/handlers/ttn"
|
||||||
|
"udi/handlers/z2m"
|
||||||
|
"udi/mqtt"
|
||||||
|
)
|
||||||
|
|
||||||
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
|
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
|
||||||
var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100)
|
var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100)
|
||||||
|
|
||||||
func InitDispatcher() {
|
func InitDispatcher() {
|
||||||
log.Printf("Dispatcher initializing")
|
log.Printf("Dispatcher initializing")
|
||||||
go archiver()
|
go archiver()
|
||||||
|
|
||||||
for _, mapping := range config.Config.TopicMappings {
|
for _, mapping := range config.Config.TopicMappings {
|
||||||
// log.Printf("Trying to initialize %s", mapping)
|
// log.Printf("Trying to initialize %s", mapping)
|
||||||
|
|
||||||
var factory interface{}
|
var factory interface{}
|
||||||
switch mapping.Handler {
|
switch mapping.Handler {
|
||||||
case "TTN":
|
case "TTN":
|
||||||
factory = ttn.New
|
factory = ttn.New
|
||||||
case "IoT":
|
case "IoT":
|
||||||
factory = iot.New
|
factory = iot.New
|
||||||
case "PV":
|
case "PV":
|
||||||
factory = pv.New
|
factory = pv.New
|
||||||
case "MBGW3":
|
case "MBGW3":
|
||||||
factory = mbgw3.New
|
factory = mbgw3.New
|
||||||
case "SVER":
|
case "SVER":
|
||||||
factory = sver.New
|
factory = sver.New
|
||||||
case "SVEJ":
|
case "SVEJ":
|
||||||
factory = svej.New
|
factory = svej.New
|
||||||
case "DT1T":
|
case "DT1T":
|
||||||
factory = dt1t.New
|
factory = dt1t.New
|
||||||
case "Locative":
|
case "Locative":
|
||||||
factory = locative.New
|
factory = locative.New
|
||||||
case "SNMP":
|
case "PREP":
|
||||||
factory = snmp.New
|
factory = prepared.New
|
||||||
case "Z2M":
|
case "Z2M":
|
||||||
factory = z2m.New
|
factory = z2m.New
|
||||||
default:
|
case "Car":
|
||||||
factory = nil
|
factory = car.New
|
||||||
log.Printf("No handler %s found, ignore mapping", mapping.Handler)
|
default:
|
||||||
}
|
factory = nil
|
||||||
|
log.Printf("No handler %s found, ignore mapping", mapping.Handler)
|
||||||
|
}
|
||||||
|
|
||||||
fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler)
|
fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler)
|
||||||
if ! ok {
|
if !ok {
|
||||||
log.Println("Typ Assertion failed")
|
log.Println("Typ Assertion failed")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
handler := fn(mapping.Id, mapping.Config)
|
handler := fn(mapping.Id, mapping.Config)
|
||||||
handlerMap[mapping.Id] = handler
|
handlerMap[mapping.Id] = handler
|
||||||
}
|
}
|
||||||
|
|
||||||
//log.Printf("handlerMap: %s", handlerMap)
|
//log.Printf("handlerMap: %s", handlerMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
func storeMessage(filename string, item handler.MessageT) {
|
func storeMessage(filename string, item handler.MessageT) {
|
||||||
file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644)
|
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
|
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
|
||||||
counter.F("Archived")
|
counter.F("Archived")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload)
|
archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload)
|
||||||
_, err = file.WriteString(string(archivingString) + "\n")
|
_, err = file.WriteString(string(archivingString) + "\n")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Unable to write message, message is not archived: %s", err)
|
log.Printf("Unable to write message, message is not archived: %s", err)
|
||||||
counter.F("Archived")
|
counter.F("Archived")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//log.Println("Successfully archived message")
|
//log.Println("Successfully archived message")
|
||||||
counter.S("Archived")
|
counter.S("Archived")
|
||||||
}
|
}
|
||||||
|
|
||||||
func archiver() {
|
func archiver() {
|
||||||
archivingRootDir := config.Config.Archiver.Dir
|
archivingRootDir := config.Config.Archiver.Dir
|
||||||
var lastArchivingDir string
|
var lastArchivingDir string
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case message := <- archiverChannel:
|
case message := <-archiverChannel:
|
||||||
currentDateStr := message.Timestamp.Format("2006/01/02/15")
|
currentDateStr := message.Timestamp.Format("2006/01/02/15")
|
||||||
currentArchivingDir := archivingRootDir + "/" + currentDateStr
|
currentArchivingDir := archivingRootDir + "/" + currentDateStr
|
||||||
if currentArchivingDir != lastArchivingDir {
|
if currentArchivingDir != lastArchivingDir {
|
||||||
err := os.MkdirAll(currentArchivingDir, 0755)
|
err := os.MkdirAll(currentArchivingDir, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
|
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
|
||||||
counter.F("Archived")
|
counter.F("Archived")
|
||||||
}
|
}
|
||||||
lastArchivingDir = currentArchivingDir
|
lastArchivingDir = currentArchivingDir
|
||||||
//log.Printf("Archiving dir %s created", currentArchivingDir)
|
//log.Printf("Archiving dir %s created", currentArchivingDir)
|
||||||
}
|
}
|
||||||
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
|
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
|
||||||
storeMessage(archivingFilename, message)
|
storeMessage(archivingFilename, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func InputDispatcher() {
|
func InputDispatcher() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case mqttMessage := <- mqtt.InputChannel:
|
case mqttMessage := <-mqtt.InputChannel:
|
||||||
//log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
|
//log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
|
||||||
message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) }
|
message := handler.MessageT{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)}
|
||||||
archiverChannel <- message
|
archiverChannel <- message
|
||||||
handleMessage(message)
|
handleMessage(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleMessage(message handler.MessageT) {
|
func handleMessage(message handler.MessageT) {
|
||||||
for _, mapping := range config.Config.TopicMappings {
|
for _, mapping := range config.Config.TopicMappings {
|
||||||
// log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
|
// log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
|
||||||
for _, subscribedTopic := range mapping.Topics {
|
for _, subscribedTopic := range mapping.Topics {
|
||||||
// log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
|
// log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
|
||||||
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
|
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
|
||||||
//log.Printf("Handle message in handler %s", mapping.Id)
|
//log.Printf("Handle message in handler %s", mapping.Id)
|
||||||
handler, exists := handlerMap[mapping.Id]
|
handler, exists := handlerMap[mapping.Id]
|
||||||
if exists {
|
if exists {
|
||||||
handler.Handle(message)
|
handler.Handle(message)
|
||||||
counter.S("Dispatched")
|
counter.S("Dispatched")
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Handler %s not found, message %s is lost", mapping.Id, message)
|
log.Printf("Handler %s not found, message %s is lost", mapping.Id, message)
|
||||||
counter.F("Dispatched")
|
counter.F("Dispatched")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("No match for topic %s, message %s is lost", message.Topic, message)
|
log.Printf("No match for topic %s, message %s is lost", message.Topic, message)
|
||||||
counter.F("Dispatched")
|
counter.F("Dispatched")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,23 +3,23 @@ module udi
|
|||||||
go 1.22.3
|
go 1.22.3
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/eclipse/paho.mqtt.golang v1.4.3
|
github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||||
github.com/google/uuid v1.4.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
|
||||||
gorm.io/driver/postgres v1.5.9
|
gorm.io/driver/postgres v1.5.11
|
||||||
gorm.io/gorm v1.25.11
|
gorm.io/gorm v1.25.12
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/gorilla/websocket v1.5.0 // indirect
|
github.com/gorilla/websocket v1.5.3 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
github.com/jackc/pgx/v5 v5.5.5 // indirect
|
github.com/jackc/pgx/v5 v5.7.2 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
github.com/jinzhu/now v1.1.5 // indirect
|
||||||
golang.org/x/crypto v0.25.0 // indirect
|
golang.org/x/crypto v0.32.0 // indirect
|
||||||
golang.org/x/net v0.27.0 // indirect
|
golang.org/x/net v0.34.0 // indirect
|
||||||
golang.org/x/sync v0.7.0 // indirect
|
golang.org/x/sync v0.10.0 // indirect
|
||||||
golang.org/x/text v0.16.0 // indirect
|
golang.org/x/text v0.21.0 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -28,14 +28,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
|
|||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
|
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
||||||
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
|
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
||||||
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
|
||||||
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
|
||||||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
||||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
|||||||
94
src/udi/handlers/car/car.go
Normal file
94
src/udi/handlers/car/car.go
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
package car
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
"udi/config"
|
||||||
|
"udi/database"
|
||||||
|
"udi/handlers/handler"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CarHandler struct {
|
||||||
|
handler.CommonHandler
|
||||||
|
dbh *database.DatabaseHandle
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
"status": "Ok",
|
||||||
|
"timestamp": "2025-12-15T13:11:15.648243",
|
||||||
|
"voltageL1": 228.68,
|
||||||
|
"voltageL2": 227.69,
|
||||||
|
"voltageL3": 228.53,
|
||||||
|
"currentL1": 0.0,
|
||||||
|
"currentL2": 0.0,
|
||||||
|
"currentL3": 0.0,
|
||||||
|
"powerL1": 0.0,
|
||||||
|
"powerL2": 0.0,
|
||||||
|
"powerL3": 0.0,
|
||||||
|
"totalImportEnergy": 0.0,
|
||||||
|
"totalExportEnergy": 0.0,
|
||||||
|
"cnt": 399300}
|
||||||
|
*/
|
||||||
|
|
||||||
|
type CarValue struct {
|
||||||
|
Status string `unit:"" json:"status"`
|
||||||
|
Timestamp string `unit:"" json:"timestamp"`
|
||||||
|
VoltageL1 float32 `unit:"V" json:"voltageL1"`
|
||||||
|
VoltageL2 float32 `unit:"V" json:"voltageL2"`
|
||||||
|
VoltageL3 float32 `unit:"V" json:"voltageL3"`
|
||||||
|
CurrentL1 float32 `unit:"A" json:"currentL1"`
|
||||||
|
CurrentL2 float32 `unit:"A" json:"currentL2"`
|
||||||
|
CurrentL3 float32 `unit:"A" json:"currentL3"`
|
||||||
|
PowerL1 float32 `unit:"W" json:"powerL1"`
|
||||||
|
PowerL2 float32 `unit:"W" json:"powerL2"`
|
||||||
|
PowerL3 float32 `unit:"W" json:"powerL3"`
|
||||||
|
TotalImportEnergy float32 `unit:"Wh" json:"totalImportEnergy"`
|
||||||
|
TotalExportEnergy float32 `unit:"Wh" json:"totalExportEnergy"`
|
||||||
|
Cnt int `unit:"" json:"cnt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(id string, config config.HandlerConfigT) handler.Handler {
|
||||||
|
t := &CarHandler{}
|
||||||
|
t.Id = id
|
||||||
|
t.dbh = database.NewDatabaseHandle()
|
||||||
|
log.Printf("Handler Car %d initialized", id)
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *CarHandler) Handle(message handler.MessageT) {
|
||||||
|
//log.Printf("Handler Car %d processing %s -> %s", self.id, message.Topic, message.Payload)
|
||||||
|
|
||||||
|
var carValue CarValue
|
||||||
|
err := json.Unmarshal([]byte(message.Payload), &carValue)
|
||||||
|
if err != nil {
|
||||||
|
self.Lost("Unable to parse payload into carValue struct", err, message)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
variables := make(map[string]database.VariableType)
|
||||||
|
carValueStructValue := reflect.ValueOf(carValue)
|
||||||
|
for i := 0; i < carValueStructValue.NumField(); i++ {
|
||||||
|
field := carValueStructValue.Type().Field(i)
|
||||||
|
fieldValue := carValueStructValue.Field(i)
|
||||||
|
v := database.VariableType{
|
||||||
|
Label: "",
|
||||||
|
Variable: field.Name,
|
||||||
|
Unit: field.Tag.Get("unit"),
|
||||||
|
Value: fieldValue.Interface(),
|
||||||
|
}
|
||||||
|
variables[field.Name] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
measurement := database.Measurement{
|
||||||
|
Time: time.Now(),
|
||||||
|
Application: "Car",
|
||||||
|
Device: "Powermeter",
|
||||||
|
Values: variables,
|
||||||
|
}
|
||||||
|
|
||||||
|
self.dbh.StoreMeasurement(&measurement)
|
||||||
|
self.S()
|
||||||
|
}
|
||||||
@@ -74,12 +74,22 @@ func (self *Mbgw3Handler) Handle(message handler.MessageT) {
|
|||||||
|
|
||||||
measurement.Values = make(map[string]database.VariableType)
|
measurement.Values = make(map[string]database.VariableType)
|
||||||
unitMap := map[string]string { "Energy": "Wh", "Power": "W", "Voltage": "V", "Current": "A", "Volume": "m3" }
|
unitMap := map[string]string { "Energy": "Wh", "Power": "W", "Voltage": "V", "Current": "A", "Volume": "m3" }
|
||||||
|
keyCount := make(map[string]int)
|
||||||
|
|
||||||
for k, v := range observation.Values {
|
for k, v := range observation.Values {
|
||||||
unit, exists := unitMap[k]
|
unit, exists := unitMap[k]
|
||||||
if ! exists {
|
if ! exists {
|
||||||
unit = "Unmapped Unit"
|
unit = "Unmapped Unit"
|
||||||
}
|
}
|
||||||
measurement.Values[k] = database.VariableType {
|
|
||||||
|
// Check if key already exists and create unique key if needed
|
||||||
|
keyCount[k]++
|
||||||
|
uniqueKey := k
|
||||||
|
if keyCount[k] > 1 {
|
||||||
|
uniqueKey = k + strconv.Itoa(keyCount[k])
|
||||||
|
}
|
||||||
|
|
||||||
|
measurement.Values[uniqueKey] = database.VariableType {
|
||||||
Label: "",
|
Label: "",
|
||||||
Variable: k,
|
Variable: k,
|
||||||
Unit: unit,
|
Unit: unit,
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package snmp
|
package prepared
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
type SnmpHandler struct {
|
type PreparedHandler struct {
|
||||||
handler.CommonHandler
|
handler.CommonHandler
|
||||||
dbh *database.DatabaseHandle
|
dbh *database.DatabaseHandle
|
||||||
}
|
}
|
||||||
@@ -19,6 +19,8 @@ type endpoint_t struct {
|
|||||||
Label string `json:"label"`
|
Label string `json:"label"`
|
||||||
Variable string `json:"variable"`
|
Variable string `json:"variable"`
|
||||||
Value string `json:"value"`
|
Value string `json:"value"`
|
||||||
|
Unit string `json:"unit"`
|
||||||
|
Status string `json:"status"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type observation_t struct {
|
type observation_t struct {
|
||||||
@@ -29,16 +31,16 @@ type observation_t struct {
|
|||||||
|
|
||||||
|
|
||||||
func New(id string, config config.HandlerConfigT) handler.Handler {
|
func New(id string, config config.HandlerConfigT) handler.Handler {
|
||||||
t := &SnmpHandler {
|
t := &PreparedHandler {
|
||||||
}
|
}
|
||||||
t.Id = id
|
t.Id = id
|
||||||
t.dbh = database.NewDatabaseHandle()
|
t.dbh = database.NewDatabaseHandle()
|
||||||
log.Printf("Handler SNMP %d initialized", id)
|
log.Printf("Handler Prepared %d initialized", id)
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *SnmpHandler) Handle(message handler.MessageT) {
|
func (self *PreparedHandler) Handle(message handler.MessageT) {
|
||||||
// log.Printf("Handler SNMP %d processing %s -> %s", self.Id, message.Topic, message.Payload)
|
log.Printf("Handler Prepared %d processing %s -> %s", self.Id, message.Topic, message.Payload)
|
||||||
|
|
||||||
var observation observation_t
|
var observation observation_t
|
||||||
err := json.Unmarshal([]byte(message.Payload), &observation)
|
err := json.Unmarshal([]byte(message.Payload), &observation)
|
||||||
@@ -50,7 +52,7 @@ func (self *SnmpHandler) Handle(message handler.MessageT) {
|
|||||||
var measurement database.Measurement
|
var measurement database.Measurement
|
||||||
measurement.Time = time.Now()
|
measurement.Time = time.Now()
|
||||||
|
|
||||||
measurement.Application = "SNMP"
|
measurement.Application = self.Id
|
||||||
measurement.Device = observation.Device
|
measurement.Device = observation.Device
|
||||||
|
|
||||||
measurement.Attributes = map[string]interface{} {
|
measurement.Attributes = map[string]interface{} {
|
||||||
@@ -62,12 +64,13 @@ func (self *SnmpHandler) Handle(message handler.MessageT) {
|
|||||||
measurement.Values[k] = database.VariableType {
|
measurement.Values[k] = database.VariableType {
|
||||||
Label: v.Label,
|
Label: v.Label,
|
||||||
Variable: v.Variable,
|
Variable: v.Variable,
|
||||||
Unit: "",
|
Unit: v.Unit,
|
||||||
Value: v.Value,
|
Value: v.Value,
|
||||||
|
Status: v.Status,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// log.Printf("Prepared measurement item: %s", measurement)
|
log.Printf("Prepared measurement item: %s", measurement)
|
||||||
|
|
||||||
self.dbh.StoreMeasurement(&measurement)
|
self.dbh.StoreMeasurement(&measurement)
|
||||||
self.S()
|
self.S()
|
||||||
Reference in New Issue
Block a user