"""MQTT-to-PostgreSQL bridge.

Subscribes to ``cem/#`` on an MQTT broker, resolves each incoming topic to a
variable registered in the database and stores the payload as a measurement.
"""

import os
import sys
import ssl
import json
import base64
import struct

from loguru import logger
import paho.mqtt.client as mqtt
import psycopg2
import psycopg2.extras


class VariableNotFoundException(Exception):
    """Raised when no variable matches the identifiers taken from a topic."""

    def __init__(self, appId, converterId, deviceId, variableId):
        super().__init__(
            f"variable not found: {appId}/{converterId}/{deviceId}/{variableId}"
        )
        self.appId = appId
        self.converterId = converterId
        self.deviceId = deviceId
        self.variableId = variableId


class InvalidTopicException(Exception):
    """Raised when a topic does not have one of the supported layouts."""

    def __init__(self, topic):
        super().__init__(f"invalid topic: {topic}")
        self.topic = topic


class JustIgnoreMessage(Exception):
    """Signals that a message should be skipped without being an error."""

    def __init__(self, application, device, message):
        super().__init__(message)
        self.application = application
        self.device = device
        self.message = message


class DbOp(object):
    """Database operations for variable lookup and measurement storage.

    A new connection is opened for every operation and closed afterwards.
    """

    def __init__(self, config):
        # NOTE(review): `config` is accepted but not used; connection
        # parameters are not taken from it.
        self.conn = None

    def __getConn(self):
        """Open and return a new connection with autocommit disabled."""
        # No arguments are passed to connect(); presumably the connection
        # parameters come from the libpq environment (PGHOST, ...) -- confirm.
        conn = psycopg2.connect()
        conn.autocommit = False
        return conn

    def getVariable(self, appId, converterId, deviceId, variableId):
        """Look up the variable addressed by the given identifiers.

        Returns:
            A dict with the keys ``application``, ``variable`` and ``unit``.

        Raises:
            VariableNotFoundException: if no row matches, or if the lookup
                fails for any other reason (the cause is chained).
        """
        # Bound before the try so the finally clause never sees an unbound
        # name when opening the connection itself fails.
        conn = None
        try:
            conn = self.__getConn()
            with conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        select a.label as application,
                               v.label as variable,
                               v.unit as unit
                        from application_t a, variable_t v
                        where a.app_id = %(appId)s and
                              v.app = a.id and
                              v.converter_id = %(converterId)s and
                              v.device_id = %(deviceId)s and
                              v.variable_id = %(variableId)s
                    """, {
                        'appId': appId,
                        'converterId': converterId,
                        'deviceId': deviceId,
                        'variableId': variableId
                    })
                    res = cur.fetchone()
            if res is None:
                raise VariableNotFoundException(appId, converterId, deviceId, variableId)
            variable = {
                'application': res[0],
                'variable': res[1],
                'unit': res[2]
            }
            logger.debug(f"{variable=}")
            return variable
        except VariableNotFoundException:
            # A genuine "not found" is not a database error; pass it through
            # without the misleading error log below.
            raise
        except Exception as e:
            logger.error(f"Error getting variable: {e}")
            raise VariableNotFoundException(appId, converterId, deviceId, variableId) from e
        finally:
            if conn is not None:
                conn.close()

    def storeMeasurement(self, measurement):
        """Insert one measurement row, timestamped with the database's now().

        Args:
            measurement: dict with the keys ``application``, ``variable``,
                ``value`` and ``unit``.

        Failures are logged and not re-raised (best effort).
        """
        conn = None
        try:
            logger.info(f"About to store {measurement}")
            conn = self.__getConn()
            # Leaving the `with conn` block commits the transaction, or rolls
            # it back if an exception escaped.
            with conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        insert into measurement_t
                            (time, application, variable, value, unit)
                        values
                            (now(), %(application)s, %(variable)s, %(value)s, %(unit)s)
                    """, {
                        'application': measurement['application'],
                        'variable': measurement['variable'],
                        'value': measurement['value'],
                        'unit': measurement['unit']
                    })
        except Exception as e:
            logger.error(f"Error storing measurement: {e}")
        finally:
            if conn is not None:
                conn.close()


