#include #include #include #include #include #include #include #include #include #include #include #include #include extern const uint8_t MQTT_SOCK; client_t client; mqttClient_t mqttClient; uint8_t brokerAddress[] = { 172, 16, 2, 16 }; uint16_t brokerPort = 1883; const static char WatchdogTopic[] = "IoT/Watchdog"; const static char StartupTopic[] = "IoT/MBGW3/Startup"; const static char StatusTopic[] = "IoT/MBGW3/Status"; static uint32_t watchdogCounter = 0; void watchdogHandler(void *handle) { if (watchdogCounter > 0) { coloredMsg(LOG_GREEN, false, "Watchdog received in between"); watchdogCounter = 0; } else { coloredMsg(LOG_RED, true, "No watchdog received in between, booting the system ..."); // boot the system } } // typedef void (*callback_t)(char*, uint8_t*, uint16_t); static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) { if (0 == strcmp(topic, WatchdogTopic)) { watchdogCounter++; } else { coloredMsg(LOG_GREEN, false, "mqcb: %s : %.*s", topic, payloadLength, payload); } } static void mqttStatusPublisher(void *handle) { coloredMsg(LOG_GREEN, false, "mqsp: publishing status"); t_mbusCommStats *mbusCommStats = mbusCommGetStats(); char buf[64]; snprintf(buf, 128, "{\"uptime\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%d\", \"errors\":\"%d\"}", HAL_GetTick()/1000, schTaskCnt(), mbusCommStats->requestCnt, mbusCommStats->errorCnt); bool res = publish(&mqttClient, StatusTopic, (const char*)buf, strlen(buf), false); coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res); } void mqttCommHandler(void *handle) { static uint8_t state = 0; static uint8_t message[] = "MeterbusGateway3Variant starting"; if (isNetworkAvailable()) { switch (state) { case 0: coloredMsg(LOG_GREEN, false, "mqch, initializing mqtt client"); client.sockNum = MQTT_SOCK; mqttClientInit(&mqttClient, &client, mqttCallback); coloredMsg(LOG_GREEN, false, "mqch: mqtt client initialized"); state = 1; break; case 1: coloredMsg(LOG_GREEN, false, "mqch, connecting to broker "); bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false); coloredMsg(LOG_GREEN, false, "mqch, mqttConnect returns %d", res); if (res) { coloredMsg(LOG_GREEN, false, "mqch, ok, connected"); state = 2; } else { state = 255; } break; case 2: coloredMsg(LOG_GREEN, false, "mqch, publish start-up"); res = publish(&mqttClient, StartupTopic, (const char*)message, strlen(message), false); coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res); schAdd(mqttStatusPublisher, NULL, 0, 60000); coloredMsg(LOG_GREEN, false, "mqch, status publisher scheduled"); state = 3; break; case 3: coloredMsg(LOG_GREEN, false, "mqch, subscribe watchdog"); res = subscribe(&mqttClient, WatchdogTopic, MQTTQOS0); coloredMsg(LOG_GREEN, false, "mqch, subscribe returned %d", res); schAdd(watchdogHandler, NULL, 60000, 60000); coloredMsg(LOG_GREEN, false, "mqch, watchdogHandler scheduled"); state = 4; break; case 4: coloredMsg(LOG_GREEN, false, "mqch, now entering the loop"); state = 5; break; case 5: if (! mqttLoop(&mqttClient)) { state = 0; } break; case 255: coloredMsg(LOG_RED, true, "mqch, error state, will stop here"); schDel(mqttCommHandler, NULL); schDel(watchdogHandler, NULL); coloredMsg(LOG_RED, true, "mqch, trying again in one minute"); schAdd(mqttCommHandler, NULL, 60000, 100); break; } } } void mqttCommInit() { schAdd(mqttCommHandler, NULL, 0, 100); } void mqttPublish(char *topic, char *message) { bool res = publish(&mqttClient, topic, message, strlen(message), false); if (res) { coloredMsg(LOG_GREEN, false, "mqp: %s -> %s successfully published", message, topic); } else { coloredMsg(LOG_RED, true, "mqp: %s -> %s failed to publish", message, topic); } }