#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern const uint8_t MQTT_SOCK; static t_configBlock *config; client_t client; mqttClient_t mqttClient; uint8_t brokerAddress[4]; uint16_t brokerPort = 1883; static uint32_t watchdogCounter = 0; void watchdogHandler(void *handle) { if (watchdogCounter > 0) { coloredMsg(LOG_GREEN, false, "Watchdog received in between"); watchdogCounter = 0; } else { coloredMsg(LOG_RED, true, "No watchdog received in between, booting the system ..."); // boot the system HAL_NVIC_SystemReset(); } } // typedef void (*callback_t)(char*, uint8_t*, uint16_t); static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) { if (0 == strcmp(topic, config->watchdogTopic)) { HAL_IWDG_Refresh(&hiwdg); watchdogCounter++; } else { coloredMsg(LOG_GREEN, false, "mqcb: %s : %.*s", topic, payloadLength, payload); } } static void mqttStatusPublisher(void *handle) { coloredMsg(LOG_GREEN, false, "mqsp: publishing status"); t_mbusCommStats *mbusCommStats = mbusCommGetStats(); t_deviceStats* globalDeviceStats = getGlobalDeviceStats(); char buf[256]; uint32_t uptime = HAL_GetTick() / 1000; snprintf(buf, sizeof(buf), "{\"uptime\":\"%ld\", \"runningHours\":\"%ld\", \"powerCycles\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%ld\", \"errors\":\"%ld\", \"octets\":\"%ld\", \"overrun\":\"%ld\", \"framing\":\"%ld\", \"parity\":\"%ld\", \"noise\":\"%ld\"}", uptime, globalDeviceStats->totalRunningHours, globalDeviceStats->totalPowercycles, schTaskCnt(), mbusCommStats->mbusRequestCnt, mbusCommStats->mbusErrorCnt, mbusCommStats->uartOctetCnt, mbusCommStats->uartOverrunCnt, mbusCommStats->uartFramingErrCnt, mbusCommStats->uartParityErrCnt, mbusCommStats->uartNoiseErrCnt); bool res = publish(&mqttClient, config->statusTopic, (const uint8_t*)buf, strlen(buf), false); coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res); oledSetActiveScreen(OLED_SCREEN1); oledClearActiveScreen(); uint8_t *addr = wizGetIPAddress(); oledPrintf(OLED_SCREEN1, "Addr:%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]); oledPrintf(OLED_SCREEN1, "Network available:%d", isNetworkAvailable()); oledPrintf(OLED_SCREEN1, "Uptime:%ld", uptime); oledPrintf(OLED_SCREEN1, "PowerCycl:%d", globalDeviceStats->totalPowercycles); oledPrintf(OLED_SCREEN1, "Req:%ld", mbusCommStats->mbusRequestCnt); oledPrintf(OLED_SCREEN1, "Err:%ld", mbusCommStats->mbusErrorCnt); oledPrintf(OLED_SCREEN1, "Uart:%d", mbusCommStats->uartOverrunCnt + mbusCommStats->uartFramingErrCnt + mbusCommStats->uartParityErrCnt + mbusCommStats->uartNoiseErrCnt); } void mqttCommHandler(void *handle) { static uint8_t state = 0; static uint8_t message[] = "MeterbusGateway3Variant starting"; if (isNetworkAvailable()) { switch (state) { case 0: coloredMsg(LOG_GREEN, false, "mqch, initializing mqtt client"); client.sockNum = MQTT_SOCK; mqttClientInit(&mqttClient, &client, mqttCallback); coloredMsg(LOG_GREEN, false, "mqch: mqtt client initialized"); state = 1; break; case 1: coloredMsg(LOG_GREEN, false, "mqch, resolving broker name"); if (! wizDnsQuery(config->brokerName, brokerAddress)) { coloredMsg(LOG_GREEN, false, "mqch, query for broker address failed, going to error state"); state = 255; } coloredMsg(LOG_GREEN, false, "mqch, connecting to broker "); bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false); coloredMsg(LOG_GREEN, false, "mqch, mqttConnect returns %d", res); if (res) { coloredMsg(LOG_GREEN, false, "mqch, ok, connected"); state = 2; } else { state = 255; } break; case 2: coloredMsg(LOG_GREEN, false, "mqch, publish start-up"); res = publish(&mqttClient, config->startupTopic, (const uint8_t*)message, strlen((char*)message), false); coloredMsg(LOG_GREEN, false, "mqch, publish returned %d", res); schAdd(mqttStatusPublisher, NULL, 0, 60000); coloredMsg(LOG_GREEN, false, "mqch, status publisher scheduled"); state = 3; break; case 3: coloredMsg(LOG_GREEN, false, "mqch, subscribe watchdog"); res = subscribe(&mqttClient, config->watchdogTopic, MQTTQOS0); coloredMsg(LOG_GREEN, false, "mqch, subscribe returned %d", res); schAdd(watchdogHandler, NULL, 15000, 15000); coloredMsg(LOG_GREEN, false, "mqch, watchdogHandler scheduled"); state = 4; break; case 4: coloredMsg(LOG_GREEN, false, "mqch, now entering the loop"); state = 5; break; case 5: if (! mqttLoop(&mqttClient)) { state = 0; } break; case 255: coloredMsg(LOG_RED, true, "mqch, error state, will stop here"); schDel(mqttCommHandler, NULL); schDel(watchdogHandler, NULL); coloredMsg(LOG_RED, true, "mqch, trying again in one minute"); schAdd(mqttCommHandler, NULL, 60000, 100); break; } } } void mqttCommInit() { config = getConfig(); schAdd(mqttCommHandler, NULL, 0, 100); } void mqttPublish(const char *topic, char *message) { bool res = false; if (isNetworkAvailable()) { res = publish(&mqttClient, topic, (const uint8_t*)message, strlen(message), false); } if (res) { coloredMsg(LOG_GREEN, false, "mqp: %s -> %s successfully published", message, topic); } else { coloredMsg(LOG_RED, true, "mqp: %s -> %s failed to publish", message, topic); } } void mqttPublishf(const char *topic, char *messageFormat, ...) { va_list vl; va_start(vl, messageFormat); char buf[2048]; int res = vsnprintf(buf, sizeof(buf), messageFormat, vl); va_end(vl); if (res < sizeof(buf)) { mqttPublish(topic, buf); } else { coloredMsg(LOG_RED, true, "mqc mpf buffer overflow, truncated message not published"); } }