Compare commits

...

31 Commits

Author SHA1 Message Date
f63c22912a float fix 2
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:57:32 +01:00
c37420a993 float fix 1
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:51:50 +01:00
ac0c417a48 small j for float 3
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:34:21 +01:00
dc965aeba6 small j for float 2
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 20:27:28 +01:00
b940f715c0 small j for float
Some checks failed
ci/woodpecker/tag/woodpecker Pipeline failed
2026-03-06 20:25:04 +01:00
8c5626942f fix config 3
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 12:30:04 +01:00
6d0dc12ac1 fix config 2
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 11:28:28 +01:00
6a4aac4140 fix config
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-06 11:24:02 +01:00
77d23e39cf add new shellies 2
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-04 23:27:48 +01:00
e28042f3be add new shellies
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-04 23:25:59 +01:00
e1ad76f703 fix
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-04 12:10:29 +01:00
6dac149a48 influxdb url
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-04 11:21:55 +01:00
691ebdeadd prepare additional deployment
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2026-03-04 11:07:31 +01:00
6ef80f8438 z2m working again 2026-02-16 17:07:23 +01:00
47116904fc changes 2026-02-09 15:36:35 +01:00
5b46ecb0b1 fields and tags 2026-02-04 21:32:32 +01:00
a1ea1b230e communication with influxdb is working, schema of data in influxdb is unusable so far, too many spare columns 2026-02-04 17:19:12 +01:00
97679561d8 changes for influxdb 2026-02-04 14:58:13 +01:00
a78c6952f0 voltage hack
All checks were successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-12-22 18:50:59 +01:00
3b69e1e2af car_values_v
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2025-12-15 17:28:15 +01:00
2dfca8d70a dockerize and deploy only for tag
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-12-15 14:55:49 +01:00
0352b720cd car and gpg
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-12-15 14:53:49 +01:00
95984157e8 add car powermeter 2025-12-15 14:19:18 +01:00
61509c0000 queries
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2025-02-10 15:40:37 +01:00
3c09c04066 rename snmp handler to prepared handler, 4
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-02-10 14:07:25 +01:00
af739c7148 rename snmp handler to prepared handler, 3
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-02-10 14:03:22 +01:00
33ff176c79 rename snmp handler to prepared handler, 2
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-02-10 14:01:50 +01:00
134e3706cc rename snmp handler to prepared handler
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-02-10 13:58:01 +01:00
3a56309b9f tsm
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-02-10 13:11:31 +01:00
084645f002 update of go modules
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-02-04 16:34:49 +01:00
9815371199 filter for build step
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
2025-02-04 16:25:03 +01:00
54 changed files with 1181 additions and 1300 deletions

View File

@@ -1,3 +1,7 @@
when:
- event: tag
steps: steps:
build: build:
image: golang:1.22.5-alpine3.20 image: golang:1.22.5-alpine3.20
@@ -8,70 +12,28 @@ steps:
- go build -a -installsuffix nocgo -o udi main.go - go build -a -installsuffix nocgo -o udi main.go
- cp udi ../.. - cp udi ../..
scan:
image: quay.io/wollud1969/woodpecker-helper:0.5.1
environment:
TRIVY_TOKEN:
from_secret: trivy_token
TRIVY_URL:
from_secret: trivy_url
DTRACK_API_KEY:
from_secret: dtrack_api_key
DTRACK_API_URL:
from_secret: dtrack_api_url
commands:
- export GOPATH=/woodpecker/go # the export is required, otherwise trivy will not consider the variable
- HOME=/home/`id -nu`
- TAG="${CI_COMMIT_TAG:-$CI_COMMIT_SHA}"
- |
trivy fs \
--server $TRIVY_URL \
--token $TRIVY_TOKEN \
--format cyclonedx \
--scanners license \
--output /tmp/sbom.xml \
.
- cat /tmp/sbom.xml
- |
curl -X "POST" \
-H "Content-Type: multipart/form-data" \
-H "X-Api-Key: $DTRACK_API_KEY" \
-F "autoCreate=true" \
-F "projectName=$CI_REPO" \
-F "projectVersion=$TAG" \
-F "bom=@/tmp/sbom.xml"\
"$DTRACK_API_URL/api/v1/bom"
when:
- event: [push, tag]
dockerize: dockerize:
image: plugins/kaniko image: plugins/kaniko
settings: settings:
repo: ${FORGE_NAME}/${CI_REPO} repo: ${FORGE_NAME}/${CI_REPO}
registry: registry:
from_secret: container_registry from_secret: local_registry
tags: latest,${CI_COMMIT_SHA},${CI_COMMIT_TAG} tags: latest,${CI_COMMIT_TAG}
username: username:
from_secret: container_registry_username from_secret: local_username
password: password:
from_secret: container_registry_password from_secret: local_password
dockerfile: Dockerfile dockerfile: Dockerfile
when:
- event: [push, tag]
deploy: deploy:
image: quay.io/wollud1969/woodpecker-helper:0.5.1 image: quay.io/wollud1969/k8s-admin-helper:0.4.1
environment: environment:
KUBE_CONFIG_CONTENT: KUBE_CONFIG_CONTENT:
from_secret: kube_config from_secret: kube_config
ENCRYPTION_KEY: GPG_PASSPHRASE:
from_secret: encryption_key from_secret: gpg_passphrase
MD5_CHECKSUM:
from_secret: secrets_checksum
commands: commands:
- export IMAGE_TAG=$CI_COMMIT_TAG - export IMAGE_TAG=$CI_COMMIT_TAG
- printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig - printf "$KUBE_CONFIG_CONTENT" > /tmp/kubeconfig
- export KUBECONFIG=/tmp/kubeconfig - export KUBECONFIG=/tmp/kubeconfig
- ./deployment/deploy.sh - ./deployment/deploy.sh
when:
- event: tag

View File

@@ -1,44 +0,0 @@
#!/bin/bash
if [ "$ENCRYPTION_KEY" = "" ]; then
echo "ENCRYPTION_KEY not set"
exit 1
fi
if [ "$MD5_CHECKSUM" = "" ]; then
echo "No checksum given"
exit 1
fi
SECRETS_CIPHERTEXT_FILE=secrets.enc
SECRETS_PLAINTEXT_FILE=/tmp/secrets
TMP_FILE=`mktemp`
POD_NAME_SUFFIX=`date +%s`
cat $SECRETS_CIPHERTEXT_FILE | \
kubectl run openssl-$POD_NAME_SUFFIX \
--rm \
--image bitnami/debian-base-buildpack:latest \
--env KEY=$ENCRYPTION_KEY \
-i \
-q \
-- \
/bin/sh -c "openssl enc -aes-256-cbc -salt -pass env:KEY -a -d" > \
$TMP_FILE
if [ `uname` = "Darwin" ]; then
CALCULATED_CHECKSUM=`cat $TMP_FILE | md5`
elif [ `uname` = "Linux" ]; then
CALCULATED_CHECKSUM=`cat $TMP_FILE | md5sum - | awk '{print $1}'`
fi
if [ "$MD5_CHECKSUM" != "$CALCULATED_CHECKSUM" ]; then
echo "Invalid checksum"
exit 1
fi
#cat $TMP_FILE
mv $TMP_FILE $SECRETS_PLAINTEXT_FILE

View File

@@ -17,7 +17,7 @@ metadata:
labels: labels:
app: udi app: udi
annotations: annotations:
secret.reloader.stakater.com/reload: "%PRE%-udi-conf,%PRE%-udi-db-cred,%PRE%-mqtt-password" secret.reloader.stakater.com/reload: "%PRE%-udi-conf,%PRE%-udi-db-cred,%PRE%-mqtt-password,%PRE%-udi-influxdb-cred"
spec: spec:
replicas: 1 replicas: 1
selector: selector:
@@ -36,6 +36,8 @@ spec:
name: %PRE%-udi-db-cred name: %PRE%-udi-db-cred
- secretRef: - secretRef:
name: %PRE%-mqtt-password name: %PRE%-mqtt-password
- secretRef:
name: %PRE%-udi-influxdb-cred
- configMapRef: - configMapRef:
name: %PRE%-udi-conf name: %PRE%-udi-conf
volumeMounts: volumeMounts:

View File

@@ -4,6 +4,11 @@ if [ "$IMAGE_TAG" == "" ]; then
echo "Make sure IMAGE_TAG is set" echo "Make sure IMAGE_TAG is set"
exit 1 exit 1
fi fi
if [ "$GPG_PASSPHRASE" == "" ]; then
echo "Make sure GPG_PASSPHRASE is set"
exit 1
fi
IMAGE_NAME=$FORGE_NAME/$CI_REPO IMAGE_NAME=$FORGE_NAME/$CI_REPO
@@ -15,7 +20,10 @@ DEPLOYMENT_DIR=$PWD/deployment
INSTANCES_DIR=$DEPLOYMENT_DIR/instances INSTANCES_DIR=$DEPLOYMENT_DIR/instances
pushd $DEPLOYMENT_DIR > /dev/null pushd $DEPLOYMENT_DIR > /dev/null
./decrypt-secrets.sh || exit 1 # ./decrypt-secrets.sh || exit 1
# . /tmp/secrets
gpg --decrypt --yes --batch --passphrase "$GPG_PASSPHRASE" --homedir /tmp/.gnupg -o /tmp/secrets secrets.asc
. /tmp/secrets . /tmp/secrets
rm /tmp/secrets rm /tmp/secrets
popd > /dev/null popd > /dev/null
@@ -59,6 +67,7 @@ for NAMESPACE_DIR in `find $INSTANCES_DIR -type d -mindepth 1 -maxdepth 1`; do
DATABASE_VARIABLE=$VARIABLE_PREFIX"_PGDATABASE" DATABASE_VARIABLE=$VARIABLE_PREFIX"_PGDATABASE"
NEW_UDI_DB_DATABASE="${!DATABASE_VARIABLE}" NEW_UDI_DB_DATABASE="${!DATABASE_VARIABLE}"
NEW_UDI_DB_HOST=database.database1.svc.cluster.local NEW_UDI_DB_HOST=database.database1.svc.cluster.local
INFLUXDB_URL=$VARIABLE_PREFIX"_INFLUXDB_URL"
kubectl create secret generic $INSTANCE-udi-db-cred \ kubectl create secret generic $INSTANCE-udi-db-cred \
--dry-run=client \ --dry-run=client \
@@ -71,6 +80,13 @@ for NAMESPACE_DIR in `find $INSTANCES_DIR -type d -mindepth 1 -maxdepth 1`; do
--from-literal=PGSSLMODE="require" | \ --from-literal=PGSSLMODE="require" | \
kubectl apply -f - -n $NAMESPACE kubectl apply -f - -n $NAMESPACE
kubectl create secret generic $INSTANCE-udi-influxdb-cred \
--dry-run=client \
-o yaml \
--save-config \
--from-literal=INFLUXDB_URL="${!INFLUXDB_URL}" | \
kubectl apply -f - -n $NAMESPACE
# set configuration as configMap # set configuration as configMap
kubectl create configmap $INSTANCE-udi-conf \ kubectl create configmap $INSTANCE-udi-conf \
--from-literal=UDI_CONF="`cat $CONFIG_FILE`" \ --from-literal=UDI_CONF="`cat $CONFIG_FILE`" \

View File

