161 lines
5.1 KiB
C
161 lines
5.1 KiB
C
#include <mqttComm.h>
|
|
#include <logger.h>
|
|
#include <main.h>
|
|
|
|
#include <pubsubc.h>
|
|
#include <platformAdaption.h>
|
|
#include <PontCoopScheduler.h>
|
|
#include <wizHelper.h>
|
|
#include <mbusComm.h>
|
|
|
|
#include <stdint.h>
|
|
#include <stdlib.h>
|
|
#include <stdbool.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <stdarg.h>
|
|
|
|
extern const uint8_t MQTT_SOCK;
|
|
|
|
client_t client;
|
|
mqttClient_t mqttClient;
|
|
|
|
uint8_t brokerAddress[] = { 172, 16, 2, 16 };
|
|
uint16_t brokerPort = 1883;
|
|
|
|
const static char WatchdogTopic[] = "IoT/Watchdog";
|
|
const static char StartupTopic[] = "IoT/MBGW3/Startup";
|
|
const static char StatusTopic[] = "IoT/MBGW3/Status";
|
|
|
|
|
|
static uint32_t watchdogCounter = 0;
|
|
|
|
void watchdogHandler(void *handle) {
|
|
if (watchdogCounter > 0) {
|
|
coloredMsg(LOG_GREEN, false, "Watchdog received in between");
|
|
watchdogCounter = 0;
|
|
} else {
|
|
coloredMsg(LOG_RED, true, "No watchdog received in between, booting the system ...");
|
|
// boot the system
|
|
}
|
|
}
|
|
|
|
// typedef void (*callback_t)(char*, uint8_t*, uint16_t);
|
|
static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) {
|
|
if (0 == strcmp(topic, WatchdogTopic)) {
|
|
watchdogCounter++;
|
|
} else {
|
|
coloredMsg(LOG_GREEN, false, "mqcb: %s : %.*s", topic, payloadLength, payload);
|
|
}
|
|
}
|
|
|
|
static void mqttStatusPublisher(void *handle) {
|
|
coloredMsg(LOG_GREEN, false, "mqsp: publishing status");
|
|
t_mbusCommStats *mbusCommStats = mbusCommGetStats();
|
|
char buf[64];
|
|
snprintf(buf, 128, "{\"uptime\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%ld\", \"errors\":\"%ld\"}",
|
|
HAL_GetTick()/1000, schTaskCnt(), mbusCommStats->requestCnt, mbusCommStats->errorCnt);
|
|
bool res = publish(&mqttClient, StatusTopic, (const uint8_t*)buf, strlen(buf), false);
|
|
coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res);
|
|
|
|
}
|
|
|
|
void mqttCommHandler(void *handle) {
|
|
static uint8_t state = 0;
|
|
static uint8_t message[] = "MeterbusGateway3Variant starting";
|
|
|
|
if (isNetworkAvailable()) {
|
|
switch (state) {
|
|
case 0:
|
|
coloredMsg(LOG_GREEN, false, "mqch, initializing mqtt client");
|
|
client.sockNum = MQTT_SOCK;
|
|
mqttClientInit(&mqttClient, &client, mqttCallback);
|
|
coloredMsg(LOG_GREEN, false, "mqch: mqtt client initialized");
|
|
|
|
state = 1;
|
|
break;
|
|
|
|
case 1:
|
|
coloredMsg(LOG_GREEN, false, "mqch, connecting to broker ");
|
|
bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false);
|
|
coloredMsg(LOG_GREEN, false, "mqch, mqttConnect returns %d", res);
|
|
|
|
if (res) {
|
|
coloredMsg(LOG_GREEN, false, "mqch, ok, connected");
|
|
state = 2;
|
|
} else {
|
|
state = 255;
|
|
}
|
|
break;
|
|
|
|
case 2:
|
|
coloredMsg(LOG_GREEN, false, "mqch, publish start-up");
|
|
res = publish(&mqttClient, StartupTopic, (const uint8_t*)message, strlen((char*)message), false);
|
|
coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res);
|
|
schAdd(mqttStatusPublisher, NULL, 0, 60000);
|
|
coloredMsg(LOG_GREEN, false, "mqch, status publisher scheduled");
|
|
state = 3;
|
|
break;
|
|
|
|
case 3:
|
|
coloredMsg(LOG_GREEN, false, "mqch, subscribe watchdog");
|
|
res = subscribe(&mqttClient, WatchdogTopic, MQTTQOS0);
|
|
coloredMsg(LOG_GREEN, false, "mqch, subscribe returned %d", res);
|
|
schAdd(watchdogHandler, NULL, 60000, 60000);
|
|
coloredMsg(LOG_GREEN, false, "mqch, watchdogHandler scheduled");
|
|
state = 4;
|
|
break;
|
|
|
|
case 4:
|
|
coloredMsg(LOG_GREEN, false, "mqch, now entering the loop");
|
|
state = 5;
|
|
break;
|
|
|
|
case 5:
|
|
if (! mqttLoop(&mqttClient)) {
|
|
state = 0;
|
|
}
|
|
break;
|
|
|
|
case 255:
|
|
coloredMsg(LOG_RED, true, "mqch, error state, will stop here");
|
|
schDel(mqttCommHandler, NULL);
|
|
schDel(watchdogHandler, NULL);
|
|
coloredMsg(LOG_RED, true, "mqch, trying again in one minute");
|
|
schAdd(mqttCommHandler, NULL, 60000, 100);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void mqttCommInit() {
|
|
schAdd(mqttCommHandler, NULL, 0, 100);
|
|
}
|
|
|
|
void mqttPublish(const char *topic, char *message) {
|
|
bool res = false;
|
|
if (isNetworkAvailable()) {
|
|
res = publish(&mqttClient, topic, (const uint8_t*)message, strlen(message), false);
|
|
}
|
|
if (res) {
|
|
coloredMsg(LOG_GREEN, false, "mqp: %s -> %s successfully published", message, topic);
|
|
} else {
|
|
coloredMsg(LOG_RED, true, "mqp: %s -> %s failed to publish", message, topic);
|
|
}
|
|
}
|
|
|
|
void mqttPublishf(const char *topic, char *messageFormat, ...) {
|
|
va_list vl;
|
|
va_start(vl, messageFormat);
|
|
char buf[2048];
|
|
int res = vsnprintf(buf, sizeof(buf), messageFormat, vl);
|
|
va_end(vl);
|
|
if (res < sizeof(buf)) {
|
|
mqttPublish(topic, buf);
|
|
} else {
|
|
coloredMsg(LOG_RED, true, "mqc mpf buffer overflow, truncated message not published");
|
|
}
|
|
}
|
|
|
|
|