/* * mqttclient.cpp * * Created on: 03.03.2016 * Author: wn */ #include #include "mqttclient.h" #include #include #include #include #include #include #include "canclient.h" static const char MESSAGE_TOPIC[] = "IoT/MqttCanGateway/Message"; static const char WATCHDOG_TOPIC[] = "IoT/Watchdog"; void callback(char* topic, byte* payload, unsigned int length); static uint8_t MAC[] = { 0x90, 0xA2, 0xDA, 0x00, 0x51, 0x09 }; const static char BROKER[] = "mqttbroker"; EthernetClient client; PubSubClient mqttClient = PubSubClient(BROKER, 1883, callback, client); uint8_t disconnectState = 0; uint32_t disconnectTime = 0; Metro minute = Metro(60000); Metro second = Metro(1000); uint32_t uptime; void callback(char* topic, byte* payload, unsigned int length) { const uint8_t BUFSIZE = 128; if ((length + 1) >= BUFSIZE) { // 1 for terminating NUL // Serial << "Received message too long, ignore it" << endl; } else { char buffer[BUFSIZE]; memcpy(buffer, payload, length); *(buffer + length) = 0; Serial << "Received message: " << length << ", " << String(topic) << ", " << String(buffer) << endl; // 00000001 08 01 02 03 04 05 06 07 08 // ^^^^^^^^ Address // ^^Payload Length // ^^^^^^^^^^^^^^^^^^^^^^^ Payload if (!(strcmp(topic, MESSAGE_TOPIC))) { char *paramPtr = buffer; Serial << paramPtr << endl; uint32_t canAddress; uint8_t canLength; const uint8_t MAX_CAN_MSG_SIZE = 8; uint8_t canDataBuffer[MAX_CAN_MSG_SIZE]; uint8_t canDataBufferIdx = 0; uint8_t state = 0; int8_t done = 0; while (done == 0) { Serial << "State: " << (uint16_t) state << endl; if ((paramPtr != 0) && (*paramPtr != 0)) { char *dataPtr = strsep(¶mPtr, " "); uint32_t data = strtol(dataPtr, NULL, 16); switch (state) { case 0: Serial << "address found: " << data << endl; canAddress = data; state++; break; case 1: Serial << "length found: " << data << endl; if (data > MAX_CAN_MSG_SIZE) { Serial << "length too large" << endl; done = -1; } else { canLength = data; state = 2; } break; case 2: Serial << "payload octet found: " << data << endl; if (data > 255) { Serial << "too large for an octet" << endl; done = -1; } else { canDataBuffer[canDataBufferIdx] = data; canDataBufferIdx++; if (canDataBufferIdx == canLength) { done = 1; } } break; } } else { Serial << "Error in received message, to little data" << endl; done = -1; } } if (done == 1) { Serial << "complete message received" << endl; Serial << "Address: " << canAddress << endl; Serial << "Length: " << (uint16_t)canLength << endl; Serial << "Data: "; for (uint8_t i = 0; i < canLength; i++) { Serial << (uint16_t)canDataBuffer[i] << " "; } Serial << endl; CanClientNS::sendMessage(canAddress, canLength, canDataBuffer); } else if (done == -1) { Serial << "error while evaluating message" << endl; } } else if (!strcmp(topic, WATCHDOG_TOPIC)) { wdt_reset(); } else { // Serial << "Strange, unknown topic received" << endl; } } } void MqttClientNS::begin() { Ethernet.begin(MAC); Serial << "Got IP address: " << Ethernet.localIP() << endl; disconnectState = 3; disconnectTime = millis(); } void MqttClientNS::exec() { if ((disconnectState == 0) && (! mqttClient.loop())) { disconnectState = 1; } switch (disconnectState) { case 0: // Serial.println("discState 0"); // everything fine break; case 1: // Serial.println("discState 1"); mqttClient.disconnect(); disconnectTime = millis(); disconnectState = 2; break; case 2: // Serial.println("discState 3"); if (disconnectTime + 2000 < millis()) { disconnectState = 3; } break; case 3: // Serial.println("discState 3"); if (mqttClient.connect("Monitor")) { mqttClient.subscribe(MESSAGE_TOPIC); mqttClient.subscribe(WATCHDOG_TOPIC); disconnectTime = millis(); mqttClient.publish("IoT/MqttCanGateway/Started", "MqttCanGateway started"); disconnectState = 0; } else { disconnectState = 1; } break; default: disconnectState = 0; break; } if (second.check() == 1) { uptime++; // Serial.println("mqtt tick"); if (disconnectState == 0) { String msg = String("{ \"metadata\": { \"device\": \"MqttCanGateway\" }, \"data\": { \"uptime\": ") + uptime + String("}}"); mqttClient.publish("IoT/MqttCanGateway/Heartbeat", (char*)msg.c_str()); } } } void sendMessage(uint32_t address, uint8_t length, uint8_t *payload) { const uint8_t BUFSIZE1 = 25; char buffer1[BUFSIZE1]; char *bufPtr = buffer1; for (uint8_t i = 0; i < length; i++) { int res = sprintf(bufPtr, "%02x ", payload[i]); bufPtr += res; } int l = strlen(buffer1); buffer1[l-1] = 0; const uint8_t BUFSIZE2 = 170; char buffer2[BUFSIZE2]; int res = snprintf(buffer2, BUFSIZE2, "{ \"metadata\": { \"device\":\"MqttCanGateway\" }, \"data\": { \"uptime\": %ld, \"address\": \"%08lx\", \"length\": %d, \"payload\": \"%s\" }}", uptime, address, length, buffer1); if (res >= BUFSIZE2) { Serial << "too much payload" << endl; } else { Serial << "length: " << res << endl; Serial << "about to send " << buffer2 << endl; } }