@@ -1,27 +0,0 @@
#!/bin/bash
ENCRYPTION_KEY=`openssl rand -hex 32`
echo $ENCRYPTION_KEY
SECRETS_PLAINTEXT_FILE=secrets.txt
SECRETS_CIPHERTEXT_FILE=secrets.enc
if [ `uname` = "Darwin" ]; then
cat $SECRETS_PLAINTEXT_FILE | md5
elif [ `uname` = "Linux" ]; then
cat $SECRETS_PLAINTEXT_FILE | md5sum - | awk '{print $1}'
fi
POD_NAME_SUFFIX=`date +%s`
cat $SECRETS_PLAINTEXT_FILE | \
kubectl run openssl-$POD_NAME_SUFFIX \
--rm \
--image bitnami/debian-base-buildpack:latest \
--env KEY=$ENCRYPTION_KEY \
-i \
-q \
-- \
/bin/sh -c "openssl enc -aes-256-cbc -salt -pass env:KEY -a" > \
$SECRETS_CIPHERTEXT_FILE

View File

@@ -0,0 +1,226 @@
{
"mqtt": {
"broker": "mqtt://mosquitto-broker-mqtt-anon-cluster.mosquitto.svc.cluster.local:1883",
"tlsEnable": "false"
},
"topicMappings": [
{
"topics": [ "snmp" ],
"handler": "PREP",
"id": "SNMP",
"config": {
"attributes": {
}
}
},
{
"topics": [ "tsm" ],
"handler": "PREP",
"id": "TSM",
"config": {
"attributes": {
}
}
},
{
"topics": [ "dt1/ai/periodic/1" ],
"handler": "DT1T",
"id": "DT1T.0",
"config": {
"attributes": {
"Application": "Temperature Wago",
"Device": "Freezer",
"HardLow": "-273",
"SoftLow": "-50",
"SoftHigh": "20",
"HardHigh": "100"
}
}
},
{
"topics": [ "dt1/ai/periodic/3" ],
"handler": "DT1T",
"id": "DT1T.1",
"config": {
"attributes": {
"Application": "Temperature Wago",
"Device": "Outdoor",
"HardLow": "-273",
"SoftLow": "-60",
"SoftHigh": "60",
"HardHigh": "100"
}
}
},
{
"topics": [ "IoT/PV/Values" ],
"handler": "PV",
"id": "PV",
"config": {
"databaseConnStr": "",
"attributes": {
}
}
},
{
"topics": [ "IoT/Car/Values" ],
"handler": "Car",
"id": "Car",
"config": {
"databaseConnStr": "",
"attributes": {
}
}
},
{
"topics": [ "locative/event/#" ],
"handler": "Locative",
"id": "Locative",
"config": {
"databaseConnStr": "",
"attributes": {
}
}
},
{
"topics": [ "IoT/MBGW3/Measurement" ],
"handler": "MBGW3",
"id": "MBGW3",
"config": {
"databaseConnStr": "",
"attributes": {
}
}
},
{
"topics": [ "IoT/OneWireGW/Bus 1/#" ],
"handler": "SVER",
"id": "SVER0",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Temperature Heating",
"payloadRegex": "(\\d+(\\.\\d+)?)\\s*([^0-9\\s]\\S*)",
"deviceFrom": "topic",
"devicePart": "3",
"valueFrom": "payload",
"valuePart": "1",
"valueType": "float",
"unitFrom": "payload",
"unitPart": "3"
}
}
},
{
"topics": [ "NR/Multisensor/+/Temperatur" ],
"handler": "SVEJ",
"id": "SVEJ0",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Temperature Multisensor",
"deviceSelector": "T:2",
"valueSelector": "J:$.CurrentTemperature",
"unitSelector": "C:°C"
}
}
},
{
"topics": [ "NR/Multisensor/+/Feuchte" ],
"handler": "SVEJ",
"id": "SVEJ1",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Humidity Multisensor",
"deviceSelector": "T:2",
"valueSelector": "J:$.CurrentRelativeHumidity",
"unitSelector": "C:%"
}
}
},
{
"topics": [ "zigbee2mqtt/+" ],
"handler": "Z2M",
"id": "Z2M",
"config": {
"databaseConnStr": "",
"attributes": {
}
}
},
{
"topics": [ "shellyplusht/+/status/temperature:0" ],
"handler": "SVEJ",
"id": "SVEJ2",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Temperature Shelly Plus HT",
"deviceSelector": "T:1",
"valueSelector": "J:$.tC",
"unitSelector": "C:°C"
}
}
},
{
"topics": [ "shellyplusht/+/status/humidity:0" ],
"handler": "SVEJ",
"id": "SVEJ3",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Humidity Shelly Plus HT",
"deviceSelector": "T:1",
"valueSelector": "J:$.rh",
"unitSelector": "C:%"
}
}
},
{
"topics": [ "shellies/sensor/+/status/temperature:0" ],
"handler": "SVEJ",
"id": "SVEJ4",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Shellies Sensor Temperature",
"deviceSelector": "T:2",
"valueSelector": "J:$.tC",
"unitSelector": "C:°C"
}
}
},
{
"topics": [ "shellies/sensor/+/status/humidity:0" ],
"handler": "SVEJ",
"id": "SVEJ5",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Shellies Sensor Humidity",
"deviceSelector": "T:2",
"valueSelector": "J:$.rh",
"unitSelector": "C:%"
}
}
},
{
"topics": [ "shellies/sensor/+/status/devicepower:0" ],
"handler": "SVEJ",
"id": "SVEJ6",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Shellies Sensor Power",
"deviceSelector": "T:2",
"valueSelector": "J:$.battery.percent",
"unitSelector": "C:%"
}
}
}
],
"archiver": {
"dir": "/archive"
}
}

8
deployment/secrets.asc Normal file
View File

@@ -0,0 +1,8 @@
-----BEGIN PGP MESSAGE-----
jA0ECQMIYUoTHR96Qfb90psBoxuk38UXPXTWPCmdW690bi2+w34S4NLHZvHfe3Ra
nck319+PXvr0agfHGZ733hhTQv4sa8I2o6ICrgFqtKGfHmgnqL5kYNP9+NuV/IsF
x3dxwjEejsZ5GYn/zk+CQceItQ8nyyJc2ms1KwTu2r4hMzuHmnVtvKxNCzPrw2N5
SJIRhh41eequFkzELQqqXXu10raBFsttOemVhA==
=TySu
-----END PGP MESSAGE-----

View File

@@ -1,38 +0,0 @@
U2FsdGVkX1+v6L4gc+CbYCZyo/UVN7QfmEntIBpk+GAHGf3d7m/4hfcYd39Eh2td
lXSmNdt1cdFw/UfZ1x1OlGm/fqLh/j/rWPgEc6BwEcDFDEXpTucTjUHNDonYNH8j
eDWeAGokfguqgQG16CBLHdeyocP0kTPJSrIKQgG1Mzzck/kfB1Z6Ggv4z5KEx2dy
2rrnm+BeFT1yITwoxa3iJeudcSQznNIqQa+Mx4fUsPV+yorahp4gs0PVVj9POnAT
yRhpQgkaq5oZNVcYrWS5+6mmhbzL5jIAa4wfzVep/69RcfBkV5Oj5JJGaQzH0T74
wg8dWz/scdi2kkCn0KroJPrsG/lAsFYhbX4kUJQeRUX1pWr/iwD0i8LRx+f2C82Y
HgpsnG6c5nPRy68TltgRgCRAIJj87rR/fATVowcpChfe9sXCwfLEZ5Q2hDK8eAPW
VS87axMkProyHJZe1GK0v9CAVWpXlxv6eAr8u2SftGA87Xu3ebQ4SjReXIcAb7M6
08UnxW4YcfH+usgU2GUuNlzRctAq334AfBWYQO51l/ELJAzaDi6Ht4Czr6R7Bsfh
M3ZcjcgqY7j7ywDFmKq/a8Q0Dsjm2sezNtrrRWusomgSKFEf8WncOdkcWOAiza4T
+Qubfr1SuZuWFF+migGtYM3X8YS+VpmMRIpJ1otibMELgjvldWGqHIK1uIThLq7F
MvQ0Nog6UNg79/8vrUoEUPPB5fQsXcNC5zcpVMrpJcGogBHhsXk1EPFcB75sx/65
bl2BZlCBacH9MNIBPh17dMC46EV1FNaLiO1N3/qJkxrkiG5wBDjDlnyMn/mYc/o1
olNuIO0nnn2x8ZU02lRo8RqcqOywseZeBhAzOj+899n5Qa/0YQAnb0Y9WAxqLft/
0C45HcK5Kgd3C6wqvVUqcQ/UMxQzv0y1cM8gbfpGjUvJ6gUj7vkW08D55A6gV8Lf
SrneWAP/1B1mmV16vHaXwoYpTpQwM7i7fHWBOpH7nq6E+0P3LHyon43dYo4P6KM7
He3R6phTFp36WI4ZCUQafTDZS196Ol2ZyEAonVwSOIEIyptXeoAmleolXC/eL84Z
bEbhld8g+ulrVSrBXFpCY3jBsqPVBYEpZaGYgevsrHPSbwWa/qQkTKnOO4+oz6Pe
9iJ1yJbSWfg6Gkr6iqE41Dp4VGXtwTDHHb9YMd56iWHAkxZLFIWdYUr8XfQS6j70
j5kV3jV/w5EHGYruBdtxAWc7YKq3pfqvh9R7dD/8JOFZhA140+zmOCWG4qdDhv+5
F9vlawudssa9ZHGi1jBFPCNW13LBhUdyCY3apKF4HHeeuA465uzxIqwtkJSigdun
vC9ooYZrJjYOnJSTJnKH0WSD0pPC6CIkge+Fxuksq6cst5Zcysw1xz5zs7UNeAP+
kLs1+8Kn2d1hJuzSWdWlj7xGratLEdA6pqcfBKvMYtY0kpPPDrxm+F1FZ7LyV+dc
G1vfI6aS2azrFrBNXSeOArJ/erGHIGhWxFY0c3bcGOjXwsLWRjQ03Kdj9ffj6UFL
4JJaI0I01RilAo+woaZhNmOHl1VxSsU1lDGF7IvW3t0qKLaSg/Rv3pQqdKyjq8I5
IxPlUEMdo1EDZZx4qLmYBM1tWhgMbn4nx6P2BS7obnPdaf3B0RPxI68Z49RYZKvR
/wTyr7oWCCRQDwCuVH8t/jUrSWspzEK7ApXHdh7T9JlNurFW7oxc8ylooQrAn3Gn
mru7X3cUeVtiosAklZ7w+JNxm44IRmDKNVDeAaat+q35EA8MRFGiuXEOeNw54tWH
zNkUyUJ79Ie7BkGrZFUFqkvfY3Q/xLaBGYDQe65S8/rerybL0YI7RmMiz4x7yq8L
GoIDwPsn0z/AFefoGTi0tAXZeC+EA62okK1kKR9qrh9gmD59uiMbFX1BHe3rWhgP
cCPScYeameXV3K6wwQpX8JTdptqMAH5cpEVoUZ/PZZpkaiCuWcMODVbqTpm4SRPt
Q9s5+6/g0TUUqz7Fwi0dlfnMZVuK0a1Uf/SBYR7f/UYVLfF5juTZ+IRJwQWwp6QX
CzfYms0W34/srtM72mQOpKTd0o3xuFyVbQtZPOpNghIjArQqwt34nEzXPYHqasDx
c/yIPdW+B/YVcFPdRV16Izqmjdlupv6pPjY/T6GdHczQsH9gD28HN9+Ka2Cvficf
evO7IXe0RuvodQ3tB4LmeWoJB10G7Sko2EEfpFTDXke9Ak/5cGrpdPMtbXCAIm1o
B5UhrqNuUYSWdo0mGttbSjFR7pyLujsxLNnp8teBi33QOUhrSId5+mOvtFDGiZKa
QCC+W+BIh6IFIwnxH4dDxjz3M65NXzqNV+6mXEFU77cX+oTF4BRe0R/L4nPoaBAN
smRxtqBItpVFUdsOVb6bXg==

