This commit is contained in:
Wolfgang Hottgenroth 2023-01-27 23:56:21 +01:00
commit ad21e10686
Signed by: wn
GPG Key ID: 836E9E1192A6B132
4 changed files with 104 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
ENV
*.pyc
src/.venv/*

4
docs/notes.txt Normal file
View File

@ -0,0 +1,4 @@
Sending messages downlink via MQTT: https://www.thethingsindustries.com/docs/integrations/mqtt/#publishing-downlink-traffic
Topics for messages at TTN: https://www.thethingsindustries.com/docs/integrations/mqtt/#publishing-downlink-traffic

94
src/preprocess.py Normal file
View File

@ -0,0 +1,94 @@
from loguru import logger
import paho.mqtt.client as mqtt
import os
import sys
import ssl
import json
import base64
import struct
def mqttOnConnectCallback(client, userdata, flags, rc):
logger.info("mqtt connected")
client.subscribe(config['MQTT_IN_TOPIC'])
logger.info(f"subscribed to {config['MQTT_IN_TOPIC']}")
def mqttOnMessageCallback(client, userdata, message):
try:
topic = message.topic
payload = message.payload
logger.debug(f"mqtt message received: {topic} -> {payload}")
parse_payload = json.loads(payload)
frame = base64.b64decode(parse_payload['uplink_message']['frm_payload'])
logger.info(f"{frame=}")
status = struct.unpack('<H', frame[0:2])[0]
logger.info(f"{status=}")
if (status == 4):
logger.info("Start up message received")
# send downlink message with labels
sensor_addrs = []
for i in range(0, 4):
start_index = 2 + (i * 8)
end_index = start_index + 8
sensor_addrs.append(struct.unpack('<Q', frame[start_index:end_index])[0])
logger.debug(f"sensor {i}: 0x{sensor_addrs[i]:016x}")
else:
logger.info("Regular message received")
# continue to parse and process message
sensor_addrs = []
sensor_values = []
for i in range(0, 4):
addr_start_index = 2 + (i * (8 + 4))
addr_end_index = addr_start_index + 8
sensor_addrs.append(struct.unpack('<Q', frame[addr_start_index:addr_end_index])[0])
value_start_index = addr_end_index
value_end_index = value_start_index + 4
sensor_values.append(struct.unpack('<i', frame[value_start_index:value_end_index])[0] / 128)
logger.debug(f"sensor {i}: 0x{sensor_addrs[i]:016x} = {sensor_values[i]:.2f} °C")
except Exception as e:
logger.error(f"unable to parse message {payload}, {e}")
def mqttOnDisconnectCallback(client, userdata, rc):
pass
logger.info("preprocess starting")
REQUIRED_CONFIG_OPTIONS = [
'MQTT_LOGIN',
'MQTT_PASSWORD',
'MQTT_BROKER',
'MQTT_PORT',
'MQTT_CA',
'MQTT_IN_TOPIC'
]
config = {}
for rco in REQUIRED_CONFIG_OPTIONS:
try:
config[rco] = os.environ[rco]
except KeyError:
logger.error(f"{rco} is a required config option, not available in environment")
sys.exit(-1)
client = mqtt.Client()
client.on_message = mqttOnMessageCallback
client.on_connect = mqttOnConnectCallback
client.on_disconnect = mqttOnDisconnectCallback
client.username_pw_set(config['MQTT_LOGIN'], config['MQTT_PASSWORD'])
client.tls_set(
cert_reqs=ssl.CERT_REQUIRED,
ciphers=None
)
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
logger.info("mqtt loop starting")
client.loop_forever()

2
src/requirements.txt Normal file
View File

@ -0,0 +1,2 @@
loguru==0.6.0
paho-mqtt==1.6.1