From 359fd03f03d428116b4330345f522dacb240cc26 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Thu, 31 Oct 2013 21:45:30 +0000 Subject: [PATCH 1/7] Added support for QOS1 for messages received from the server. Add qos argument to subscribe. Auto ack after callback is run --- PubSubClient/PubSubClient.cpp | 32 ++++++++++++++++++++++++++++---- PubSubClient/PubSubClient.h | 2 ++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index ca682e6..53380bb 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -166,6 +166,8 @@ boolean PubSubClient::loop() { if (_client->available()) { uint8_t llen; uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; if (len > 0) { lastInActivity = t; uint8_t type = buffer[0]&0xF0; @@ -177,9 +179,16 @@ boolean PubSubClient::loop() { topic[i] = buffer[llen+3+i]; } topic[tl] = 0; - // ignore msgID - only support QoS 0 subs - uint8_t *payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); + // msgId only present for QOS>0 + if (buffer[0]&MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + puback(msgId); + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } } } else if (type == MQTTPINGREQ) { buffer[0] = MQTTPINGRESP; @@ -293,6 +302,10 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { } boolean PubSubClient::subscribe(char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(char* topic, uint8_t qos) { if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; @@ -303,12 +316,23 @@ boolean PubSubClient::subscribe(char* topic) { buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); - buffer[length++] = 0; // Only do QoS 0 subs + buffer[length++] = qos; return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); } return false; } +boolean PubSubClient::puback(uint16_t msgId) { + if(connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + buffer[length++] = (msgId >> 8); + buffer[length++] = (msgId & 0xFF); + return write(MQTTPUBACK,buffer,length-5); + } + return false; +} + boolean PubSubClient::unsubscribe(char* topic) { if (connected()) { uint16_t length = 5; diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/PubSubClient.h index 3032446..f87d5c1 100755 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/PubSubClient.h @@ -67,7 +67,9 @@ public: boolean publish(char *, uint8_t *, unsigned int, boolean); boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean); boolean subscribe(char *); + boolean subscribe(char *, uint8_t qos); boolean unsubscribe(char *); + boolean puback(uint16_t msgId); boolean loop(); boolean connected(); }; From 94df17720e0db3f6e1b35421ca599b1c2d7b2704 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Fri, 29 Nov 2013 00:18:55 +0000 Subject: [PATCH 2/7] Added check for valid QOS (0 or 1) --- PubSubClient/PubSubClient.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index 53380bb..27f37a6 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -306,6 +306,9 @@ boolean PubSubClient::subscribe(char* topic) { } boolean PubSubClient::subscribe(char* topic, uint8_t qos) { + if (qos < 0 || qos > 1) + return false; + if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; From 8a2908148676cae555efc59a7052577c3dbb8435 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Mon, 2 Dec 2013 12:43:34 +0000 Subject: [PATCH 3/7] Added support for stream the MQTT message to storage via a Stream object --- PubSubClient/PubSubClient.cpp | 30 +++++++++++++++++++++++++++--- PubSubClient/PubSubClient.h | 3 +++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index ca682e6..5222f2e 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -9,6 +9,7 @@ PubSubClient::PubSubClient() { this->_client = NULL; + this->stream = NULL; } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { @@ -17,6 +18,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,ui this->ip = ip; this->port = port; this->domain = NULL; + this->stream = NULL; } PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { @@ -24,6 +26,24 @@ PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,u this->callback = callback; this->domain = domain; this->port = port; + this->stream = NULL; +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) { + this->_client = &client; + this->callback = callback; + this->ip = ip; + this->port = port; + this->domain = NULL; + this->stream = stream; +} + +PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) { + this->_client = &client; + this->callback = callback; + this->domain = domain; + this->port = port; + this->stream = stream; } boolean PubSubClient::connect(char *id) { @@ -136,15 +156,19 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { *lengthLength = len-1; for (uint16_t i = 0;istream && buffer[0]&MQTTPUBLISH) + this->stream->write(digit); if (len < MQTT_MAX_PACKET_SIZE) { - buffer[len++] = readByte(); + buffer[len++] = digit; } else { - readByte(); len = 0; // This will cause the packet to be ignored. } } - return len; + // If a stream has been provided, indicate that we wrote the whole length, + // else return 0 if the length exceed the max packet size + return this->stream ? length : len; } boolean PubSubClient::loop() { diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/PubSubClient.h index 3032446..4e05607 100755 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/PubSubClient.h @@ -53,10 +53,13 @@ private: uint8_t *ip; char* domain; uint16_t port; + Stream* stream; public: PubSubClient(); PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*); PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*); boolean connect(char *); boolean connect(char *, char *, char *); boolean connect(char *, char *, uint8_t, uint8_t, char *); From 8b458cdad9cd9e8f8e4005794e92c8b4900fca28 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Wed, 4 Dec 2013 18:00:17 +0000 Subject: [PATCH 4/7] Fixed precedence bug --- PubSubClient/PubSubClient.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index 5222f2e..64497b2 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -157,18 +157,18 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { for (uint16_t i = 0;istream && buffer[0]&MQTTPUBLISH) + if(this->stream && ((buffer[0]&0xF0) == MQTTPUBLISH)) this->stream->write(digit); if (len < MQTT_MAX_PACKET_SIZE) { buffer[len++] = digit; } else { - len = 0; // This will cause the packet to be ignored. + if(!this->stream) len = 0; // This will cause the packet to be ignored. } } // If a stream has been provided, indicate that we wrote the whole length, // else return 0 if the length exceed the max packet size - return this->stream ? length : len; + return len; } boolean PubSubClient::loop() { From edd167b81b60c14ea4d1486e8672d21b73737550 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Mon, 20 Jan 2014 22:06:22 +0000 Subject: [PATCH 5/7] Payload for Stream now doesnt include the topic headers and msgid --- PubSubClient/PubSubClient.cpp | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index abe9b8b..fcfd86d 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -154,11 +154,23 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { multiplier *= 128; } while ((digit & 128) != 0); *lengthLength = len-1; - for (uint16_t i = 0;istream && ((buffer[0]&0xF0) == MQTTPUBLISH)) + if(this->stream && ((buffer[0]&0xF0) == MQTTPUBLISH) && len-*lengthLength-2>skip) { this->stream->write(digit); + } if (len < MQTT_MAX_PACKET_SIZE) { buffer[len++] = digit; } else { From 1113695f4a2fc263c55462a5e70400dceaa3f90c Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Mon, 20 Jan 2014 22:16:15 +0000 Subject: [PATCH 6/7] Example of using Stream storage for messages --- .../examples/mqtt_stream/mqtt_stream.ino | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 PubSubClient/examples/mqtt_stream/mqtt_stream.ino diff --git a/PubSubClient/examples/mqtt_stream/mqtt_stream.ino b/PubSubClient/examples/mqtt_stream/mqtt_stream.ino new file mode 100644 index 0000000..bf6df7c --- /dev/null +++ b/PubSubClient/examples/mqtt_stream/mqtt_stream.ino @@ -0,0 +1,53 @@ +/* + Example of using a Stream object to store the message payload + + Uses SRAM library: https://github.com/ennui2342/arduino-sram + but could use any Stream based class such as SD + + - connects to an MQTT server + - publishes "hello world" to the topic "outTopic" + - subscribes to the topic "inTopic" +*/ + +#include +#include +#include +#include + +// Update these with values suitable for your network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +byte server[] = { 172, 16, 0, 2 }; +byte ip[] = { 172, 16, 0, 100 }; + +SRAM sram(SRAM_CS_PIN, SRAM_1024); + +void callback(char* topic, byte* payload, unsigned int length) { + sram.seek(1); + + // do something with the message + myfunctionthattakesastreamargument(&sram); + + // Reset position for the next message to be stored + sram.seek(1); +} + +EthernetClient ethClient; +PubSubClient client(server, 1883, callback, ethClient); + +void setup() +{ + Ethernet.begin(mac, ip); + if (client.connect("arduinoClient")) { + client.publish("outTopic","hello world"); + client.subscribe("inTopic"); + } + + sram.begin(); + sram.seek(1); +} + +void loop() +{ + client.loop(); +} + From 93b9a4e2d1d05b69875b3273020b56da329135de Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Mon, 20 Jan 2014 22:50:10 +0000 Subject: [PATCH 7/7] A *working* example this time! --- PubSubClient/examples/mqtt_stream/mqtt_stream.ino | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/PubSubClient/examples/mqtt_stream/mqtt_stream.ino b/PubSubClient/examples/mqtt_stream/mqtt_stream.ino index bf6df7c..0bc4279 100644 --- a/PubSubClient/examples/mqtt_stream/mqtt_stream.ino +++ b/PubSubClient/examples/mqtt_stream/mqtt_stream.ino @@ -19,20 +19,23 @@ byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; byte server[] = { 172, 16, 0, 2 }; byte ip[] = { 172, 16, 0, 100 }; -SRAM sram(SRAM_CS_PIN, SRAM_1024); +SRAM sram(4, SRAM_1024); void callback(char* topic, byte* payload, unsigned int length) { sram.seek(1); // do something with the message - myfunctionthattakesastreamargument(&sram); - + for(uint8_t i=0; i