Merge pull request #29 from ennui2342/qos1

Added support for QOS1 for messages received from the server
This commit is contained in:
Nick O'Leary 2014-01-06 06:06:36 -08:00
commit 593c9ae13a
2 changed files with 33 additions and 4 deletions

View File

@ -166,6 +166,8 @@ boolean PubSubClient::loop() {
if (_client->available()) { if (_client->available()) {
uint8_t llen; uint8_t llen;
uint16_t len = readPacket(&llen); uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) { if (len > 0) {
lastInActivity = t; lastInActivity = t;
uint8_t type = buffer[0]&0xF0; uint8_t type = buffer[0]&0xF0;
@ -177,10 +179,17 @@ boolean PubSubClient::loop() {
topic[i] = buffer[llen+3+i]; topic[i] = buffer[llen+3+i];
} }
topic[tl] = 0; topic[tl] = 0;
// ignore msgID - only support QoS 0 subs // msgId only present for QOS>0
uint8_t *payload = buffer+llen+3+tl; if (buffer[0]&MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
puback(msgId);
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl); callback(topic,payload,len-llen-3-tl);
} }
}
} else if (type == MQTTPINGREQ) { } else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP; buffer[0] = MQTTPINGRESP;
buffer[1] = 0; buffer[1] = 0;
@ -293,6 +302,13 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
} }
boolean PubSubClient::subscribe(char* topic) { boolean PubSubClient::subscribe(char* topic) {
return subscribe(topic, 0);
}
boolean PubSubClient::subscribe(char* topic, uint8_t qos) {
if (qos < 0 || qos > 1)
return false;
if (connected()) { if (connected()) {
// Leave room in the buffer for header and variable length field // Leave room in the buffer for header and variable length field
uint16_t length = 5; uint16_t length = 5;
@ -303,12 +319,23 @@ boolean PubSubClient::subscribe(char* topic) {
buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF); buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length); length = writeString(topic, buffer,length);
buffer[length++] = 0; // Only do QoS 0 subs buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
} }
return false; return false;
} }
boolean PubSubClient::puback(uint16_t msgId) {
if(connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
buffer[length++] = (msgId >> 8);
buffer[length++] = (msgId & 0xFF);
return write(MQTTPUBACK,buffer,length-5);
}
return false;
}
boolean PubSubClient::unsubscribe(char* topic) { boolean PubSubClient::unsubscribe(char* topic) {
if (connected()) { if (connected()) {
uint16_t length = 5; uint16_t length = 5;

View File

@ -67,7 +67,9 @@ public:
boolean publish(char *, uint8_t *, unsigned int, boolean); boolean publish(char *, uint8_t *, unsigned int, boolean);
boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean); boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean);
boolean subscribe(char *); boolean subscribe(char *);
boolean subscribe(char *, uint8_t qos);
boolean unsubscribe(char *); boolean unsubscribe(char *);
boolean puback(uint16_t msgId);
boolean loop(); boolean loop();
boolean connected(); boolean connected();
}; };