183 lines
6.6 KiB
C

#include <mqttComm.h>
#include <logger.h>
#include <main.h>
#include <pubsubc.h>
#include <platformAdaption.h>
#include <PontCoopScheduler.h>
#include <wizHelper.h>
#include <mbusComm.h>
#include <oled.h>
#include <eeprom.h>
#include <config.h>
#include <stdint.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
extern const uint8_t MQTT_SOCK;
static t_configBlock *config;
client_t client;
mqttClient_t mqttClient;
uint8_t brokerAddress[4];
uint16_t brokerPort = 1883;
static uint32_t watchdogCounter = 0;
void watchdogHandler(void *handle) {
if (watchdogCounter > 0) {
coloredMsg(LOG_GREEN, false, "Watchdog received in between");
watchdogCounter = 0;
} else {
coloredMsg(LOG_RED, true, "No watchdog received in between, booting the system ...");
// boot the system
HAL_NVIC_SystemReset();
}
}
// typedef void (*callback_t)(char*, uint8_t*, uint16_t);
static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) {
if (0 == strcmp(topic, config->watchdogTopic)) {
watchdogCounter++;
} else {
coloredMsg(LOG_GREEN, false, "mqcb: %s : %.*s", topic, payloadLength, payload);
}
}
static void mqttStatusPublisher(void *handle) {
coloredMsg(LOG_GREEN, false, "mqsp: publishing status");
t_mbusCommStats *mbusCommStats = mbusCommGetStats();
t_deviceStats* globalDeviceStats = getGlobalDeviceStats();
char buf[256];
uint32_t uptime = HAL_GetTick() / 1000;
snprintf(buf, sizeof(buf), "{\"uptime\":\"%ld\", \"runningHours\":\"%ld\", \"powerCycles\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%ld\", \"errors\":\"%ld\", \"octets\":\"%ld\", \"overrun\":\"%ld\", \"framing\":\"%ld\", \"parity\":\"%ld\", \"noise\":\"%ld\"}",
uptime, globalDeviceStats->totalRunningHours, globalDeviceStats->totalPowercycles, schTaskCnt(), mbusCommStats->mbusRequestCnt, mbusCommStats->mbusErrorCnt,
mbusCommStats->uartOctetCnt, mbusCommStats->uartOverrunCnt, mbusCommStats->uartFramingErrCnt, mbusCommStats->uartParityErrCnt, mbusCommStats->uartNoiseErrCnt);
bool res = publish(&mqttClient, config->statusTopic, (const uint8_t*)buf, strlen(buf), false);
coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res);
oledSetActiveScreen(OLED_SCREEN1);
oledClearActiveScreen();
uint8_t *addr = wizGetIPAddress();
oledPrintf(OLED_SCREEN1, "Addr:%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
oledPrintf(OLED_SCREEN1, "Network available:%d", isNetworkAvailable());
oledPrintf(OLED_SCREEN1, "Uptime:%ld", uptime);
oledPrintf(OLED_SCREEN1, "PowerCycl:%d", globalDeviceStats->totalPowercycles);
oledPrintf(OLED_SCREEN1, "Req:%ld", mbusCommStats->mbusRequestCnt);
oledPrintf(OLED_SCREEN1, "Err:%ld", mbusCommStats->mbusErrorCnt);
oledPrintf(OLED_SCREEN1, "Uart:%d", mbusCommStats->uartOverrunCnt + mbusCommStats->uartFramingErrCnt + mbusCommStats->uartParityErrCnt + mbusCommStats->uartNoiseErrCnt);
}
void mqttCommHandler(void *handle) {
static uint8_t state = 0;
static uint8_t message[] = "MeterbusGateway3Variant starting";
if (isNetworkAvailable()) {
switch (state) {
case 0:
coloredMsg(LOG_GREEN, false, "mqch, initializing mqtt client");
client.sockNum = MQTT_SOCK;
mqttClientInit(&mqttClient, &client, mqttCallback);
coloredMsg(LOG_GREEN, false, "mqch: mqtt client initialized");
state = 1;
break;
case 1:
coloredMsg(LOG_GREEN, false, "mqch, resolving broker name");
if (! wizDnsQuery(config->brokerName, brokerAddress)) {
coloredMsg(LOG_GREEN, false, "mqch, query for broker address failed, going to error state");
state = 255;
}
coloredMsg(LOG_GREEN, false, "mqch, connecting to broker ");
bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false);
coloredMsg(LOG_GREEN, false, "mqch, mqttConnect returns %d", res);
if (res) {
coloredMsg(LOG_GREEN, false, "mqch, ok, connected");
state = 2;
} else {
state = 255;
}
break;
case 2:
coloredMsg(LOG_GREEN, false, "mqch, publish start-up");
res = publish(&mqttClient, config->startupTopic, (const uint8_t*)message, strlen((char*)message), false);
coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res);
schAdd(mqttStatusPublisher, NULL, 0, 60000);
coloredMsg(LOG_GREEN, false, "mqch, status publisher scheduled");
state = 3;
break;
case 3:
coloredMsg(LOG_GREEN, false, "mqch, subscribe watchdog");
res = subscribe(&mqttClient, config->watchdogTopic, MQTTQOS0);
coloredMsg(LOG_GREEN, false, "mqch, subscribe returned %d", res);
schAdd(watchdogHandler, NULL, 60000, 60000);
coloredMsg(LOG_GREEN, false, "mqch, watchdogHandler scheduled");
state = 4;
break;
case 4:
coloredMsg(LOG_GREEN, false, "mqch, now entering the loop");
state = 5;
break;
case 5:
if (! mqttLoop(&mqttClient)) {
state = 0;
}
break;
case 255:
coloredMsg(LOG_RED, true, "mqch, error state, will stop here");
schDel(mqttCommHandler, NULL);
schDel(watchdogHandler, NULL);
coloredMsg(LOG_RED, true, "mqch, trying again in one minute");
schAdd(mqttCommHandler, NULL, 60000, 100);
break;
}
}
}
void mqttCommInit() {
config = getConfig();
schAdd(mqttCommHandler, NULL, 0, 100);
}
void mqttPublish(const char *topic, char *message) {
bool res = false;
if (isNetworkAvailable()) {
res = publish(&mqttClient, topic, (const uint8_t*)message, strlen(message), false);
}
if (res) {
coloredMsg(LOG_GREEN, false, "mqp: %s -> %s successfully published", message, topic);
} else {
coloredMsg(LOG_RED, true, "mqp: %s -> %s failed to publish", message, topic);
}
}
void mqttPublishf(const char *topic, char *messageFormat, ...) {
va_list vl;
va_start(vl, messageFormat);
char buf[2048];
int res = vsnprintf(buf, sizeof(buf), messageFormat, vl);
va_end(vl);
if (res < sizeof(buf)) {
mqttPublish(topic, buf);
} else {
coloredMsg(LOG_RED, true, "mqc mpf buffer overflow, truncated message not published");
}
}