View File

@@ -1 +0,0 @@
.venv

View File

@@ -1,38 +0,0 @@
import psycopg2
from loguru import logger
try:
srcConn = psycopg2.connect(database="level_monitoring_berresheim")
srcConn.autocommit = False
destConn = psycopg2.connect(database="udi-berresheim")
destConn.autocommit = False
with srcConn.cursor() as srcCur, destConn.cursor() as destCur:
srcCur.execute("select time, application_name, raw_level, level, status, battery from measurement_t")
for srcObj in srcCur:
timestamp = srcObj[0]
deviceName = srcObj[1]
rawLevel = srcObj[2]
level = srcObj[3]
status = srcObj[4]
battery = srcObj[5]
logger.info(f"{timestamp=}, {deviceName=}, {rawLevel=}, {level=}, {status=}, {battery=}")
destTime = timestamp
destApplication = "de-hottis-level-monitoring"
destDevice = "eui-a84041a2c18341d6"
destAttributes = '{"ApplicationId":"de-hottis-level-monitoring", "DeviceType":"dragino-ldds75", "Status":"' + status + '","Hint": "Migrated"}'
destValues = '{"Battery":{"unit":"V","label":"Battery","value":' + str(battery) + ',"variable":"Voltage"}, "Distance":{"unit":"mm","label":"Distance","variable":"Level","value":' + str(rawLevel) + '}, "CorrectedDistance":{"unit":"mm", "label":"CorrectedDistance", "variable":"Level","value":' + str(level) + '}}'
logger.info(f"{destTime=}, {destApplication=}, {destDevice=}, {destAttributes=}, {destValues=}")
destCur.execute("insert into measurements (time, application, device, attributes, values) values(%s, %s, %s, %s, %s)",
(destTime, destApplication, destDevice, destAttributes, destValues))
destConn.commit()
finally:
if srcConn:
srcConn.close()
if destConn:
destConn.close()

View File

@@ -1,79 +0,0 @@
import psycopg2
from loguru import logger
import os
srcPgHost = os.environ["SRC_PGHOST"]
srcPgUser = os.environ["SRC_PGUSER"]
srcPgPassword = os.environ["SRC_PGPASSWORD"]
srcPgDatabase = os.environ["SRC_PGDATABASE"]
destPgHost = os.environ["DEST_PGHOST"]
destPgUser = os.environ["DEST_PGUSER"]
destPgPassword = os.environ["DEST_PGPASSWORD"]
destPgDatabase = os.environ["DEST_PGDATABASE"]
try:
srcConn = psycopg2.connect(
host=srcPgHost,
dbname=srcPgDatabase,
user=srcPgUser,
password=srcPgPassword,
sslmode='require'
)
srcConn.autocommit = False
destConn = psycopg2.connect(
host=destPgHost,
dbname=destPgDatabase,
user=destPgUser,
password=destPgPassword,
sslmode='require'
)
destConn.autocommit = False
with srcConn.cursor() as srcCur, destConn.cursor() as destCur:
srcCur.execute("select time, deviceid, status, state, importenergyactive, importenergyreactive, exportenergyactive, exportenergyreactive, powerapparent, poweractive, powerreactive, powerdemandpositive, powerdemandreverse, factor, angle, voltage, current, powerdemand from pv_power_measurement_t order by time")
for srcObj in srcCur:
timestamp = srcObj[0]
deviceName = srcObj[1]
status = srcObj[2]
state = srcObj[3]
importenergyactive = srcObj[4]
importenergyreactive = srcObj[5]
exportenergyactive = srcObj[6]
exportenergyreactive = srcObj[7]
powerapparent = srcObj[8]
poweractive = srcObj[9]
powerreactive = srcObj[10]
powerdemandpositive = srcObj[11]
powerdemandreverse = srcObj[12]
factor = srcObj[13]
angle = srcObj[14]
voltage = srcObj[15]
current = srcObj[16]
powerdemand = srcObj[17]
logger.info(f"{timestamp=}, {deviceName=}")
destTime = timestamp
destApplication = "PV"
destDevice = "Powermeter"
destAttributes = f"{{\"ApplicationId\":\"PV\", \"Status\":\"{status}\",\"Hint\": \"Migrated\"}}"
destValues = f"{{\"Cnt\": {{\"unit\": \"\", \"label\": \"\", \"value\": \"-1\", \"variable\": \"Cnt\"}}, \"Angle\": {{\"unit\": \"degree\", \"label\": \"\", \"value\": \"{angle}\", \"variable\": \"Angle\"}}, \"State\": {{\"unit\": \"\", \"label\": \"\", \"value\": \"{state}\", \"variable\": \"State\"}}, \"Factor\": {{\"unit\": \"\", \"label\": \"\", \"value\": \"{factor}\", \"variable\": \"Factor\"}}, \"Current\": {{\"unit\": \"A\", \"label\": \"\", \"value\": \"{current}\", \"variable\": \"Current\"}}, \"Voltage\": {{\"unit\": \"V\", \"label\": \"\", \"value\": \"{voltage}\", \"variable\": \"Voltage\"}}, \"PowerActive\": {{\"unit\": \"W\", \"label\": \"\", \"value\": \"{poweractive}\", \"variable\": \"PowerActive\"}}, \"PowerApparent\": {{\"unit\": \"VA\", \"label\": \"\", \"value\": \"{powerapparent}\", \"variable\": \"PowerApparent\"}}, \"PowerReactive\": {{\"unit\": \"VA\", \"label\": \"\", \"value\": \"{powerreactive}\", \"variable\": \"PowerReactive\"}}, \"ExportEnergyActive\": {{\"unit\": \"Wh\", \"label\": \"\", \"value\": \"{exportenergyactive}\", \"variable\": \"ExportEnergyActive\"}}, \"ImportEnergyActive\": {{\"unit\": \"Wh\", \"label\": \"\", \"value\": \"{importenergyactive}\", \"variable\": \"ImportEnergyActive\"}}, \"PowerDemandReverse\": {{\"unit\": \"W\", \"label\": \"\", \"value\": \"{powerdemandreverse}\", \"variable\": \"PowerDemandReverse\"}}, \"PowerDemandPositive\": {{\"unit\": \"W\", \"label\": \"\", \"value\": \"{powerdemandpositive}\", \"variable\": \"PowerDemandPositive\"}}, \"ExportEnergyReactive\": {{\"unit\": \"VAh\", \"label\": \"\", \"value\": \"{exportenergyreactive}\", \"variable\": \"ExportEnergyReactive\"}}, \"ImportEnergyReactive\": {{\"unit\": \"VAh\", \"label\": \"\", \"value\": \"{importenergyreactive}\", \"variable\": \"ImportEnergyReactive\"}}}}"
logger.info(f"{destTime=}, {destApplication=}, {destDevice=}, {destAttributes=}, {destValues=}")
try:
destCur.execute("insert into measurements (time, application, device, attributes, values) values(%s, %s, %s, %s, %s)",
(destTime, destApplication, destDevice, destAttributes, destValues))
destConn.commit()
except Exception as e:
destConn.rollback()
logger.error(f"Error {e} when inserted time {destTime}")
finally:
if srcConn:
srcConn.close()
if destConn:
destConn.close()

View File

@@ -1,78 +0,0 @@
import psycopg2
from loguru import logger
import os
srcPgHost = os.environ["SRC_PGHOST"]
srcPgUser = os.environ["SRC_PGUSER"]
srcPgPassword = os.environ["SRC_PGPASSWORD"]
srcPgDatabase = os.environ["SRC_PGDATABASE"]
destPgHost = os.environ["DEST_PGHOST"]
destPgUser = os.environ["DEST_PGUSER"]
destPgPassword = os.environ["DEST_PGPASSWORD"]
destPgDatabase = os.environ["DEST_PGDATABASE"]
try:
srcConn = psycopg2.connect(
host=srcPgHost,
dbname=srcPgDatabase,
user=srcPgUser,
password=srcPgPassword,
sslmode='require'
)
srcConn.autocommit = False
destConn = psycopg2.connect(
host=destPgHost,
dbname=destPgDatabase,
user=destPgUser,
password=destPgPassword,
sslmode='require'
)
destConn.autocommit = False
with srcConn.cursor() as srcCur, destConn.cursor() as destCur:
srcCur.execute("select time, location, status, temperature, category from room_climate_measurement_t where category = 'heating' and time > '2023-12-19 05:20:00' order by time")
for srcObj in srcCur:
timestamp = srcObj[0]
location = srcObj[1]
status = srcObj[2]
temperature = srcObj[3]
category = srcObj[4]
logger.info(f"{timestamp=}, {location=}, {status=}, {temperature=}, {category=}")
destTime = timestamp
match category:
case 'heating':
destApplication = 'Temperature Heating'
case 'Outdoor':
destApplication = 'Temperature Wago'
case 'Device':
destApplication = 'Temperature Wago'
case 'Indoor':
destApplication = 'Temperature Multisensor' if location != 'Anna-Koeln-2' else 'Temperature Shelly Plus HT'
case 'Special':
destApplication = 'Temperature Multisensor'
destDevice = location
destAttributes = '{"ApplicationId":"temperature-imported", "Status":"' + status + '","Location":"' + location + '","Category":"' + category + '","Hint": "Migrated"}'
destValues = '{"Value": {"unit": "°C", "label": "", "value": "' + str(temperature) + '", "variable": ""}}'
logger.info(f"{destTime=}, {destApplication=}, {destDevice=}, {destAttributes=}, {destValues=}")
try:
destCur.execute("insert into measurements (time, application, device, attributes, values) values(%s, %s, %s, %s, %s)",
(destTime, destApplication, destDevice, destAttributes, destValues))
destConn.commit()
except Exception as e:
destConn.rollback()
logger.error(f"Error {e} when inserted time {destTime}")
finally:
if srcConn:
srcConn.close()
if destConn:
destConn.close()

View File

@@ -1,2 +0,0 @@
loguru==0.7.3
psycopg2==2.9.9

View File

