159 lines
5.0 KiB
C
Raw Normal View History

2020-11-10 15:56:07 +01:00
#include <mqttComm.h>
#include <logger.h>
2020-11-14 21:15:33 +01:00
#include <main.h>
#include <pubsubc.h>
#include <platformAdaption.h>
2020-11-10 15:56:07 +01:00
#include <PontCoopScheduler.h>
#include <wizHelper.h>
2020-11-14 21:15:33 +01:00
#include <mbusComm.h>
2020-11-10 15:56:07 +01:00
#include <stdint.h>
2020-11-14 21:15:33 +01:00
#include <stdlib.h>
#include <stdbool.h>
2020-11-10 16:06:23 +01:00
#include <string.h>
2020-11-14 21:15:33 +01:00
#include <stdio.h>
2020-11-18 14:01:23 +01:00
#include <stdarg.h>
2020-11-10 15:56:07 +01:00
extern const uint8_t MQTT_SOCK;
2020-11-14 21:15:33 +01:00
client_t client;
mqttClient_t mqttClient;
2020-11-10 15:56:07 +01:00
2020-11-14 21:15:33 +01:00
uint8_t brokerAddress[] = { 172, 16, 2, 16 };
uint16_t brokerPort = 1883;
2020-11-10 15:56:07 +01:00
2020-11-14 21:15:33 +01:00
const static char WatchdogTopic[] = "IoT/Watchdog";
const static char StartupTopic[] = "IoT/MBGW3/Startup";
const static char StatusTopic[] = "IoT/MBGW3/Status";
2020-11-10 15:56:07 +01:00
2020-11-16 17:33:17 +01:00
static uint32_t watchdogCounter = 0;
void watchdogHandler(void *handle) {
if (watchdogCounter > 0) {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "Watchdog received in between");
2020-11-16 17:33:17 +01:00
watchdogCounter = 0;
} else {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_RED, true, "No watchdog received in between, booting the system ...");
2020-11-16 17:33:17 +01:00
// boot the system
}
}
2020-11-14 21:15:33 +01:00
// typedef void (*callback_t)(char*, uint8_t*, uint16_t);
static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) {
if (0 == strcmp(topic, WatchdogTopic)) {
2020-11-16 17:33:17 +01:00
watchdogCounter++;
} else {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqcb: %s : %.*s", topic, payloadLength, payload);
2020-11-10 15:56:07 +01:00
}
}
2020-11-14 21:15:33 +01:00
static void mqttStatusPublisher(void *handle) {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqsp: publishing status");
2020-11-14 21:15:33 +01:00
t_mbusCommStats *mbusCommStats = mbusCommGetStats();
char buf[64];
2020-11-15 01:48:05 +01:00
snprintf(buf, 128, "{\"uptime\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%d\", \"errors\":\"%d\"}",
2020-11-14 21:32:21 +01:00
HAL_GetTick()/1000, schTaskCnt(), mbusCommStats->requestCnt, mbusCommStats->errorCnt);
2020-11-14 21:15:33 +01:00
bool res = publish(&mqttClient, StatusTopic, (const char*)buf, strlen(buf), false);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res);
2020-11-10 15:56:07 +01:00
}
2020-11-14 21:15:33 +01:00
void mqttCommHandler(void *handle) {
static uint8_t state = 0;
static uint8_t message[] = "MeterbusGateway3Variant starting";
if (isNetworkAvailable()) {
switch (state) {
case 0:
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, initializing mqtt client");
2020-11-14 21:15:33 +01:00
client.sockNum = MQTT_SOCK;
mqttClientInit(&mqttClient, &client, mqttCallback);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch: mqtt client initialized");
2020-11-14 21:15:33 +01:00
state = 1;
break;
case 1:
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, connecting to broker ");
2020-11-14 21:15:33 +01:00
bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, mqttConnect returns %d", res);
2020-11-14 21:15:33 +01:00
if (res) {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, ok, connected");
2020-11-14 21:15:33 +01:00
state = 2;
} else {
state = 255;
}
break;
case 2:
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, publish start-up");
2020-11-14 21:15:33 +01:00
res = publish(&mqttClient, StartupTopic, (const char*)message, strlen(message), false);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res);
2020-11-14 21:15:33 +01:00
schAdd(mqttStatusPublisher, NULL, 0, 60000);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, status publisher scheduled");
2020-11-14 21:15:33 +01:00
state = 3;
break;
case 3:
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, subscribe watchdog");
2020-11-14 21:15:33 +01:00
res = subscribe(&mqttClient, WatchdogTopic, MQTTQOS0);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, subscribe returned %d", res);
2020-11-16 17:35:44 +01:00
schAdd(watchdogHandler, NULL, 60000, 60000);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, watchdogHandler scheduled");
2020-11-14 21:15:33 +01:00
state = 4;
break;
case 4:
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqch, now entering the loop");
2020-11-14 21:15:33 +01:00
state = 5;
break;
case 5:
if (! mqttLoop(&mqttClient)) {
state = 0;
}
break;
case 255:
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_RED, true, "mqch, error state, will stop here");
2020-11-14 21:15:33 +01:00
schDel(mqttCommHandler, NULL);
2020-11-16 17:33:17 +01:00
schDel(watchdogHandler, NULL);
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_RED, true, "mqch, trying again in one minute");
2020-11-14 21:15:33 +01:00
schAdd(mqttCommHandler, NULL, 60000, 100);
break;
}
2020-11-10 15:56:07 +01:00
}
2020-11-10 16:06:23 +01:00
}
2020-11-14 21:15:33 +01:00
void mqttCommInit() {
schAdd(mqttCommHandler, NULL, 0, 100);
2020-11-14 21:39:40 +01:00
}
void mqttPublish(char *topic, char *message) {
2020-11-18 14:01:23 +01:00
bool res = false;
if (isNetworkAvailable()) {
res = publish(&mqttClient, topic, message, strlen(message), false);
}
2020-11-14 21:39:40 +01:00
if (res) {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_GREEN, false, "mqp: %s -> %s successfully published", message, topic);
2020-11-14 21:39:40 +01:00
} else {
2020-11-17 12:01:15 +01:00
coloredMsg(LOG_RED, true, "mqp: %s -> %s failed to publish", message, topic);
2020-11-14 21:39:40 +01:00
}
2020-11-18 14:01:23 +01:00
}
void mqttPublishf(char *topic, char *messageFormat, ...) {
va_list vl;
va_start(vl, messageFormat);
char buf[2048];
int res = vsnprintf(buf, sizeof(buf), messageFormat, vl);
va_end(vl);
if (res < sizeof(buf)) {
mqttPublish(topic, buf);
} else {
coloredMsg(LOG_RED, true, "mqc mpf buffer overflow, truncated message not published");
}
}