/* * mqttclient.cpp * * Created on: 03.03.2016 * Author: wn */ #include #include "mqttclient.h" #include #include #include #include #include #include #include "s433client.h" static const char MESSAGE_TOPIC[] = "IoT/Mqtt433Gateway/Message"; static const char WATCHDOG_TOPIC[] = "IoT/Watchdog"; void callback(char* topic, byte* payload, unsigned int length); static uint8_t MAC[] = { 0x90, 0xA2, 0xDA, 0x00, 0x51, 0x0A }; const static char BROKER[] = "mqttbroker"; EthernetClient client; PubSubClient mqttClient = PubSubClient(BROKER, 1883, callback, client); uint8_t disconnectState = 0; uint32_t disconnectTime = 0; Metro minute = Metro(60000); Metro second = Metro(1000); uint32_t uptime; void callback(char* topic, byte* payload, unsigned int length) { const uint8_t BUFSIZE = 128; if ((length + 1) >= BUFSIZE) { // 1 for terminating NUL // Serial << "Received message too long, ignore it" << endl; } else { char buffer[BUFSIZE]; memcpy(buffer, payload, length); *(buffer + length) = 0; Serial << "Received message: " << length << ", " << String(topic) << ", " << String(buffer) << endl; // 123454 24 1 // ^^^^^^ Message // ^^ Length // ^ Protocol if (!(strcmp(topic, MESSAGE_TOPIC))) { char *paramPtr = buffer; Serial << paramPtr << endl; uint32_t s433Message; uint8_t s433Length; uint8_t s433Protocol; int8_t done = 0; if ((paramPtr != 0) && (*paramPtr != 0)) { char *dataPtr = strsep(¶mPtr, " "); s433Message = strtol(dataPtr, NULL, 10); done++; } if ((paramPtr != 0) && (*paramPtr != 0)) { char *dataPtr = strsep(¶mPtr, " "); s433Length = strtol(dataPtr, NULL, 10); done++; } if ((paramPtr != 0) && (*paramPtr != 0)) { char *dataPtr = strsep(¶mPtr, " "); s433Protocol = strtol(dataPtr, NULL, 10); done++; } if (done == 3) { Serial << "complete message received" << endl; Serial << "Message: " << s433Message << endl; Serial << "Length: " << (uint16_t)s433Length << endl; Serial << "Protocol: " << (uint16_t)s433Protocol << endl;; s433ClientNS::sendMessage(s433Message, s433Length, s433Protocol); } else if (done == -1) { Serial << "error while evaluating message" << endl; } } else if (!strcmp(topic, WATCHDOG_TOPIC)) { wdt_reset(); } else { Serial << "Strange, unknown topic received" << endl; } } } void MqttClientNS::begin() { Ethernet.begin(MAC); Serial << "Got IP address: " << Ethernet.localIP() << endl; disconnectState = 3; disconnectTime = millis(); } void MqttClientNS::exec() { if ((disconnectState == 0) && (! mqttClient.loop())) { disconnectState = 1; } switch (disconnectState) { case 0: // Serial.println("discState 0"); // everything fine break; case 1: // Serial.println("discState 1"); mqttClient.disconnect(); disconnectTime = millis(); disconnectState = 2; break; case 2: // Serial.println("discState 3"); if (disconnectTime + 2000 < millis()) { disconnectState = 3; } break; case 3: // Serial.println("discState 3"); if (mqttClient.connect("Monitor")) { mqttClient.subscribe(MESSAGE_TOPIC); mqttClient.subscribe(WATCHDOG_TOPIC); disconnectTime = millis(); mqttClient.publish("IoT/Mqtt433Gateway/Started", "Mqtt433Gateway started"); disconnectState = 0; } else { disconnectState = 1; } break; default: disconnectState = 0; break; } if (second.check() == 1) { uptime++; // Serial.println("mqtt tick"); if (disconnectState == 0) { String msg = String("{ \"metadata\": { \"device\": \"Mqtt433Gateway\" }, \"data\": { \"uptime\": ") + uptime + String("}}"); mqttClient.publish("IoT/Mqtt433Gateway/Heartbeat", (char*)msg.c_str()); } } }