@@ -129,3 +129,54 @@ create or replace view lora_sht21_v as
where m.application = 'de-hottis-app01' and where m.application = 'de-hottis-app01' and
m.attributes->>'DeviceType' = 'hottis-gy21' and m.attributes->>'DeviceType' = 'hottis-gy21' and
m.device = d.label; m.device = d.label;
create or replace view ntp_server_snmp_v as
select time,
device,
cast(values->'load1'->>'value' as float) as laLoad1,
cast(values->'lan-in'->>'value' as int) as lanInOctetsPerSeconds,
cast(values->'lan-out'->>'value' as int) as lanOutOctetsPerSeconds
from measurements
where application = 'SNMP' and device = '172.16.13.10';
create or replace view ntp_server_variables_v as
select time,
device,
cast(values->'rootdisp'->>'value' as float) as rootdisp
from measurements
where application = 'TSM' and device = '172.16.13.10';
-- Status string `unit:"" json:"status"`
-- Timestamp string `unit:"" json:"timestamp"`
-- VoltageL1 float32 `unit:"V" json:"voltageL1"`
-- VoltageL2 float32 `unit:"V" json:"voltageL2"`
-- VoltageL3 float32 `unit:"V" json:"voltageL3"`
-- CurrentL1 float32 `unit:"A" json:"currentL1"`
-- CurrentL2 float32 `unit:"A" json:"currentL2"`
-- CurrentL3 float32 `unit:"A" json:"currentL3"`
-- PowerL1 float32 `unit:"W" json:"powerL1"`
-- PowerL2 float32 `unit:"W" json:"powerL2"`
-- PowerL3 float32 `unit:"W" json:"powerL3"`
-- TotalImportEnergy float32 `unit:"Wh" json:"totalImportEnergy"`
-- TotalExportEnergy float32 `unit:"Wh" json:"totalExportEnergy"`
-- Cnt int `unit:"" json:"cnt"`
create or replace view car_values_v as
select time,
cast(values->'VoltageL1'->>'value' as float) as voltage_l1,
cast(values->'VoltageL2'->>'value' as float) as voltage_l2,
cast(values->'VoltageL3'->>'value' as float) as voltage_l3,
cast(values->'CurrentL1'->>'value' as float) as current_l1,
cast(values->'CurrentL2'->>'value' as float) as current_l2,
cast(values->'CurrentL3'->>'value' as float) as current_l3,
cast(values->'PowerL1'->>'value' as float) as power_l1,
cast(values->'PowerL2'->>'value' as float) as power_l2,
cast(values->'PowerL3'->>'value' as float) as power_l3,
cast(values->'TotalImportEnergy'->>'value' as float) as total_import_energy,
cast(values->'TotalExportEnergy'->>'value' as float) as total_export_energy,
values->'Status'->>'value' as status,
device
from measurements
where application = 'Car';

View File

@@ -1,18 +1,27 @@
{ {
"mqtt": { "mqtt": {
"broker": "mqtt://emqx01-anonymous-cluster-internal.broker.svc.cluster.local:1883", "broker": "mqtt://172.23.1.102:1883",
"tlsEnable": "false" "tlsEnable": "false"
}, },
"topicMappings": [ "topicMappings": [
{ {
"topics": [ "snmp" ], "topics": [ "snmp" ],
"handler": "SNMP", "handler": "PREP",
"id": "SNMP", "id": "SNMP",
"config": { "config": {
"attributes": { "attributes": {
} }
} }
}, },
{
"topics": [ "tsm" ],
"handler": "PREP",
"id": "TSM",
"config": {
"attributes": {
}
}
},
{ {
"topics": [ "dt1/ai/periodic/1" ], "topics": [ "dt1/ai/periodic/1" ],
"handler": "DT1T", "handler": "DT1T",
@@ -54,9 +63,9 @@
} }
}, },
{ {
"topics": [ "locative/event/#" ], "topics": [ "IoT/Car/Values" ],
"handler": "Locative", "handler": "Car",
"id": "Locative", "id": "Car",
"config": { "config": {
"databaseConnStr": "", "databaseConnStr": "",
"attributes": { "attributes": {
@@ -91,34 +100,6 @@
} }
} }
}, },
{
"topics": [ "NR/Multisensor/+/Temperatur" ],
"handler": "SVEJ",
"id": "SVEJ0",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Temperature Multisensor",
"deviceSelector": "T:2",
"valueSelector": "J:$.CurrentTemperature",
"unitSelector": "C:°C"
}
}
},
{
"topics": [ "NR/Multisensor/+/Feuchte" ],
"handler": "SVEJ",
"id": "SVEJ1",
"config": {
"databaseConnStr": "",
"attributes": {
"application": "Humidity Multisensor",
"deviceSelector": "T:2",
"valueSelector": "J:$.CurrentRelativeHumidity",
"unitSelector": "C:%"
}
}
},
{ {
"topics": [ "zigbee2mqtt/+" ], "topics": [ "zigbee2mqtt/+" ],
"handler": "Z2M", "handler": "Z2M",
@@ -159,6 +140,6 @@
} }
], ],
"archiver": { "archiver": {
"dir": "/archive" "dir": "./tmp/udi"
} }
} }

View File

@@ -5,23 +5,19 @@
}, },
"topicMappings": [ "topicMappings": [
{ {
"topics": [ "NR/Multisensor/+/Temperatur" ], "topics": [ "IoT/PV/Values" ],
"handler": "SVEJ", "handler": "PV",
"id": "SVEJ0", "id": "PV",
"config": { "config": {
"databaseConnStr": "", "databaseConnStr": "",
"attributes": { "attributes": {
"application": "Temperature Multisensor",
"deviceSelector": "T:2",
"valueSelector": "J:$.CurrentTemperature",
"unitSelector": "C:°C"
} }
} }
}, },
{ {
"topics": [ "zigbee2mqtt/+" ], "topics": [ "IoT/Car/Values" ],
"handler": "Z2M", "handler": "Car",
"id": "Z2M", "id": "Car",
"config": { "config": {
"databaseConnStr": "", "databaseConnStr": "",
"attributes": { "attributes": {

View File

@@ -1,46 +1,38 @@
package database package database
import "time" import "time"
import "gorm.io/gorm"
type VariableType struct { type VariableType struct {
Label string `json:"label"` Label string `json:"label"`
Variable string `json:"variable"` Variable string `json:"variable"`
Unit string `json:"unit"` Unit string `json:"unit"`
Value interface{} `json:"value,omitempty"` Value interface{} `json:"value,omitempty"`
Status string `json:"status,omitempty"`
} }
type Measurement struct { type Measurement struct {
Time time.Time `gorm:"not null;primary_key"` Time time.Time
Application string `gorm:"not null"` Application string
Device string Device string
Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"` Attributes map[string]interface{}
Values map[string]VariableType `gorm:"serializer:json;type:jsonb"` Values map[string]VariableType
}
// Simplified structures for backward compatibility
type DeviceType struct {
Label string
ModelIdentifier string
Attributes map[string]interface{}
} }
type Application struct { type Application struct {
gorm.Model Label string
Label string `gorm:"not null"` Attributes map[string]interface{}
Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"`
}
type DeviceType struct {
gorm.Model
Label string `gorm:"not null"`
ModelIdentifier string
Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"`
} }
type Device struct { type Device struct {
gorm.Model Label string
Label string `gorm:"not null;uniqueIndex:idx_label_application_id"` Application Application
ApplicationID int `gorm:"not null;uniqueIndex:idx_label_application_id"` DeviceType DeviceType
Application Application Attributes map[string]interface{}
DeviceTypeID int `gorm:"not null"`
DeviceType DeviceType
Attributes map[string]interface{} `gorm:"serializer:json;type:jsonb"`
} }

View File

@@ -1,96 +1,155 @@
package database package database
import ( import (
"log" "fmt"
//"time" "log"
"fmt" "os"
"udi/counter" "udi/counter"
"gorm.io/driver/postgres"
"gorm.io/gorm" influxdb "github.com/influxdata/influxdb1-client/v2"
) )
type DatabaseHandle struct { type DatabaseHandle struct {
initialized bool initialized bool
dbh *gorm.DB client influxdb.Client
database string
} }
func NewDatabaseHandle() *DatabaseHandle { func NewDatabaseHandle() *DatabaseHandle {
var db DatabaseHandle var db DatabaseHandle
// inject the whole database configuration via the well-known PG* env variables
conn, err := gorm.Open(postgres.Open("")) // Read configuration from environment variables
if err != nil { influxURL := os.Getenv("INFLUXDB_URL")
log.Printf("Unable to open database connection: %s", err) if influxURL == "" {
db.initialized = false influxURL = "http://localhost:8086"
} else { }
db.dbh = conn
db.initialized = true influxDB := os.Getenv("INFLUXDB_DATABASE")
log.Println("Database connection opened") if influxDB == "" {
} influxDB = "udi"
return &db }
username := os.Getenv("INFLUXDB_USER")
password := os.Getenv("INFLUXDB_PASSWORD")
// Create InfluxDB client
client, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: influxURL,
Username: username,
Password: password,
})
if err != nil {
log.Printf("Unable to create InfluxDB client (config: URL: %s, Username: %s, Password: %s): %s", influxDB, username, password, err)
db.initialized = false
return &db
}
// Test connection
_, _, err = client.Ping(0)
if err != nil {
log.Printf("Unable to ping InfluxDB: %s", err)
db.initialized = false
client.Close()
return &db
}
db.client = client
db.database = influxDB
db.initialized = true
log.Printf("InfluxDB connection opened (URL: %s, Database: %s)", influxURL, influxDB)
return &db
} }
func (self *DatabaseHandle) StoreMeasurement(measurement *Measurement) { func (self *DatabaseHandle) StoreMeasurement(measurement *Measurement) {
if ! self.initialized { if !self.initialized {
log.Printf("Database connection not initialized, can not store, measurement %s lost", measurement) log.Printf("Database connection not initialized, can not store, measurement %v lost", measurement)
counter.F("Stored") counter.F("Stored")
return return
} }
result := self.dbh.Create(measurement) // Create batch points
if result.Error != nil { bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
log.Printf("Unable to insert, measurement %s lost, error: %s", measurement, result.Error) Database: self.database,
counter.F("Stored") Precision: "s",
return })
} if err != nil {
log.Printf("Unable to create batch points: %s", err)
counter.F("Stored")
return
}
log.Println("Successfully stored measurement") // Build tags
counter.S("Stored") tags := make(map[string]string)
if measurement.Device != "" {
tags["device"] = measurement.Device
}
// Build fields from Values
fields := make(map[string]interface{})
for key, varType := range measurement.Values {
// Store the value with the variable name as field key
fields[key] = varType.Value
// Optionally store metadata as separate fields
if varType.Unit != "" {
fields[key+"_unit"] = varType.Unit
}
// This is already the column name, so we can skip it
//if varType.Variable != "" {
// fields[key+"_variable"] = varType.Variable
//}
if varType.Status != "" {
fields[key+"_status"] = varType.Status
}
}
// Add attributes as fields
for key, value := range measurement.Attributes {
if strValue, ok := value.(string); ok {
fields[key] = strValue
} else {
fields[key] = fmt.Sprintf("%v", value)
}
}
// Ensure we have at least one field
if len(fields) == 0 {
log.Printf("No fields to store in measurement, skipping")
counter.F("Stored")
return
}
// Create point
pt, err := influxdb.NewPoint(
measurement.Application,
tags,
fields,
measurement.Time,
)
if err != nil {
log.Printf("Unable to create point: %s", err)
counter.F("Stored")
return
}
bp.AddPoint(pt)
// Write batch
err = self.client.Write(bp)
if err != nil {
log.Printf("Unable to write to InfluxDB, measurement lost, error: %s", err)
counter.F("Stored")
return
}
log.Println("Successfully stored measurement")
counter.S("Stored")
} }
func (self *DatabaseHandle) GetDeviceByLabelAndApplication(applicationLabel string, deviceLabel string) (*Device, error) { func (self *DatabaseHandle) Close() {
if ! self.initialized { if self.initialized && self.client != nil {
err := fmt.Errorf("Database connection not initialized") self.client.Close()
return nil, err log.Println("InfluxDB connection closed")
} }
var device Device
result := self.dbh.
Preload("Application").
Preload("DeviceType").
Joins("JOIN applications ON devices.application_id = applications.id").
Where("devices.label = ? AND applications.label = ?", deviceLabel, applicationLabel).
First(&device)
if result.Error != nil {
err := fmt.Errorf("Query failed: %s", result.Error)
return nil, err
}
return &device, nil
} }
func (self *DatabaseHandle) GetDeviceByLabel(deviceLabel string) (*Device, error) {
if ! self.initialized {
err := fmt.Errorf("Database connection not initialized")
return nil, err
}
var device Device
result := self.dbh.
Preload("Application").
Preload("DeviceType").
Where("devices.label = ?", deviceLabel).
First(&device)
if result.Error != nil {
err := fmt.Errorf("Query failed: %s", result.Error)
return nil, err
}
return &device, nil
}

View File

@@ -1,55 +0,0 @@
package database
import (
"log"
//"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
func Migrate() {
dsn := ""
db, err := gorm.Open(postgres.Open(dsn))
if err != nil {
log.Fatalf("Unable to open database connection: %s", err)
}
db.AutoMigrate(&Application{})
log.Println("Application created")
db.AutoMigrate(&DeviceType{})
log.Println("DeviceType created")
db.AutoMigrate(&Device{})
log.Println("Device created")
db.AutoMigrate(&Measurement{})
log.Println("Measurement created")
log.Println("Remember to call create_hypertable on measurements, sowhat I can't do that for you.")
/*
m := Measurement {
Time: time.Now(),
Application: "app",
Attributes: nil,
Values: []SensorType {
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 1.0 },
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 3.0 },
},
}
db.Create(&m)
m = Measurement {
Time: time.Now(),
Application: "app",
Attributes: nil,
Values: []SensorType {
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 10.0 },
{ Variable: "Temperature", Unit: "Degree Celsius", Value: 30.0 },
},
}
db.Create(&m)
*/
}

View File

@@ -1,41 +0,0 @@
create extension if not exists timescaledb;
create table application_t (
id serial not null primary key,
label text not null unique,
attributes jsonb
);
create table sensor_type_t (
id serial not null primary key,
label text not null unique,
variable text not null,
unit text not null,
converter text not null,
attributes jsonb
);
create table sensor_t (
id serial not null primary key,
label text not null,
application int references application_t(id),
sensor_type int references sensor_type_t(id),
attributes jsonb,
unique (label, application)
);
create table measurement_t (
time timestamp without time zone not null,
application text not null,
sensor_type text not null,
sensor text not null,
variable text not null,
unit text not null,
value double precision not null,
attributes jsonb
);
select create_hypertable('measurement_t', 'time');

View File

@@ -1,149 +1,154 @@
package dispatcher package dispatcher
import "log" import (
import "time" "fmt"
import "os" "log"
import "fmt" "net/url"
import "net/url" "os"
import "udi/mqtt" "time"
import "udi/config" "udi/config"
import "udi/counter" "udi/counter"
import "udi/handlers/handler" "udi/handlers/car"
import "udi/handlers/ttn" "udi/handlers/dt1t"
import "udi/handlers/iot" "udi/handlers/handler"
import "udi/handlers/pv" "udi/handlers/iot"
import "udi/handlers/mbgw3" "udi/handlers/locative"
import "udi/handlers/sver" "udi/handlers/mbgw3"
import "udi/handlers/svej" "udi/handlers/prepared"
import "udi/handlers/dt1t" "udi/handlers/pv"
import "udi/handlers/locative" "udi/handlers/svej"
import "udi/handlers/snmp" "udi/handlers/sver"
import "udi/handlers/z2m"
// "udi/handlers/ttn"
"udi/handlers/z2m"
"udi/mqtt"
)
var handlerMap map[string]handler.Handler = make(map[string]handler.Handler) var handlerMap map[string]handler.Handler = make(map[string]handler.Handler)
var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100) var archiverChannel chan handler.MessageT = make(chan handler.MessageT, 100)
func InitDispatcher() { func InitDispatcher() {
log.Printf("Dispatcher initializing") log.Printf("Dispatcher initializing")
go archiver() go archiver()
for _, mapping := range config.Config.TopicMappings { for _, mapping := range config.Config.TopicMappings {
// log.Printf("Trying to initialize %s", mapping) // log.Printf("Trying to initialize %s", mapping)
var factory interface{} var factory interface{}
switch mapping.Handler { switch mapping.Handler {
case "TTN": // case "TTN":
factory = ttn.New // factory = ttn.New
case "IoT": case "IoT":
factory = iot.New factory = iot.New
case "PV": case "PV":
factory = pv.New factory = pv.New
case "MBGW3": case "MBGW3":
factory = mbgw3.New factory = mbgw3.New
case "SVER": case "SVER":
factory = sver.New factory = sver.New
case "SVEJ": case "SVEJ":
factory = svej.New factory = svej.New
case "DT1T": case "DT1T":
factory = dt1t.New factory = dt1t.New
case "Locative": case "Locative":
factory = locative.New factory = locative.New
case "SNMP": case "PREP":
factory = snmp.New factory = prepared.New
case "Z2M": case "Z2M":
factory = z2m.New factory = z2m.New
default: case "Car":
factory = nil factory = car.New
log.Printf("No handler %s found, ignore mapping", mapping.Handler) default:
} factory = nil
log.Printf("No handler %s found, ignore mapping", mapping.Handler)
}
fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler) fn, ok := factory.(func(string, config.HandlerConfigT) handler.Handler)
if ! ok { if !ok {
log.Println("Typ Assertion failed") log.Println("Typ Assertion failed")
break break
} }
handler := fn(mapping.Id, mapping.Config) handler := fn(mapping.Id, mapping.Config)
handlerMap[mapping.Id] = handler handlerMap[mapping.Id] = handler
} }
//log.Printf("handlerMap: %s", handlerMap) //log.Printf("handlerMap: %s", handlerMap)
} }
func storeMessage(filename string, item handler.MessageT) { func storeMessage(filename string, item handler.MessageT) {
file, err := os.OpenFile(filename, os.O_APPEND | os.O_CREATE | os.O_WRONLY, 0644) file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err) log.Printf("Unable to open archiving file %s, message is not archived: %s", filename, err)
counter.F("Archived") counter.F("Archived")
return return
} }
defer file.Close() defer file.Close()
archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload) archivingString := fmt.Sprintf("%s - %s - %s\n", item.Timestamp.Format("2006-01-02 15:04:05"), item.Topic, item.Payload)
_, err = file.WriteString(string(archivingString) + "\n") _, err = file.WriteString(string(archivingString) + "\n")
if err != nil { if err != nil {
log.Printf("Unable to write message, message is not archived: %s", err) log.Printf("Unable to write message, message is not archived: %s", err)
counter.F("Archived") counter.F("Archived")
return return
} }
//log.Println("Successfully archived message") //log.Println("Successfully archived message")
counter.S("Archived") counter.S("Archived")
} }
func archiver() { func archiver() {
archivingRootDir := config.Config.Archiver.Dir archivingRootDir := config.Config.Archiver.Dir
var lastArchivingDir string var lastArchivingDir string
for { for {
select { select {
case message := <- archiverChannel: case message := <-archiverChannel:
currentDateStr := message.Timestamp.Format("2006/01/02/15") currentDateStr := message.Timestamp.Format("2006/01/02/15")
currentArchivingDir := archivingRootDir + "/" + currentDateStr currentArchivingDir := archivingRootDir + "/" + currentDateStr
if currentArchivingDir != lastArchivingDir { if currentArchivingDir != lastArchivingDir {
err := os.MkdirAll(currentArchivingDir, 0755) err := os.MkdirAll(currentArchivingDir, 0755)
if err != nil { if err != nil {
log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err) log.Printf("Unable to create archiving dir %s: %s", currentArchivingDir, err)
counter.F("Archived") counter.F("Archived")
} }
lastArchivingDir = currentArchivingDir lastArchivingDir = currentArchivingDir
//log.Printf("Archiving dir %s created", currentArchivingDir) //log.Printf("Archiving dir %s created", currentArchivingDir)
} }
archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic)) archivingFilename := fmt.Sprintf("%s/%s", currentArchivingDir, url.PathEscape(message.Topic))
storeMessage(archivingFilename, message) storeMessage(archivingFilename, message)
} }
} }
} }
func InputDispatcher() { func InputDispatcher() {
for { for {
select { select {
case mqttMessage := <- mqtt.InputChannel: case mqttMessage := <-mqtt.InputChannel:
//log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic) //log.Printf("Message arrived in inputDispatcher, topic: %s\n", mqttMessage.Topic)
message := handler.MessageT { time.Now(), mqttMessage.Topic, string(mqttMessage.Payload) } message := handler.MessageT{time.Now(), mqttMessage.Topic, string(mqttMessage.Payload)}
archiverChannel <- message archiverChannel <- message
handleMessage(message) handleMessage(message)
} }
} }
} }
func handleMessage(message handler.MessageT) { func handleMessage(message handler.MessageT) {
for _, mapping := range config.Config.TopicMappings { for _, mapping := range config.Config.TopicMappings {
// log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler) // log.Printf("Testing %s -> %s", mapping.Topics, mapping.Handler)
for _, subscribedTopic := range mapping.Topics { for _, subscribedTopic := range mapping.Topics {
// log.Printf("Testing %s in %s", message.Topic, subscribedTopic) // log.Printf("Testing %s in %s", message.Topic, subscribedTopic)
if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) { if mqtt.TopicMatchesSubscription(message.Topic, subscribedTopic) {
//log.Printf("Handle message in handler %s", mapping.Id) //log.Printf("Handle message in handler %s", mapping.Id)
handler, exists := handlerMap[mapping.Id] handler, exists := handlerMap[mapping.Id]
if exists { if exists {
handler.Handle(message) handler.Handle(message)
counter.S("Dispatched") counter.S("Dispatched")
return return
} else { } else {
log.Printf("Handler %s not found, message %s is lost", mapping.Id, message) log.Printf("Handler %s not found, message %s is lost", mapping.Id, message)
counter.F("Dispatched") counter.F("Dispatched")
} }
} }
} }
} }
log.Printf("No match for topic %s, message %s is lost", message.Topic, message) log.Printf("No match for topic %s, message %s is lost", message.Topic, message)
counter.F("Dispatched") counter.F("Dispatched")
} }

View File

@@ -3,23 +3,14 @@ module udi
go 1.22.3 go 1.22.3
require ( require (
github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/google/uuid v1.4.0 github.com/google/uuid v1.6.0
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.11
) )
require ( require (
github.com/gorilla/websocket v1.5.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect golang.org/x/net v0.34.0 // indirect
golang.org/x/sync v0.10.0 // indirect golang.org/x/sync v0.10.0 // indirect
golang.org/x/text v0.21.0 // indirect
) )

View File

@@ -1,46 +1,14 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI= github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4= github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8=
gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/gorm v1.25.11 h1:/Wfyg1B/je1hnDx3sMkX+gAlxrlZpn6X0BXRlwXlvHg=
gorm.io/gorm v1.25.11/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=

View File

