#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern const uint8_t MQTT_SOCK; client_t client; mqttClient_t mqttClient; uint8_t brokerAddress[] = { 172, 16, 2, 16 }; uint16_t brokerPort = 1883; const static char WatchdogTopic[] = "IoT/Watchdog"; const static char StartupTopic[] = "IoT/MBGW3/Startup"; const static char StatusTopic[] = "IoT/MBGW3/Status"; static uint32_t watchdogCounter = 0; void watchdogHandler(void *handle) { if (watchdogCounter > 0) { coloredMsg(LOG_GREEN, false, "Watchdog received in between"); watchdogCounter = 0; } else { coloredMsg(LOG_RED, true, "No watchdog received in between, booting the system ..."); // boot the system } } // typedef void (*callback_t)(char*, uint8_t*, uint16_t); static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) { if (0 == strcmp(topic, WatchdogTopic)) { watchdogCounter++; } else { coloredMsg(LOG_GREEN, false, "mqcb: %s : %.*s", topic, payloadLength, payload); } } static void mqttStatusPublisher(void *handle) { coloredMsg(LOG_GREEN, false, "mqsp: publishing status"); t_mbusCommStats *mbusCommStats = mbusCommGetStats(); char buf[64]; uint32_t uptime = HAL_GetTick() / 1000; snprintf(buf, 128, "{\"uptime\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%ld\", \"errors\":\"%ld\"}", uptime, schTaskCnt(), mbusCommStats->requestCnt, mbusCommStats->errorCnt); bool res = publish(&mqttClient, StatusTopic, (const uint8_t*)buf, strlen(buf), false); coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res); oledClear(); oledSetActiveScreen(OLED_SCREEN1); uint8_t *addr = wizGetIPAddress(); oledPrintf(OLED_SCREEN1, "Addr:%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]); oledPrintf(OLED_SCREEN1, "Network available:%d", isNetworkAvailable()); oledPrintf(OLED_SCREEN1, "Uptime:%ld", uptime); oledPrintf(OLED_SCREEN1, "Req:%ld", mbusCommStats->requestCnt); oledPrintf(OLED_SCREEN1, "Err:%ld", mbusCommStats->errorCnt); } void mqttCommHandler(void *handle) { static uint8_t state = 0; static uint8_t message[] = "MeterbusGateway3Variant starting"; if (isNetworkAvailable()) { switch (state) { case 0: coloredMsg(LOG_GREEN, false, "mqch, initializing mqtt client"); client.sockNum = MQTT_SOCK; mqttClientInit(&mqttClient, &client, mqttCallback); coloredMsg(LOG_GREEN, false, "mqch: mqtt client initialized"); state = 1; break; case 1: coloredMsg(LOG_GREEN, false, "mqch, connecting to broker "); bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false); coloredMsg(LOG_GREEN, false, "mqch, mqttConnect returns %d", res); if (res) { coloredMsg(LOG_GREEN, false, "mqch, ok, connected"); state = 2; } else { state = 255; } break; case 2: coloredMsg(LOG_GREEN, false, "mqch, publish start-up"); res = publish(&mqttClient, StartupTopic, (const uint8_t*)message, strlen((char*)message), false); coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res); schAdd(mqttStatusPublisher, NULL, 0, 60000); coloredMsg(LOG_GREEN, false, "mqch, status publisher scheduled"); state = 3; break; case 3: coloredMsg(LOG_GREEN, false, "mqch, subscribe watchdog"); res = subscribe(&mqttClient, WatchdogTopic, MQTTQOS0); coloredMsg(LOG_GREEN, false, "mqch, subscribe returned %d", res); schAdd(watchdogHandler, NULL, 60000, 60000); coloredMsg(LOG_GREEN, false, "mqch, watchdogHandler scheduled"); state = 4; break; case 4: coloredMsg(LOG_GREEN, false, "mqch, now entering the loop"); state = 5; break; case 5: if (! mqttLoop(&mqttClient)) { state = 0; } break; case 255: coloredMsg(LOG_RED, true, "mqch, error state, will stop here"); schDel(mqttCommHandler, NULL); schDel(watchdogHandler, NULL); coloredMsg(LOG_RED, true, "mqch, trying again in one minute"); schAdd(mqttCommHandler, NULL, 60000, 100); break; } } } void mqttCommInit() { schAdd(mqttCommHandler, NULL, 0, 100); } void mqttPublish(const char *topic, char *message) { bool res = false; if (isNetworkAvailable()) { res = publish(&mqttClient, topic, (const uint8_t*)message, strlen(message), false); } if (res) { coloredMsg(LOG_GREEN, false, "mqp: %s -> %s successfully published", message, topic); } else { coloredMsg(LOG_RED, true, "mqp: %s -> %s failed to publish", message, topic); } } void mqttPublishf(const char *topic, char *messageFormat, ...) { va_list vl; va_start(vl, messageFormat); char buf[2048]; int res = vsnprintf(buf, sizeof(buf), messageFormat, vl); va_end(vl); if (res < sizeof(buf)) { mqttPublish(topic, buf); } else { coloredMsg(LOG_RED, true, "mqc mpf buffer overflow, truncated message not published"); } }