Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
6060d3ed2f
|
|||
36d18dcb79
|
|||
3c932220e7
|
|||
c2516fe0b7
|
|||
ac4a032b62
|
|||
69e8a507aa
|
|||
dea278bf03
|
|||
cc7af48f93
|
|||
d2a07a6924
|
@ -30,7 +30,7 @@ deploy:
|
|||||||
- tags
|
- tags
|
||||||
variables:
|
variables:
|
||||||
GIT_STRATEGY: none
|
GIT_STRATEGY: none
|
||||||
CONTAINER_NAME: preprocessor
|
CONTAINER_NAME: badesee-preprocessor
|
||||||
script:
|
script:
|
||||||
- docker stop $CONTAINER_NAME || echo "container not running, never mind"
|
- docker stop $CONTAINER_NAME || echo "container not running, never mind"
|
||||||
- docker rm $CONTAINER_NAME || echo "container not existing, never mind"
|
- docker rm $CONTAINER_NAME || echo "container not existing, never mind"
|
||||||
@ -39,16 +39,17 @@ deploy:
|
|||||||
-d
|
-d
|
||||||
--name $CONTAINER_NAME
|
--name $CONTAINER_NAME
|
||||||
--restart always
|
--restart always
|
||||||
--link timescaledb-server
|
--network internal-network
|
||||||
-e "APPLICATION_TENANT=$APPLICATION_TENANT"
|
-e "APPLICATION_TENANT=$APPLICATION_TENANT"
|
||||||
-e "MQTT_LOGIN=$MQTT_LOGIN"
|
-e "MQTT_LOGIN=$MQTT_LOGIN"
|
||||||
-e "MQTT_PASSWORD=$MQTT_PASSWORD"
|
-e "MQTT_PASSWORD=$MQTT_PASSWORD"
|
||||||
-e "MQTT_BROKER=$MQTT_BROKER"
|
-e "MQTT_BROKER=$MQTT_BROKER"
|
||||||
-e "PGHOST=timescaledb-server"
|
-e "PGHOST=timescaledb"
|
||||||
-e "PGUSER=$PGUSER"
|
-e "PGUSER=$PGUSER"
|
||||||
-e "PGPASSWORD=$PGPASSWORD"
|
-e "PGPASSWORD=$PGPASSWORD"
|
||||||
-e "PGDATABASE=$PGDATABASE"
|
-e "PGDATABASE=$PGDATABASE"
|
||||||
$IMAGE_NAME:$CI_COMMIT_TAG
|
$IMAGE_NAME:$CI_COMMIT_TAG
|
||||||
|
- docker network connect external-network $CONTAINER_NAME
|
||||||
environment:
|
environment:
|
||||||
name: production
|
name: production
|
||||||
|
|
||||||
|
@ -1,3 +1,6 @@
|
|||||||
|
create database monitoring;
|
||||||
|
create extension timescaledb;
|
||||||
|
|
||||||
CREATE TABLE device_t (
|
CREATE TABLE device_t (
|
||||||
id SERIAL NOT NULL PRIMARY KEY,
|
id SERIAL NOT NULL PRIMARY KEY,
|
||||||
device_id VARCHAR(32) NOT NULL UNIQUE,
|
device_id VARCHAR(32) NOT NULL UNIQUE,
|
||||||
@ -23,6 +26,15 @@ CREATE TABLE measurement_t (
|
|||||||
|
|
||||||
SELECT create_hypertable('measurement_t', 'time');
|
SELECT create_hypertable('measurement_t', 'time');
|
||||||
|
|
||||||
|
CREATE TABLE voltage_t (
|
||||||
|
time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||||
|
device_name VARCHAR(16) NOT NULL,
|
||||||
|
voltage INTEGER
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_hypertable('voltage_t', 'time');
|
||||||
|
|
||||||
|
|
||||||
insert into device_t (device_id, label) values('eui-43fa12f400006c88', 'badesee');
|
insert into device_t (device_id, label) values('eui-43fa12f400006c88', 'badesee');
|
||||||
insert into sensor_t (address, label, index, device)
|
insert into sensor_t (address, label, index, device)
|
||||||
values (13258914387362694952, '0,5m ', 0, 1),
|
values (13258914387362694952, '0,5m ', 0, 1),
|
||||||
@ -30,10 +42,7 @@ insert into sensor_t (address, label, index, device)
|
|||||||
(10664523975231507496, '3,0m ', 2, 1),
|
(10664523975231507496, '3,0m ', 2, 1),
|
||||||
(15276209993662477608, '4,0m ', 3, 1);
|
(15276209993662477608, '4,0m ', 3, 1);
|
||||||
|
|
||||||
select d.label as device_label,
|
create user preprocessor password 'geheim';
|
||||||
s.label as label,
|
grant select on device_t, sensor_t to preprocessor;
|
||||||
s.address as address,
|
grant insert on measurement_t to preprocessor;
|
||||||
s.index as index
|
grant insert on voltage_t to preprocessor;
|
||||||
from device_t d, sensor_t s
|
|
||||||
where d.id = s.device;
|
|
||||||
|
|
||||||
|
@ -69,9 +69,11 @@ class DbOp(object):
|
|||||||
conn = self.__getConn()
|
conn = self.__getConn()
|
||||||
with conn:
|
with conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("insert into voltage_t (time, device_name, voltage) values (now(), %(dname)s, %(voltage)s)",
|
||||||
|
{ 'dname': measurement['label'], 'voltage': measurement['u_bat']})
|
||||||
for entry in measurement['measurements']:
|
for entry in measurement['measurements']:
|
||||||
cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (%(time)s, %(dname)s, %(sname)s, %(tempc)s)",
|
cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (now(), %(dname)s, %(sname)s, %(tempc)s)",
|
||||||
{ 'time': measurement['time'], 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] })
|
{ 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] })
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error storing measurement: {e}")
|
logger.error(f"Error storing measurement: {e}")
|
||||||
finally:
|
finally:
|
||||||
@ -91,7 +93,6 @@ def mqttOnMessageCallback(client, userdata, message):
|
|||||||
logger.debug(f"mqtt message received: {topic} -> {payload}")
|
logger.debug(f"mqtt message received: {topic} -> {payload}")
|
||||||
parse_payload = json.loads(payload)
|
parse_payload = json.loads(payload)
|
||||||
|
|
||||||
sendSetupMessage = False
|
|
||||||
device_id = parse_payload['end_device_ids']['device_id']
|
device_id = parse_payload['end_device_ids']['device_id']
|
||||||
|
|
||||||
dbh = DbOp(config)
|
dbh = DbOp(config)
|
||||||
@ -104,30 +105,23 @@ def mqttOnMessageCallback(client, userdata, message):
|
|||||||
sendSetupMessage = True
|
sendSetupMessage = True
|
||||||
num_of_sensors = len(device['sensors'])
|
num_of_sensors = len(device['sensors'])
|
||||||
sensors = { x['address']:[x['index'], x['label']] for x in device['sensors']}
|
sensors = { x['address']:[x['index'], x['label']] for x in device['sensors']}
|
||||||
|
logger.debug(f"{sensors=}, {num_of_sensors=}")
|
||||||
|
|
||||||
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
|
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
|
||||||
status = struct.unpack('<H', frame[0:2])[0]
|
status = struct.unpack('<H', frame[0:2])[0]
|
||||||
logger.debug(f"{frame=}, {status=}")
|
u_bat = struct.unpack('<H', frame[2:4])[0]
|
||||||
|
logger.debug(f"{frame=}, {status=}, {u_bat=}")
|
||||||
|
|
||||||
if (status == 4):
|
|
||||||
logger.info(f"Start up message received from {device_id}, {device['label']}")
|
|
||||||
for i in range(0, num_of_sensors):
|
|
||||||
start_index = 2 + (i * 8)
|
|
||||||
end_index = start_index + 8
|
|
||||||
received_sensor_address = struct.unpack('<Q', frame[start_index:end_index])[0]
|
|
||||||
logger.debug(f"sensor {i}: 0x{received_sensor_address:016x}")
|
|
||||||
if (received_sensor_address not in sensors):
|
|
||||||
raise UnknownSensorException(received_sensor_address)
|
|
||||||
sendSetupMessage = True
|
|
||||||
else:
|
|
||||||
logger.info(f"Regular message received from {device_id}, {device['label']}")
|
logger.info(f"Regular message received from {device_id}, {device['label']}")
|
||||||
measurement = {
|
measurement = {
|
||||||
'label': device['label'],
|
'label': device['label'],
|
||||||
|
'u_bat': u_bat,
|
||||||
'time': parse_payload['received_at'],
|
'time': parse_payload['received_at'],
|
||||||
'measurements': []
|
'measurements': []
|
||||||
}
|
}
|
||||||
for i in range(0, num_of_sensors):
|
for i in range(0, num_of_sensors):
|
||||||
addr_start_index = 2 + (i * (8 + 4))
|
try:
|
||||||
|
addr_start_index = 4 + (i * (8 + 4))
|
||||||
addr_end_index = addr_start_index + 8
|
addr_end_index = addr_start_index + 8
|
||||||
received_sensor_address = struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0]
|
received_sensor_address = struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0]
|
||||||
if (received_sensor_address not in sensors):
|
if (received_sensor_address not in sensors):
|
||||||
@ -141,31 +135,10 @@ def mqttOnMessageCallback(client, userdata, message):
|
|||||||
"value": value,
|
"value": value,
|
||||||
"label": sensors[received_sensor_address][1]
|
"label": sensors[received_sensor_address][1]
|
||||||
})
|
})
|
||||||
dbh.storeMeasurement(measurement)
|
|
||||||
|
|
||||||
if (sendSetupMessage):
|
|
||||||
sendSetupMessage = False
|
|
||||||
setupMessage = bytes()
|
|
||||||
null = 0
|
|
||||||
null = null.to_bytes(1, byteorder='big')
|
|
||||||
for sk, sv in sensors.items():
|
|
||||||
logger.debug(f"{sk=}, {sv=}")
|
|
||||||
setupMessage += struct.pack('<Q', sk) + sv[0].to_bytes(1, byteorder='big') + bytes(sv[1], 'ASCII') + null
|
|
||||||
setupMessage = base64.b64encode(setupMessage).decode('ASCII')
|
|
||||||
logger.debug(f"about to send setup message {setupMessage}")
|
|
||||||
setupTopic = f"v3/{config['APPLICATION_TENANT']}/devices/{device_id}/down/push"
|
|
||||||
client.publish(setupTopic, json.dumps({
|
|
||||||
"downlinks": [
|
|
||||||
{
|
|
||||||
"f_port": 1,
|
|
||||||
"frm_payload": setupMessage,
|
|
||||||
"priority": "NORMAL"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}))
|
|
||||||
|
|
||||||
except UnknownSensorException as e:
|
except UnknownSensorException as e:
|
||||||
logger.error(f"unknown sensor in message {e.sensorAddress}")
|
logger.error(f"unknown sensor in message {e.sensorAddress}")
|
||||||
|
dbh.storeMeasurement(measurement)
|
||||||
|
|
||||||
except DeviceNotFoundException as e:
|
except DeviceNotFoundException as e:
|
||||||
logger.error(f"message from unknown device {e.deviceId}")
|
logger.error(f"message from unknown device {e.deviceId}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
Reference in New Issue
Block a user