@@ -0,0 +1,94 @@
package car
import (
"encoding/json"
"log"
"reflect"
"time"
"udi/config"
"udi/database"
"udi/handlers/handler"
)
type CarHandler struct {
handler.CommonHandler
dbh *database.DatabaseHandle
}
/*
{
"status": "Ok",
"timestamp": "2025-12-15T13:11:15.648243",
"voltageL1": 228.68,
"voltageL2": 227.69,
"voltageL3": 228.53,
"currentL1": 0.0,
"currentL2": 0.0,
"currentL3": 0.0,
"powerL1": 0.0,
"powerL2": 0.0,
"powerL3": 0.0,
"totalImportEnergy": 0.0,
"totalExportEnergy": 0.0,
"cnt": 399300}
*/
type CarValue struct {
Status string `unit:"" json:"status"`
Timestamp string `unit:"" json:"timestamp"`
VoltageL1 float32 `unit:"V" json:"voltageL1"`
VoltageL2 float32 `unit:"V" json:"voltageL2"`
VoltageL3 float32 `unit:"V" json:"voltageL3"`
CurrentL1 float32 `unit:"A" json:"currentL1"`
CurrentL2 float32 `unit:"A" json:"currentL2"`
CurrentL3 float32 `unit:"A" json:"currentL3"`
PowerL1 float32 `unit:"W" json:"powerL1"`
PowerL2 float32 `unit:"W" json:"powerL2"`
PowerL3 float32 `unit:"W" json:"powerL3"`
TotalImportEnergy float32 `unit:"Wh" json:"totalImportEnergy"`
TotalExportEnergy float32 `unit:"Wh" json:"totalExportEnergy"`
Cnt int `unit:"" json:"cnt"`
}
func New(id string, config config.HandlerConfigT) handler.Handler {
t := &CarHandler{}
t.Id = id
t.dbh = database.NewDatabaseHandle()
log.Printf("Handler Car %d initialized", id)
return t
}
func (self *CarHandler) Handle(message handler.MessageT) {
//log.Printf("Handler Car %d processing %s -> %s", self.id, message.Topic, message.Payload)
var carValue CarValue
err := json.Unmarshal([]byte(message.Payload), &carValue)
if err != nil {
self.Lost("Unable to parse payload into carValue struct", err, message)
return
}
variables := make(map[string]database.VariableType)
carValueStructValue := reflect.ValueOf(carValue)
for i := 0; i < carValueStructValue.NumField(); i++ {
field := carValueStructValue.Type().Field(i)
fieldValue := carValueStructValue.Field(i)
v := database.VariableType{
Label: "",
Variable: field.Name,
Unit: field.Tag.Get("unit"),
Value: fieldValue.Interface(),
}
variables[field.Name] = v
}
measurement := database.Measurement{
Time: time.Now(),
Application: "Car",
Device: "Powermeter",
Values: variables,
}
self.dbh.StoreMeasurement(&measurement)
self.S()
}

View File

@@ -1,81 +1,75 @@
package dt1t package dt1t
import ( import (
"log" "log"
"fmt" "strconv"
"time" "time"
"strconv" "udi/config"
"udi/handlers/handler" "udi/database"
"udi/database" "udi/handlers/handler"
"udi/config"
) )
type Dt1tHandler struct { type Dt1tHandler struct {
handler.CommonHandler handler.CommonHandler
ready bool ready bool
label string label string
dbh *database.DatabaseHandle dbh *database.DatabaseHandle
application string application string
device string device string
} }
func New(id string, config config.HandlerConfigT) handler.Handler { func New(id string, config config.HandlerConfigT) handler.Handler {
t := &Dt1tHandler { t := &Dt1tHandler{}
}
if config.Attributes["Application"] == "" { if config.Attributes["Application"] == "" {
log.Println("Error: application not configured") log.Println("Error: application not configured")
return t return t
} }
t.application = config.Attributes["Application"] t.application = config.Attributes["Application"]
if config.Attributes["Device"] == "" { if config.Attributes["Device"] == "" {
log.Println("Error: device not configured") log.Println("Error: device not configured")
return t return t
} }
t.device = config.Attributes["Device"] t.device = config.Attributes["Device"]
t.Id = id t.Id = id
t.dbh = database.NewDatabaseHandle() t.dbh = database.NewDatabaseHandle()
log.Printf("Handler DT1T %d initialized", id) log.Printf("Handler DT1T %d initialized", id)
t.ready = true t.ready = true
return t return t
} }
func (self *Dt1tHandler) Handle(message handler.MessageT) { func (self *Dt1tHandler) Handle(message handler.MessageT) {
if ! self.ready { if !self.ready {
self.Lost("Handler is not marked as ready", nil, message) self.Lost("Handler is not marked as ready", nil, message)
return return
} }
// log.Printf("Handler DT1T %d processing %s -> %s", self.id, message.Topic, message.Payload) // log.Printf("Handler DT1T %d processing %s -> %s", self.id, message.Topic, message.Payload)
temperature, err := strconv.Atoi(message.Payload) temperature, err := strconv.Atoi(message.Payload)
if err != nil { if err != nil {
self.Lost("Invalid raw value", err, message) self.Lost("Invalid raw value", err, message)
return return
} }
if temperature & 0x8000 != 0{ if temperature&0x8000 != 0 {
temperature = ((temperature - 1) ^ 0xffff) * -1 temperature = ((temperature - 1) ^ 0xffff) * -1
} }
temperatureF := float32(temperature) / 10.0 temperatureF := float32(temperature) / 10.0
var measurement database.Measurement var measurement database.Measurement
measurement.Time = time.Now() measurement.Time = time.Now()
measurement.Application = self.application measurement.Application = self.application
measurement.Device = self.device measurement.Device = self.device
var variable database.VariableType var variable database.VariableType
variable.Label = "Temperature" variable.Label = "Temperature"
variable.Variable = "" variable.Variable = ""
variable.Unit = "°C" variable.Unit = "°C"
variable.Value = fmt.Sprintf("%f", temperatureF) variable.Value = temperatureF
measurement.Values = make(map[string]database.VariableType) measurement.Values = make(map[string]database.VariableType)
measurement.Values["Value"] = variable measurement.Values["Value"] = variable
// log.Printf("Prepared measurement item: %s", measurement) // log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement) self.dbh.StoreMeasurement(&measurement)
self.S() self.S()
} }

View File