def splitTopic(topic):
    """Split an MQTT topic into its identifiers.

    Supported layouts (the first segment is ignored):
        ``<prefix>/<appId>/<converterId>/<variableId>``
        ``<prefix>/<appId>/<converterId>/<deviceId>/<variableId>``

    When the device segment is absent, the device id defaults to ``'mains'``.

    Returns:
        The tuple ``(appId, converterId, deviceId, variableId)``.

    Raises:
        InvalidTopicException: if the topic has too few or too many segments.
    """
    try:
        (_, appId, converterId, rest) = topic.split('/', 3)
    except ValueError:
        raise InvalidTopicException(topic) from None

    r = rest.split('/')
    if len(r) == 1:
        deviceId = 'mains'
        variableId = r[0]
    elif len(r) == 2:
        (deviceId, variableId) = r
    else:
        raise InvalidTopicException(topic)
    return (appId, converterId, deviceId, variableId)


def mqttOnConnectCallback(client, userdata, flags, rc):
    """Subscribe to the input topic tree once the broker connection is up."""
    logger.info("mqtt connected")
    mqtt_in_topic = "cem/#"
    client.subscribe(mqtt_in_topic)
    logger.info(f"subscribed to {mqtt_in_topic}")


def mqttOnMessageCallback(client, userdata, message):
    """Resolve the message's topic to a variable and store its payload.

    Every failure is logged and swallowed, so one bad message does not
    propagate out of the callback.
    """
    # Bound up front because the generic handler below logs it.
    payload = None
    try:
        topic = message.topic
        payload = message.payload
        logger.debug(f"mqtt message received: {topic} -> {payload}")

        (appId, converterId, deviceId, variableId) = splitTopic(topic)

        dbh = DbOp(config)
        variable = dbh.getVariable(appId, converterId, deviceId, variableId)
        # NOTE(review): the payload is stored exactly as received (raw
        # bytes); confirm whether it should be decoded or converted first.
        measurement = {
            "application": variable["application"],
            "variable": variable["variable"],
            "unit": variable["unit"],
            "value": payload
        }
        logger.debug(f"{measurement=}")
        dbh.storeMeasurement(measurement)
    except JustIgnoreMessage as e:
        logger.info(f"JustIgnoreMessage: {e.application}, {e.device}, {e.message}")
    except VariableNotFoundException as e:
        logger.error(f"message from unknown topic {e.appId, e.converterId, e.deviceId, e.variableId}")
    except InvalidTopicException as e:
        logger.error(f"invalid topic {e.topic}")
    except Exception as e:
        logger.error(f"unable to parse message {payload}, {e}")


def mqttOnDisconnectCallback(client, userdata, rc):
    """Disconnect hook; intentionally does nothing."""
    pass


logger.info("preprocess starting")

REQUIRED_CONFIG_OPTIONS = [
    'MQTT_LOGIN',
    'MQTT_PASSWORD',
    'MQTT_BROKER',
    'MQTT_PORT',
    'MQTT_CA'
]

# Every required option must be present in the environment; exit on the
# first one that is missing.
config = {}
for rco in REQUIRED_CONFIG_OPTIONS:
    try:
        config[rco] = os.environ[rco]
    except KeyError:
        logger.error(f"{rco} is a required config option, not available in environment")
        sys.exit(-1)

client = mqtt.Client()
client.on_message = mqttOnMessageCallback
client.on_connect = mqttOnConnectCallback
client.on_disconnect = mqttOnDisconnectCallback
client.username_pw_set(config['MQTT_LOGIN'], config['MQTT_PASSWORD'])
# NOTE(review): MQTT_CA is required above but is not passed to tls_set();
# confirm whether it should be supplied as ca_certs.
client.tls_set(
    cert_reqs=ssl.CERT_REQUIRED,
    ciphers=None
)
client.connect(config["MQTT_BROKER"], int(config["MQTT_PORT"]))
logger.info("mqtt loop starting")
client.loop_forever()