8 Commits
0.0.1 ... 0.1.1

3 changed files with 27 additions and 57 deletions

View File

@ -30,24 +30,26 @@ deploy:
- tags - tags
variables: variables:
GIT_STRATEGY: none GIT_STRATEGY: none
CONTAINER_NAME: preprocessor CONTAINER_NAME: badesee-preprocessor
script: script:
- docker stop $CONTAINER_NAME || echo "container not running, never mind" - docker stop $CONTAINER_NAME || echo "container not running, never mind"
- docker rm $CONTAINER_NAME || echo "container not existing, never mind" - docker rm $CONTAINER_NAME || echo "container not existing, never mind"
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
- docker run - docker run
-d -d
--name $CONTAINER_NAME --name $CONTAINER_NAME
--restart always --restart always
--link timescaledb-server --network internal-network
-e "APPLICATION_TENANT=$APPLICATION_TENANT" -e "APPLICATION_TENANT=$APPLICATION_TENANT"
-e "MQTT_LOGIN=$MQTT_LOGIN" -e "MQTT_LOGIN=$MQTT_LOGIN"
-e "MQTT_PASSWORD=$MQTT_PASSWORD" -e "MQTT_PASSWORD=$MQTT_PASSWORD"
-e "MQTT_BROKER=$MQTT_BROKER" -e "MQTT_BROKER=$MQTT_BROKER"
-e "PGHOST=timescaledb-server" -e "PGHOST=timescaledb"
-e "PGUSER=$PGUSER" -e "PGUSER=$PGUSER"
-e "PGPASSWORD=$PGPASSWORD" -e "PGPASSWORD=$PGPASSWORD"
-e "PGDATABASE=$PGDATABASE" -e "PGDATABASE=$PGDATABASE"
$IMAGE_NAME:$CI_COMMIT_TAG $IMAGE_NAME:$CI_COMMIT_TAG
- docker network connect external-network $CONTAINER_NAME
environment: environment:
name: production name: production

View File

@ -1,3 +1,6 @@
create database monitoring;
create extension timescaledb;
CREATE TABLE device_t ( CREATE TABLE device_t (
id SERIAL NOT NULL PRIMARY KEY, id SERIAL NOT NULL PRIMARY KEY,
device_id VARCHAR(32) NOT NULL UNIQUE, device_id VARCHAR(32) NOT NULL UNIQUE,
@ -30,10 +33,6 @@ insert into sensor_t (address, label, index, device)
(10664523975231507496, '3,0m ', 2, 1), (10664523975231507496, '3,0m ', 2, 1),
(15276209993662477608, '4,0m ', 3, 1); (15276209993662477608, '4,0m ', 3, 1);
select d.label as device_label, create user preprocessor password 'geheim';
s.label as label, grant select on device_t, sensor_t to preprocessor;
s.address as address, grant insert on measurement_t to preprocessor;
s.index as index
from device_t d, sensor_t s
where d.id = s.device;

View File

@ -70,8 +70,8 @@ class DbOp(object):
with conn: with conn:
with conn.cursor() as cur: with conn.cursor() as cur:
for entry in measurement['measurements']: for entry in measurement['measurements']:
cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (%(time)s, %(dname)s, %(sname)s, %(tempc)s)", cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (now(), %(dname)s, %(sname)s, %(tempc)s)",
{ 'time': measurement['time'], 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] }) { 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] })
except Exception as e: except Exception as e:
logger.error(f"Error storing measurement: {e}") logger.error(f"Error storing measurement: {e}")
finally: finally:
@ -91,7 +91,6 @@ def mqttOnMessageCallback(client, userdata, message):
logger.debug(f"mqtt message received: {topic} -> {payload}") logger.debug(f"mqtt message received: {topic} -> {payload}")
parse_payload = json.loads(payload) parse_payload = json.loads(payload)
sendSetupMessage = False
device_id = parse_payload['end_device_ids']['device_id'] device_id = parse_payload['end_device_ids']['device_id']
dbh = DbOp(config) dbh = DbOp(config)
@ -107,27 +106,18 @@ def mqttOnMessageCallback(client, userdata, message):
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload']) frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
status = struct.unpack('<H', frame[0:2])[0] status = struct.unpack('<H', frame[0:2])[0]
logger.debug(f"{frame=}, {status=}") u_bat = struct.unpack('<H', frame[2:4])[0]
logger.debug(f"{frame=}, {status=}, {u_bat=}")
if (status == 4): logger.info(f"Regular message received from {device_id}, {device['label']}")
logger.info(f"Start up message received from {device_id}, {device['label']}") measurement = {
for i in range(0, num_of_sensors): 'label': device['label'],
start_index = 2 + (i * 8) 'time': parse_payload['received_at'],
end_index = start_index + 8 'measurements': []
received_sensor_address = struct.unpack('<Q', frame[start_index:end_index])[0] }
logger.debug(f"sensor {i}: 0x{received_sensor_address:016x}") for i in range(0, num_of_sensors):
if (received_sensor_address not in sensors): try:
raise UnknownSensorException(received_sensor_address) addr_start_index = 4 + (i * (8 + 4))
sendSetupMessage = True
else:
logger.info(f"Regular message received from {device_id}, {device['label']}")
measurement = {
'label': device['label'],
'time': parse_payload['received_at'],
'measurements': []
}
for i in range(0, num_of_sensors):
addr_start_index = 2 + (i * (8 + 4))
addr_end_index = addr_start_index + 8 addr_end_index = addr_start_index + 8
received_sensor_address = struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0] received_sensor_address = struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0]
if (received_sensor_address not in sensors): if (received_sensor_address not in sensors):
@ -141,31 +131,10 @@ def mqttOnMessageCallback(client, userdata, message):
"value": value, "value": value,
"label": sensors[received_sensor_address][1] "label": sensors[received_sensor_address][1]
}) })
dbh.storeMeasurement(measurement) except UnknownSensorException as e:
logger.error(f"unknown sensor in message {e.sensorAddress}")
dbh.storeMeasurement(measurement)
if (sendSetupMessage):
sendSetupMessage = False
setupMessage = bytes()
null = 0
null = null.to_bytes(1, byteorder='big')
for sk, sv in sensors.items():
logger.debug(f"{sk=}, {sv=}")
setupMessage += struct.pack('<Q', sk) + sv[0].to_bytes(1, byteorder='big') + bytes(sv[1], 'ASCII') + null
setupMessage = base64.b64encode(setupMessage).decode('ASCII')
logger.debug(f"about to send setup message {setupMessage}")
setupTopic = f"v3/{config['APPLICATION_TENANT']}/devices/{device_id}/down/push"
client.publish(setupTopic, json.dumps({
"downlinks": [
{
"f_port": 1,
"frm_payload": setupMessage,
"priority": "NORMAL"
}
]
}))
except UnknownSensorException as e:
logger.error(f"unknown sensor in message {e.sensorAddress}")
except DeviceNotFoundException as e: except DeviceNotFoundException as e:
logger.error(f"message from unknown device {e.deviceId}") logger.error(f"message from unknown device {e.deviceId}")
except Exception as e: except Exception as e: