ci added
This commit is contained in:
@ -10,13 +10,8 @@ import psycopg2
|
||||
import psycopg2.extras
|
||||
|
||||
|
||||
oldDevice = {}
|
||||
|
||||
class DeviceNotFoundException (Exception):
|
||||
def __init__(self, deviceId):
|
||||
self.deviceId = deviceId
|
||||
|
||||
class UnknownSensorException (Exception):
|
||||
class ApplicationNotFoundException (Exception):
|
||||
def __init__(self, deviceId):
|
||||
self.deviceId = deviceId
|
||||
|
||||
@ -29,35 +24,25 @@ class DbOp(object):
|
||||
conn.autocommit = False
|
||||
return conn
|
||||
|
||||
def getDevice(self, deviceId):
|
||||
def getApplication(self, deviceId):
|
||||
try:
|
||||
conn = self.__getConn()
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select id, label from device_t where device_id = %(deviceId)s", { 'deviceId': deviceId })
|
||||
cur.execute("select label, sensor_type, ground_level from application_t where device_id = %(deviceId)s", { 'deviceId': deviceId })
|
||||
res = cur.fetchone()
|
||||
if res is None:
|
||||
raise DeviceNotFoundException(deviceId)
|
||||
device_label = res[1]
|
||||
id = res[0]
|
||||
logger.debug(f"{device_label=}")
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select sensor_type, ground_level from level_sensor_t where device = %(id)s", { 'id': id })
|
||||
res = cur.fetchone()
|
||||
if res is None:
|
||||
raise UnknownSensorException(deviceId)
|
||||
sensor_type = res[0]
|
||||
ground_level = res[1]
|
||||
logger.debug(f"{sensor_type=}, {ground_level=}")
|
||||
device = {
|
||||
'label': device_label,
|
||||
'sensor_type': sensor_type,
|
||||
'ground_level': ground_level
|
||||
}
|
||||
return device
|
||||
raise ApplicationNotFoundException(deviceId)
|
||||
application = {
|
||||
'label': res[0],
|
||||
'sensor_type': res[1],
|
||||
'ground_level': res[2]
|
||||
}
|
||||
logger.debug(f"{application=}")
|
||||
return application
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting device: {e}")
|
||||
raise DeviceNotFoundException(deviceId)
|
||||
raise ApplicationNotFoundException(deviceId)
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
@ -68,9 +53,18 @@ class DbOp(object):
|
||||
conn = self.__getConn()
|
||||
with conn:
|
||||
with conn.cursor() as cur:
|
||||
for entry in measurement['measurements']:
|
||||
cur.execute("insert into measurement_t (time, device_name, sensor_name, temperature) values (now(), %(dname)s, %(sname)s, %(tempc)s)",
|
||||
{ 'dname': measurement['label'], 'sname': entry['label'], 'tempc': entry['value'] })
|
||||
cur.execute("""
|
||||
insert into measurement_t (time, application_name, raw_level, level, status, battery)
|
||||
values (now(), %(application_name)s, %(raw_level)s, %(level)s, %(status)s, %(battery)s)
|
||||
""",
|
||||
{
|
||||
'application_name': measurement['application_name'],
|
||||
'raw_level': measurement['raw_level'],
|
||||
'level': measurement['level'],
|
||||
'status': measurement['status'],
|
||||
'battery': measurement['battery']
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error storing measurement: {e}")
|
||||
finally:
|
||||
@ -94,78 +88,27 @@ def mqttOnMessageCallback(client, userdata, message):
|
||||
device_id = parse_payload['end_device_ids']['device_id']
|
||||
|
||||
dbh = DbOp(config)
|
||||
device = dbh.getDevice(device_id)
|
||||
|
||||
global oldDevice
|
||||
if (device != oldDevice):
|
||||
logger.info("device configuration in database has changed")
|
||||
oldDevice = device
|
||||
sendSetupMessage = True
|
||||
num_of_sensors = len(device['sensors'])
|
||||
sensors = { x['address']:[x['index'], x['label']] for x in device['sensors']}
|
||||
application = dbh.getApplication(device_id)
|
||||
|
||||
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
|
||||
status = struct.unpack('<H', frame[0:2])[0]
|
||||
logger.debug(f"{frame=}, {status=}")
|
||||
|
||||
if (status == 4):
|
||||
logger.info(f"Start up message received from {device_id}, {device['label']}")
|
||||
for i in range(0, num_of_sensors):
|
||||
start_index = 2 + (i * 8)
|
||||
end_index = start_index + 8
|
||||
received_sensor_address = struct.unpack('<Q', frame[start_index:end_index])[0]
|
||||
logger.debug(f"sensor {i}: 0x{received_sensor_address:016x}")
|
||||
if (received_sensor_address not in sensors):
|
||||
raise UnknownSensorException(received_sensor_address)
|
||||
sendSetupMessage = True
|
||||
else:
|
||||
logger.info(f"Regular message received from {device_id}, {device['label']}")
|
||||
measurement = {
|
||||
'label': device['label'],
|
||||
'time': parse_payload['received_at'],
|
||||
'measurements': []
|
||||
}
|
||||
for i in range(0, num_of_sensors):
|
||||
addr_start_index = 2 + (i * (8 + 4))
|
||||
addr_end_index = addr_start_index + 8
|
||||
received_sensor_address = struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0]
|
||||
if (received_sensor_address not in sensors):
|
||||
raise UnknownSensorException(received_sensor_address)
|
||||
value_start_index = addr_end_index
|
||||
value_end_index = value_start_index + 4
|
||||
value = struct.unpack('<i', frame[value_start_index:value_end_index])[0] / 128
|
||||
logger.debug(f"sensor {i}: 0x{received_sensor_address:016x} = {value:.2f} °C")
|
||||
measurement['measurements'].append({
|
||||
"address": received_sensor_address,
|
||||
"value": value,
|
||||
"label": sensors[received_sensor_address][1]
|
||||
})
|
||||
dbh.storeMeasurement(measurement)
|
||||
battery = struct.unpack('>H', frame[0:2])[0]
|
||||
distance = struct.unpack('>H', frame[2:4])[0]
|
||||
status = struct.unpack('?', frame[7:8])[0]
|
||||
logger.debug(f"{frame=}, {battery=}, {distance=}, {status=}")
|
||||
|
||||
if (sendSetupMessage):
|
||||
sendSetupMessage = False
|
||||
setupMessage = bytes()
|
||||
null = 0
|
||||
null = null.to_bytes(1, byteorder='big')
|
||||
for sk, sv in sensors.items():
|
||||
logger.debug(f"{sk=}, {sv=}")
|
||||
setupMessage += struct.pack('<Q', sk) + sv[0].to_bytes(1, byteorder='big') + bytes(sv[1], 'ASCII') + null
|
||||
setupMessage = base64.b64encode(setupMessage).decode('ASCII')
|
||||
logger.debug(f"about to send setup message {setupMessage}")
|
||||
setupTopic = f"v3/{config['APPLICATION_TENANT']}/devices/{device_id}/down/push"
|
||||
client.publish(setupTopic, json.dumps({
|
||||
"downlinks": [
|
||||
{
|
||||
"f_port": 1,
|
||||
"frm_payload": setupMessage,
|
||||
"priority": "NORMAL"
|
||||
}
|
||||
]
|
||||
}))
|
||||
measurement = {
|
||||
'application_name': application['label'],
|
||||
'raw_level': distance,
|
||||
'level': application['ground_level'] - distance,
|
||||
'status': 'Ok' if status else 'No sensor',
|
||||
'battery': battery / 100
|
||||
}
|
||||
logger.debug(f"{measurement=}")
|
||||
|
||||
except UnknownSensorException as e:
|
||||
logger.error(f"unknown sensor in message {e.sensorAddress}")
|
||||
except DeviceNotFoundException as e:
|
||||
dbh.storeMeasurement(measurement)
|
||||
|
||||
except ApplicationNotFoundException as e:
|
||||
logger.error(f"message from unknown device {e.deviceId}")
|
||||
except Exception as e:
|
||||
logger.error(f"unable to parse message {payload}, {e}")
|
||||
@ -206,6 +149,7 @@ client.tls_set(
|
||||
ciphers=None
|
||||
)
|
||||
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
|
||||
#client.connect('172.16.2.16', 1883)
|
||||
logger.info("mqtt loop starting")
|
||||
client.loop_forever()
|
||||
|
||||
|
Reference in New Issue
Block a user