#include #include #include #include #include #include #include #include #include #include #include #include #include extern const uint8_t MQTT_SOCK; client_t client; mqttClient_t mqttClient; uint8_t brokerAddress[] = { 172, 16, 2, 16 }; uint16_t brokerPort = 1883; const static char WatchdogTopic[] = "IoT/Watchdog"; const static char StartupTopic[] = "IoT/MBGW3/Startup"; const static char StatusTopic[] = "IoT/MBGW3/Status"; // typedef void (*callback_t)(char*, uint8_t*, uint16_t); static void mqttCallback(char *topic, uint8_t *payload, uint16_t payloadLength) { logMsg("mqcb: %s : %.*s", topic, payloadLength, payload); if (0 == strcmp(topic, WatchdogTopic)) { coloredMsg(LOG_GREEN, "mqcb: watchdog message received"); } } static void mqttStatusPublisher(void *handle) { logMsg("mqsp: publishing status"); t_mbusCommStats *mbusCommStats = mbusCommGetStats(); char buf[64]; snprintf(buf, 64, "{\"uptime\":\"%ld\", \"tasks\":\"%d\", \"requests\":\"%d\", \"errors\":\"%d\"}", HAL_GetTick()/1000, schTaskCnt(), mbusCommStats->requestCnt, mbusCommStats->errorCnt); bool res = publish(&mqttClient, StatusTopic, (const char*)buf, strlen(buf), false); coloredMsg(LOG_YELLOW, "mqch, publish returned %d", res); } void mqttCommHandler(void *handle) { static uint8_t state = 0; static uint8_t message[] = "MeterbusGateway3Variant starting"; if (isNetworkAvailable()) { switch (state) { case 0: coloredMsg(LOG_YELLOW, "mqch, initializing mqtt client"); client.sockNum = MQTT_SOCK; mqttClientInit(&mqttClient, &client, mqttCallback); coloredMsg(LOG_YELLOW, "mqch: mqtt client initialized"); state = 1; break; case 1: coloredMsg(LOG_YELLOW, "mqch, connecting to broker "); bool res = mqttConnect(&mqttClient, brokerAddress, 1883, "mbv3gw-client", NULL, NULL, NULL, 0, false, NULL, false); coloredMsg(LOG_YELLOW, "mqch, mqttConnect returns %d", res); if (res) { coloredMsg(LOG_YELLOW, "mqch, ok, connected"); state = 2; } else { state = 255; } break; case 2: coloredMsg(LOG_YELLOW, "mqch, publish start-up"); res = publish(&mqttClient, StartupTopic, (const char*)message, strlen(message), false); coloredMsg(LOG_YELLOW, "mqch, publish returned %d", res); schAdd(mqttStatusPublisher, NULL, 0, 60000); coloredMsg(LOG_YELLOW, "mqch, status publisher scheduled"); state = 3; break; case 3: coloredMsg(LOG_YELLOW, "mqch, subscribe watchdog"); res = subscribe(&mqttClient, WatchdogTopic, MQTTQOS0); coloredMsg(LOG_YELLOW, "mqch, subscribe returned %d", res); state = 4; break; case 4: coloredMsg(LOG_YELLOW, "mqch, now entering the loop"); state = 5; break; case 5: // coloredMsg(LOG_YELLOW, "mqch, looping"); if (! mqttLoop(&mqttClient)) { state = 0; } break; case 255: coloredMsg(LOG_YELLOW, "mqch, error state, will stop here"); schDel(mqttCommHandler, NULL); coloredMsg(LOG_YELLOW, "mqch, trying again in one minute"); schAdd(mqttCommHandler, NULL, 60000, 100); break; } } else { coloredMsg(LOG_YELLOW, "mqch, network not yet ready"); } } void mqttCommInit() { schAdd(mqttCommHandler, NULL, 0, 100); } void mqttPublish(char *topic, char *message) { bool res = publish(&mqttClient, topic, message, strlen(message), false); if (res) { coloredMsg(LOG_YELLOW, "mqp: %s -> %s successfully published", message, topic); } else { coloredMsg(LOG_RED, "mqp: %s -> %s failed to publish", message, topic); } }