From 359fd03f03d428116b4330345f522dacb240cc26 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Thu, 31 Oct 2013 21:45:30 +0000 Subject: [PATCH] Added support for QOS1 for messages received from the server. Add qos argument to subscribe. Auto ack after callback is run --- PubSubClient/PubSubClient.cpp | 32 ++++++++++++++++++++++++++++---- PubSubClient/PubSubClient.h | 2 ++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index ca682e6..53380bb 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -166,6 +166,8 @@ boolean PubSubClient::loop() { if (_client->available()) { uint8_t llen; uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; if (len > 0) { lastInActivity = t; uint8_t type = buffer[0]&0xF0; @@ -177,9 +179,16 @@ boolean PubSubClient::loop() { topic[i] = buffer[llen+3+i]; } topic[tl] = 0; - // ignore msgID - only support QoS 0 subs - uint8_t *payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); + // msgId only present for QOS>0 + if (buffer[0]&MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + puback(msgId); + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } } } else if (type == MQTTPINGREQ) { buffer[0] = MQTTPINGRESP; @@ -293,6 +302,10 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { } boolean PubSubClient::subscribe(char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(char* topic, uint8_t qos) { if (connected()) { // Leave room in the buffer for header and variable length field uint16_t length = 5; @@ -303,12 +316,23 @@ boolean PubSubClient::subscribe(char* topic) { buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); - buffer[length++] = 0; // Only do QoS 0 subs + buffer[length++] = qos; return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); } return false; } +boolean PubSubClient::puback(uint16_t msgId) { + if(connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + buffer[length++] = (msgId >> 8); + buffer[length++] = (msgId & 0xFF); + return write(MQTTPUBACK,buffer,length-5); + } + return false; +} + boolean PubSubClient::unsubscribe(char* topic) { if (connected()) { uint16_t length = 5; diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/PubSubClient.h index 3032446..f87d5c1 100755 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/PubSubClient.h @@ -67,7 +67,9 @@ public: boolean publish(char *, uint8_t *, unsigned int, boolean); boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean); boolean subscribe(char *); + boolean subscribe(char *, uint8_t qos); boolean unsubscribe(char *); + boolean puback(uint16_t msgId); boolean loop(); boolean connected(); };