@@ -74,12 +74,22 @@ func (self *Mbgw3Handler) Handle(message handler.MessageT) {
measurement.Values = make(map[string]database.VariableType) measurement.Values = make(map[string]database.VariableType)
unitMap := map[string]string { "Energy": "Wh", "Power": "W", "Voltage": "V", "Current": "A", "Volume": "m3" } unitMap := map[string]string { "Energy": "Wh", "Power": "W", "Voltage": "V", "Current": "A", "Volume": "m3" }
keyCount := make(map[string]int)
for k, v := range observation.Values { for k, v := range observation.Values {
unit, exists := unitMap[k] unit, exists := unitMap[k]
if ! exists { if ! exists {
unit = "Unmapped Unit" unit = "Unmapped Unit"
} }
measurement.Values[k] = database.VariableType {
// Check if key already exists and create unique key if needed
keyCount[k]++
uniqueKey := k
if keyCount[k] > 1 {
uniqueKey = k + strconv.Itoa(keyCount[k])
}
measurement.Values[uniqueKey] = database.VariableType {
Label: "", Label: "",
Variable: k, Variable: k,
Unit: unit, Unit: unit,

View File

@@ -1,4 +1,4 @@
package snmp package prepared
import ( import (
"time" "time"
@@ -10,7 +10,7 @@ import (
) )
type SnmpHandler struct { type PreparedHandler struct {
handler.CommonHandler handler.CommonHandler
dbh *database.DatabaseHandle dbh *database.DatabaseHandle
} }
@@ -19,6 +19,8 @@ type endpoint_t struct {
Label string `json:"label"` Label string `json:"label"`
Variable string `json:"variable"` Variable string `json:"variable"`
Value string `json:"value"` Value string `json:"value"`
Unit string `json:"unit"`
Status string `json:"status"`
} }
type observation_t struct { type observation_t struct {
@@ -29,16 +31,16 @@ type observation_t struct {
func New(id string, config config.HandlerConfigT) handler.Handler { func New(id string, config config.HandlerConfigT) handler.Handler {
t := &SnmpHandler { t := &PreparedHandler {
} }
t.Id = id t.Id = id
t.dbh = database.NewDatabaseHandle() t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SNMP %d initialized", id) log.Printf("Handler Prepared %d initialized", id)
return t return t
} }
func (self *SnmpHandler) Handle(message handler.MessageT) { func (self *PreparedHandler) Handle(message handler.MessageT) {
// log.Printf("Handler SNMP %d processing %s -> %s", self.Id, message.Topic, message.Payload) log.Printf("Handler Prepared %d processing %s -> %s", self.Id, message.Topic, message.Payload)
var observation observation_t var observation observation_t
err := json.Unmarshal([]byte(message.Payload), &observation) err := json.Unmarshal([]byte(message.Payload), &observation)
@@ -50,7 +52,7 @@ func (self *SnmpHandler) Handle(message handler.MessageT) {
var measurement database.Measurement var measurement database.Measurement
measurement.Time = time.Now() measurement.Time = time.Now()
measurement.Application = "SNMP" measurement.Application = self.Id
measurement.Device = observation.Device measurement.Device = observation.Device
measurement.Attributes = map[string]interface{} { measurement.Attributes = map[string]interface{} {
@@ -62,12 +64,13 @@ func (self *SnmpHandler) Handle(message handler.MessageT) {
measurement.Values[k] = database.VariableType { measurement.Values[k] = database.VariableType {
Label: v.Label, Label: v.Label,
Variable: v.Variable, Variable: v.Variable,
Unit: "", Unit: v.Unit,
Value: v.Value, Value: v.Value,
Status: v.Status,
} }
} }
// log.Printf("Prepared measurement item: %s", measurement) log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement) self.dbh.StoreMeasurement(&measurement)
self.S() self.S()

View File

@@ -1,29 +1,30 @@
package svej package svej
import ( import (
"log" "encoding/json"
"time" "fmt"
"strconv" "log"
"strings" "strconv"
"fmt" "strings"
"github.com/oliveagle/jsonpath" "time"
"encoding/json" "udi/config"
"udi/config" "udi/database"
"udi/handlers/handler" "udi/handlers/handler"
"udi/database"
"github.com/oliveagle/jsonpath"
) )
type SingleValueExtractorJsonpathHandler struct { type SingleValueExtractorJsonpathHandler struct {
handler.CommonHandler handler.CommonHandler
ready bool ready bool
application string application string
deviceSelector string deviceSelector string
valueSelector string valueSelector string
unitSelector string unitSelector string
deviceJsonpath *jsonpath.Compiled deviceJsonpath *jsonpath.Compiled
valueJsonpath *jsonpath.Compiled valueJsonpath *jsonpath.Compiled
unitJsonpath *jsonpath.Compiled unitJsonpath *jsonpath.Compiled
dbh *database.DatabaseHandle dbh *database.DatabaseHandle
} }
/* /*
@@ -33,134 +34,131 @@ T:TopicPartIndex
C:ConstantValue C:ConstantValue
*/ */
func New(id string, config config.HandlerConfigT) handler.Handler { func New(id string, config config.HandlerConfigT) handler.Handler {
t := &SingleValueExtractorJsonpathHandler { t := &SingleValueExtractorJsonpathHandler{
ready: false, ready: false,
} }
if config.Attributes["application"] == "" { if config.Attributes["application"] == "" {
log.Println("Error: application not configured") log.Println("Error: application not configured")
return t return t
} }
t.application = config.Attributes["application"] t.application = config.Attributes["application"]
t.deviceSelector = config.Attributes["deviceSelector"] t.deviceSelector = config.Attributes["deviceSelector"]
if t.deviceSelector[:2] == "J:" { if t.deviceSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.deviceSelector[2:]) jp, err := jsonpath.Compile(t.deviceSelector[2:])
if err != nil { if err != nil {
log.Printf("Unable to compile deviceJsonpath: %s, %s", t.deviceSelector[2:], err) log.Printf("Unable to compile deviceJsonpath: %s, %s", t.deviceSelector[2:], err)
return t return t
} }
t.deviceJsonpath = jp t.deviceJsonpath = jp
} }
t.valueSelector = config.Attributes["valueSelector"] t.valueSelector = config.Attributes["valueSelector"]
if t.valueSelector[:2] == "J:" { if t.valueSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.valueSelector[2:]) jp, err := jsonpath.Compile(t.valueSelector[2:])
if err != nil { if err != nil {
log.Printf("Unable to compile valueJsonpath: %s, %s", t.valueSelector[2:], err) log.Printf("Unable to compile valueJsonpath: %s, %s", t.valueSelector[2:], err)
return t return t
} }
t.valueJsonpath = jp t.valueJsonpath = jp
} }
t.unitSelector = config.Attributes["unitSelector"] t.unitSelector = config.Attributes["unitSelector"]
if t.unitSelector[:2] == "J:" { if t.unitSelector[:2] == "J:" {
jp, err := jsonpath.Compile(t.unitSelector[2:]) jp, err := jsonpath.Compile(t.unitSelector[2:])
if err != nil { if err != nil {
log.Printf("Unable to compile unitJsonpath: %s, %s", t.unitSelector[2:], err) log.Printf("Unable to compile unitJsonpath: %s, %s", t.unitSelector[2:], err)
return t return t
} }
t.unitJsonpath = jp t.unitJsonpath = jp
} }
t.Id = id t.Id = id
t.ready = true t.ready = true
t.dbh = database.NewDatabaseHandle() t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SVEJ %d initialized", id) log.Printf("Handler SVEJ %s initialized", id)
return t return t
} }
func (self *SingleValueExtractorJsonpathHandler) ExtractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (string, error) { func (self *SingleValueExtractorJsonpathHandler) ExtractionHelper(subTopics []string, jPayload interface{}, selector string, jp *jsonpath.Compiled) (interface{}, error) {
var res string var res interface{}
switch selector[:2] { switch selector[:2] {
case "J:": case "J:":
// extract using jsonpath from payload // extract using jsonpath from payload
r, e := jp.Lookup(jPayload) r, e := jp.Lookup(jPayload)
if e != nil { if e != nil {
return "", fmt.Errorf("jp.Lookup failed with %s", e) return "", fmt.Errorf("jp.Lookup failed with %s", e)
} }
res = fmt.Sprint(r) res = r
case "T:": case "T:":
// T: extract from topic // T: extract from topic
i, e := strconv.Atoi(selector[2:]) i, e := strconv.Atoi(selector[2:])
if e != nil { if e != nil {
return "", fmt.Errorf("Atoi failed with %s", e) return "", fmt.Errorf("Atoi failed with %s", e)
} }
if i >= len(subTopics) { if i >= len(subTopics) {
return "", fmt.Errorf("not enough subtopics") return "", fmt.Errorf("not enough subtopics")
} }
res = subTopics[i] res = subTopics[i]
case "C:": case "C:":
// use constant value // use constant value
res = selector[2:] res = selector[2:]
default: default:
return "", fmt.Errorf("Invalid selector: %s", selector[:2]) return "", fmt.Errorf("Invalid selector: %s", selector[:2])
} }
return res, nil return res, nil
} }
func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT) { func (self *SingleValueExtractorJsonpathHandler) Handle(message handler.MessageT) {
if ! self.ready { if !self.ready {
self.Lost("Handler is not marked as ready", nil, message) self.Lost("Handler is not marked as ready", nil, message)
return return
} }
log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload) log.Printf("Handler SingleValueExtractorJsonpath %d processing %s -> %s", self.Id, message.Topic, message.Payload)
var measurement database.Measurement var measurement database.Measurement
measurement.Time = time.Now() measurement.Time = time.Now()
measurement.Application = self.application measurement.Application = self.application
subTopics := strings.Split(message.Topic, "/") subTopics := strings.Split(message.Topic, "/")
log.Printf("Subtopics: %s", strings.Join(subTopics, ", ")) log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var jPayload interface{} var jPayload interface{}
err := json.Unmarshal([]byte(message.Payload), &jPayload) err := json.Unmarshal([]byte(message.Payload), &jPayload)
if err != nil { if err != nil {
self.Lost("Unable to unmarshal payload", err, message) self.Lost("Unable to unmarshal payload", err, message)
return return
} }
device, err1 := self.ExtractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath) device, err1 := self.ExtractionHelper(subTopics, jPayload, self.deviceSelector, self.deviceJsonpath)
if err1 != nil { if err1 != nil {
self.Lost("Device extraction failed", err1, message) self.Lost("Device extraction failed", err1, message)
return return
} }
log.Printf("device: %s", device) log.Printf("device: %s", device)
value, err2 := self.ExtractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath) value, err2 := self.ExtractionHelper(subTopics, jPayload, self.valueSelector, self.valueJsonpath)
if err2 != nil { if err2 != nil {
self.Lost("Value extraction failed", err2, message) self.Lost("Value extraction failed", err2, message)
return return
} }
unit, err3 := self.ExtractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath) unit, err3 := self.ExtractionHelper(subTopics, jPayload, self.unitSelector, self.unitJsonpath)
if err3 != nil { if err3 != nil {
self.Lost("Unit extraction failed", err3, message) self.Lost("Unit extraction failed", err3, message)
return return
} }
measurement.Device = device measurement.Device = device.(string)
var variable database.VariableType var variable database.VariableType
variable.Label = "" variable.Label = ""
variable.Variable = "" variable.Variable = ""
variable.Unit = unit variable.Unit = unit.(string)
variable.Value = value variable.Value = value
measurement.Values = make(map[string]database.VariableType) measurement.Values = make(map[string]database.VariableType)
measurement.Values["Value"] = variable measurement.Values["Value"] = variable
log.Printf("Prepared measurement item: %s", measurement) log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement) self.dbh.StoreMeasurement(&measurement)
self.S() self.S()
} }

View File

@@ -1,23 +1,22 @@
package sver package sver
import ( import (
"time" "log"
"strconv" "regexp"
"strings" "strconv"
"regexp" "strings"
"log" "time"
"udi/config" "udi/config"
"udi/handlers/handler" "udi/database"
"udi/database" "udi/handlers/handler"
) )
type SingleValueExtractorRegexHandler struct { type SingleValueExtractorRegexHandler struct {
handler.CommonHandler handler.CommonHandler
ready bool ready bool
config localConfig config localConfig
payloadRegex *regexp.Regexp payloadRegex *regexp.Regexp
dbh *database.DatabaseHandle dbh *database.DatabaseHandle
} }
const TOPIC_SEL = "topic" const TOPIC_SEL = "topic"
@@ -26,175 +25,191 @@ const PAYLOAD_FULL_SEL = "payload-full"
const CONSTANT_SEL = "constant" const CONSTANT_SEL = "constant"
type localConfig struct { type localConfig struct {
application string application string
deviceFrom string deviceFrom string
devicePart int devicePart int
device string device string
valueFrom string valueFrom string
valuePart int valuePart int
unitFrom string valueType string
unitPart int unitFrom string
unit string unitPart int
unit string
} }
func New(id string, config config.HandlerConfigT) handler.Handler { func New(id string, config config.HandlerConfigT) handler.Handler {
t := &SingleValueExtractorRegexHandler { t := &SingleValueExtractorRegexHandler{
ready: false, ready: false,
} }
var localConfig localConfig var localConfig localConfig
if config.Attributes["application"] == "" { if config.Attributes["application"] == "" {
log.Println("Error: application not configured") log.Println("Error: application not configured")
return t return t
} }
localConfig.application = config.Attributes["application"] localConfig.application = config.Attributes["application"]
payloadRegex := config.Attributes["payloadRegex"] payloadRegex := config.Attributes["payloadRegex"]
if payloadRegex != "" { if payloadRegex != "" {
t.payloadRegex = regexp.MustCompile(payloadRegex) t.payloadRegex = regexp.MustCompile(payloadRegex)
} else { } else {
t.payloadRegex = nil t.payloadRegex = nil
} }
if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL { if config.Attributes["deviceFrom"] != TOPIC_SEL && config.Attributes["deviceFrom"] != PAYLOAD_SEL && config.Attributes["deviceFrom"] != CONSTANT_SEL {
log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"]) log.Printf("Error: invalid value %s for deviceFrom", config.Attributes["deviceFrom"])
return t return t
} }
localConfig.deviceFrom = config.Attributes["deviceFrom"] localConfig.deviceFrom = config.Attributes["deviceFrom"]
devicePart, err1 := strconv.Atoi(config.Attributes["devicePart"]) devicePart, err1 := strconv.Atoi(config.Attributes["devicePart"])
if err1 != nil { if err1 != nil {
log.Printf("Error: unable to convert devicePart to number: %s", err1) log.Printf("Error: unable to convert devicePart to number: %s", err1)
return t return t
} }
localConfig.devicePart = devicePart localConfig.devicePart = devicePart
// empty device is valid // empty device is valid
localConfig.device = config.Attributes["device"] localConfig.device = config.Attributes["device"]
if config.Attributes["valueFrom"] != PAYLOAD_SEL && config.Attributes["valueFrom"] != PAYLOAD_FULL_SEL { if config.Attributes["valueFrom"] != PAYLOAD_SEL && config.Attributes["valueFrom"] != PAYLOAD_FULL_SEL {
log.Printf("Error: invalid value %s for valueFrom", config.Attributes["valueFrom"]) log.Printf("Error: invalid value %s for valueFrom", config.Attributes["valueFrom"])
return t return t
} }
localConfig.valueFrom = config.Attributes["valueFrom"] localConfig.valueFrom = config.Attributes["valueFrom"]
if config.Attributes["valueFrom"] == PAYLOAD_SEL { if config.Attributes["valueFrom"] == PAYLOAD_SEL {
valuePart, err2 := strconv.Atoi(config.Attributes["valuePart"]) valuePart, err2 := strconv.Atoi(config.Attributes["valuePart"])
if err2 != nil { if err2 != nil {
log.Printf("Error: unable to convert valuePart to number: %s", err2) log.Printf("Error: unable to convert valuePart to number: %s", err2)
return t return t
} }
localConfig.valuePart = valuePart localConfig.valuePart = valuePart
} }
if config.Attributes["unitFrom"] != PAYLOAD_SEL && config.Attributes["unitFrom"] != CONSTANT_SEL { if config.Attributes["valueType"] != "float" && config.Attributes["valueType"] != "string" {
log.Printf("Error: invalid value %s for unitFrom", config.Attributes["unitFrom"]) log.Printf("Error: invalid value %s for valueType", config.Attributes["valueType"])
return t return t
} }
localConfig.unitFrom = config.Attributes["unitFrom"] localConfig.valueType = config.Attributes["valueType"]
if config.Attributes["unitFrom"] == PAYLOAD_SEL { if config.Attributes["unitFrom"] != PAYLOAD_SEL && config.Attributes["unitFrom"] != CONSTANT_SEL {
unitPart, err3 := strconv.Atoi(config.Attributes["unitPart"]) log.Printf("Error: invalid value %s for unitFrom", config.Attributes["unitFrom"])
if err3 != nil { return t
log.Printf("Error: unable to convert unitPart to number: %s", err3) }
return t localConfig.unitFrom = config.Attributes["unitFrom"]
}
localConfig.unitPart = unitPart
}
// empty unit is valid if config.Attributes["unitFrom"] == PAYLOAD_SEL {
localConfig.unit = config.Attributes["unit"] unitPart, err3 := strconv.Atoi(config.Attributes["unitPart"])
if err3 != nil {
log.Printf("Error: unable to convert unitPart to number: %s", err3)
return t
}
localConfig.unitPart = unitPart
}
t.config = localConfig // empty unit is valid
localConfig.unit = config.Attributes["unit"]
t.Id = id t.config = localConfig
t.ready = true
t.dbh = database.NewDatabaseHandle() t.Id = id
log.Printf("Handler SVER %d initialized", id) t.ready = true
return t t.dbh = database.NewDatabaseHandle()
log.Printf("Handler SVER %d initialized", id)
return t
} }
func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) { func (self *SingleValueExtractorRegexHandler) Handle(message handler.MessageT) {
if ! self.ready { if !self.ready {
self.Lost("Handler is not marked as ready", nil, message) self.Lost("Handler is not marked as ready", nil, message)
return return
} }
//log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload) //log.Printf("Handler SingleValueExtractor %d processing %s -> %s", self.id, message.Topic, message.Payload)
var measurement database.Measurement var measurement database.Measurement
measurement.Time = time.Now() measurement.Time = time.Now()
measurement.Application = self.config.application measurement.Application = self.config.application
subTopics := strings.Split(message.Topic, "/") subTopics := strings.Split(message.Topic, "/")
//log.Printf("Subtopics: %s", strings.Join(subTopics, ", ")) //log.Printf("Subtopics: %s", strings.Join(subTopics, ", "))
var payloadMatches []string var payloadMatches []string
if self.payloadRegex != nil { if self.payloadRegex != nil {
payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload) payloadMatches = self.payloadRegex.FindStringSubmatch(message.Payload)
//log.Printf("Matches: %s", strings.Join(payloadMatches, ", ")) //log.Printf("Matches: %s", strings.Join(payloadMatches, ", "))
} }
switch self.config.deviceFrom { switch self.config.deviceFrom {
case TOPIC_SEL: case TOPIC_SEL:
if self.config.devicePart >= len(subTopics) { if self.config.devicePart >= len(subTopics) {
self.Lost("devicePart out of range", nil, message) self.Lost("devicePart out of range", nil, message)
return return
} }
measurement.Device = subTopics[self.config.devicePart] measurement.Device = subTopics[self.config.devicePart]
case PAYLOAD_SEL: case PAYLOAD_SEL:
if self.payloadRegex == nil { if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, devicePart can't be used", nil, message) self.Lost("no payloadRegex defined, devicePart can't be used", nil, message)
return return
} }
if self.config.devicePart >= len(payloadMatches) { if self.config.devicePart >= len(payloadMatches) {
self.Lost("devicePart out of range", nil, message) self.Lost("devicePart out of range", nil, message)
return return
} }
measurement.Device = payloadMatches[self.config.devicePart] measurement.Device = payloadMatches[self.config.devicePart]
case CONSTANT_SEL: case CONSTANT_SEL:
measurement.Device = self.config.device measurement.Device = self.config.device
} }
measurement.Values = make(map[string]database.VariableType) measurement.Values = make(map[string]database.VariableType)
var variable database.VariableType var variable database.VariableType
variable.Label = "" variable.Label = ""
variable.Variable = "" variable.Variable = ""
switch self.config.valueFrom { var value string
case PAYLOAD_SEL: switch self.config.valueFrom {
if self.payloadRegex == nil { case PAYLOAD_SEL:
self.Lost("no payloadRegex defined, valuePart can't be used", nil, message) if self.payloadRegex == nil {
return self.Lost("no payloadRegex defined, valuePart can't be used", nil, message)
} return
if self.config.valuePart >= len(payloadMatches) { }
self.Lost("valuePart out of range", nil, message) if self.config.valuePart >= len(payloadMatches) {
return self.Lost("valuePart out of range", nil, message)
} return
variable.Value = payloadMatches[self.config.valuePart] }
case PAYLOAD_FULL_SEL: value = payloadMatches[self.config.valuePart]
variable.Value = message.Payload case PAYLOAD_FULL_SEL:
} value = message.Payload
}
if self.config.valueType == "float" {
fValue, err := strconv.ParseFloat(value, 64)
if err != nil {
self.Lost("Unable to convert value to float", err, message)
return
}
variable.Value = fValue
} else {
variable.Value = value
}
switch self.config.unitFrom { switch self.config.unitFrom {
case PAYLOAD_SEL: case PAYLOAD_SEL:
if self.payloadRegex == nil { if self.payloadRegex == nil {
self.Lost("no payloadRegex defined, unitPart can't be used", nil, message) self.Lost("no payloadRegex defined, unitPart can't be used", nil, message)
return return
} }
if self.config.unitPart >= len(payloadMatches) { if self.config.unitPart >= len(payloadMatches) {
self.Lost("unitPart out of range", nil, message) self.Lost("unitPart out of range", nil, message)
return return
} }
variable.Unit = payloadMatches[self.config.unitPart] variable.Unit = payloadMatches[self.config.unitPart]
case CONSTANT_SEL: case CONSTANT_SEL:
variable.Unit = self.config.unit variable.Unit = self.config.unit
} }
measurement.Values["Value"] = variable measurement.Values["Value"] = variable
//log.Printf("Prepared measurement item: %s", measurement) //log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement) self.dbh.StoreMeasurement(&measurement)
self.S() self.S()
} }

View File

@@ -1,15 +0,0 @@
package gs361ah04
type Observation struct {
LinkQuality uint8 `unit:"" json:"linkquality"`
Battery uint8 `unit:"%" json:"battery"`
AwayMode string `unit:"" json:"away_mode"`
ChildLock string `unit:"" json:"child_lock"`
CurrentHeatingSetpoint float32 `unit:"°C" json:"current_heating_setpoint"`
LocalTemperature float32 `unit:"°C" json:"local_temperature"`
Preset string `unit:"" json:"preset"`
SystemMode string `unit:"" json:"system_mode"`
ValveDetection string `unit:"" json:"valve_detection"`
WindowDetection string `unit:"" json:"window_detection"`
}

View File

@@ -1,11 +0,0 @@
package wsdcgq01lm
type Observation struct {
LinkQuality uint8 `unit:"" json:"linkquality"`
Battery uint8 `unit:"%" json:"battery"`
Humidity float32 `unit:"%H" json:"humidity"`
Pressure float32 `unit:"mbar" json:"pressure"`
Temperature float32 `unit:"°C" json:"temperature"`
Voltage uint16 `unit:"mV" json:"voltage"`
}

View File

@@ -1,10 +0,0 @@
package wsdcgq11lm
type Observation struct {
LinkQuality uint8 `unit:"" json:"linkquality"`
Battery uint8 `unit:"%" json:"battery"`
Humidity float32 `unit:"%H" json:"humidity"`
Temperature float32 `unit:"°C" json:"temperature"`
Voltage uint16 `unit:"mV" json:"voltage"`
}

View File

@@ -1,113 +1,80 @@
package z2m package z2m
import ( import (
"fmt" "encoding/json"
"log" "fmt"
"time" "log"
"strings" "strings"
"reflect" "time"
"encoding/json" "udi/config"
"udi/config" "udi/database"
"udi/handlers/handler" "udi/handlers/handler"
"udi/database"
"udi/handlers/z2m/models/wsdcgq11lm"
"udi/handlers/z2m/models/wsdcgq01lm"
"udi/handlers/z2m/models/gs361ah04"
) )
type Z2MHandler struct { type Z2MHandler struct {
handler.CommonHandler handler.CommonHandler
dbh *database.DatabaseHandle dbh *database.DatabaseHandle
}
func parse(T any, payload string, variables *map[string]database.VariableType) error {
observationType := reflect.TypeOf(T)
observation := reflect.New(observationType).Interface()
err := json.Unmarshal([]byte(payload), observation)
if err != nil {
return fmt.Errorf("Unable to parse payload into Observation struct: %v, %s", err, payload)
}
observationValue := reflect.ValueOf(observation).Elem()
for i := 0; i < observationType.NumField(); i++ {
field := observationType.Field(i)
name := field.Name
unit := field.Tag.Get("unit")
value := observationValue.Field(i).Interface()
(*variables)[name] = database.VariableType {
Label: name,
Variable: "y",
Unit: unit,
Value: value,
}
}
return nil
} }
func New(id string, config config.HandlerConfigT) handler.Handler { func New(id string, config config.HandlerConfigT) handler.Handler {
t := &Z2MHandler { t := &Z2MHandler{}
} t.Id = id
t.Id = id t.dbh = database.NewDatabaseHandle()
t.dbh = database.NewDatabaseHandle() log.Printf("Handler Z2M %d initialized", id)
log.Printf("Handler Z2M %d initialized", id) return t
return t
} }
func (self *Z2MHandler) Handle(message handler.MessageT) { func (self *Z2MHandler) Handle(message handler.MessageT) {
log.Printf("Handler Z2M %d processing %s -> %s", self.Id, message.Topic, message.Payload) log.Printf("Handler Z2M %d processing %s -> %s", self.Id, message.Topic, message.Payload)
var measurement database.Measurement var measurement database.Measurement
measurement.Time = time.Now() measurement.Time = time.Now()
subTopics := strings.Split(message.Topic, "/")
deviceId := subTopics[1]
log.Printf("DeviceId: %s", deviceId)
subTopics := strings.Split(message.Topic, "/") measurement.Device = deviceId
deviceId := subTopics[1]
log.Printf("DeviceId: %s", deviceId)
device, err1 := self.dbh.GetDeviceByLabel(deviceId)
if err1 != nil {
self.Lost("Error when loading device", err1, message)
return
}
log.Printf("Device: %s", device)
measurement.Application = device.Application.Label // Parse JSON direkt in eine map
measurement.Device = device.Attributes["Label"].(string) var jsonData map[string]interface{}
err := json.Unmarshal([]byte(message.Payload), &jsonData)
if err != nil {
self.Lost("Failed to parse JSON payload", err, message)
return
}
var T any measurement.Attributes = make(map[string]interface{})
switch device.DeviceType.ModelIdentifier { measurement.Values = make(map[string]database.VariableType)
case "WSDCGQ11LM":
T = wsdcgq11lm.Observation{}
case "WSDCGQ01LM":
T = wsdcgq01lm.Observation{}
case "GS361A-H04":
T = gs361ah04.Observation{}
default:
self.Lost(fmt.Sprintf("No parser found for %s", device.DeviceType.ModelIdentifier), nil, message)
return
}
measurement.Values = make(map[string]database.VariableType) // Extract device info for application naming
measurement.Attributes = make(map[string]interface{}) if deviceData, ok := jsonData["device"]; ok {
err3 := parse(T, if deviceMap, ok := deviceData.(map[string]any); ok {
message.Payload, manufacturerId, hasManufacturer := deviceMap["manufacturerID"]
&(measurement.Values)) model, hasModel := deviceMap["model"]
if err3 != nil {
self.Lost("Model parser failed", err3, message)
return
}
measurement.Attributes["Status"] = "ok" if !hasManufacturer || !hasModel {
measurement.Attributes["DeviceId"] = deviceId self.Lost("Missing manufacturerID or model in device data", fmt.Errorf("manufacturerID: %v, model: %v", hasManufacturer, hasModel), handler.MessageT{})
measurement.Attributes["DeviceModel"] = device.DeviceType.ModelIdentifier return
log.Printf("Prepared measurement item: %s", measurement) }
self.dbh.StoreMeasurement(&measurement)
self.S() measurement.Application = "z2m_" + fmt.Sprintf("%v", manufacturerId) + "_" + model.(string)
}
delete(jsonData, "device")
}
// Konvertiere die restlichen Elemente in VariableType-Map
for key, value := range jsonData {
measurement.Values[key] = database.VariableType{
Label: key,
Variable: "",
Unit: "",
Value: value,
}
}
measurement.Attributes["Status"] = "ok"
log.Printf("Prepared measurement item: %s", measurement)
self.dbh.StoreMeasurement(&measurement)
self.S()
} }

View File

@@ -1,18 +0,0 @@
package main
import "log"
import "udi/database"
func main() {
log.SetPrefix("UDI Migrate Schema: ")
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
log.Println("Starting")
database.Migrate()
log.Println("Done")
}