diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index 3a12121..1f8a19c 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -51,7 +51,8 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, if (result) { nextMsgId = 1; uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; - uint8_t length = 0; + // Leave room in the buffer for header and variable length field + uint16_t length = 5; unsigned int j; for (j = 0;j<9;j++) { buffer[length++] = d[j]; @@ -75,7 +76,7 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, buffer[length++] = v; buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - buffer[length++] = ((MQTT_KEEPALIVE) & 0xff); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); length = writeString(id,buffer,length); if (willTopic) { length = writeString(willTopic,buffer,length); @@ -88,12 +89,13 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, length = writeString(pass,buffer,length); } } - - write(MQTTCONNECT,buffer,length); - lastOutActivity = millis(); - lastInActivity = millis(); + + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + while (!_client->available()) { - unsigned long t= millis(); + unsigned long t = millis(); if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { _client->stop(); return false; @@ -151,8 +153,9 @@ boolean PubSubClient::loop() { _client->stop(); return false; } else { - _client->write(MQTTPINGREQ); - _client->write((uint8_t)0); + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); lastOutActivity = t; lastInActivity = t; pingOutstanding = true; @@ -176,8 +179,9 @@ boolean PubSubClient::loop() { callback(topic,payload,len-4-tl); } } else if (type == MQTTPINGREQ) { - _client->write(MQTTPINGRESP); - _client->write((uint8_t)0); + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client->write(buffer,2); } else if (type == MQTTPINGRESP) { pingOutstanding = false; } @@ -198,7 +202,9 @@ boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plengt boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, boolean retained) { if (connected()) { - uint16_t length = writeString(topic,buffer,false); + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + length = writeString(topic,buffer,length); uint16_t i; for (i=0;i 0) { - digit |= 0x80; - } - lenBuf[pos++] = digit; - llen++; - } while(len>0); - - rc += _client->write(&header, 1); - rc += _client->write(lenBuf, llen); - - lenBuf[0] = tlen >> 8; // MSB - lenBuf[1] = tlen & 0xFF; // LSB - - rc += _client->write(lenBuf, 2); - rc += _client->write((uint8_t *)topic, tlen); - - for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); - } - - lastOutActivity = millis(); - return rc == len + 1 + plength; + + tlen = strlen(topic); + + header = MQTTPUBLISH; + if (retained) { + header |= 1; + } + buffer[pos++] = header; + len = plength + 2 + tlen; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,buffer,pos); + + rc += _client->write(buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + return rc == len + 1 + plength; } boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { @@ -279,9 +279,12 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { llen++; } while(len>0); - rc = _client->write(header); - rc += _client->write(lenBuf,llen); - rc += _client->write(buf,length); + buf[4-llen] = header; + for (int i=0;iwrite(buf+(4-llen),length+1+llen); + lastOutActivity = millis(); return (rc == 1+llen+length); } @@ -289,26 +292,27 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { boolean PubSubClient::subscribe(char* topic) { if (connected()) { - uint16_t length = 2; + // Leave room in the buffer for header and variable length field + uint16_t length = 7; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } - buffer[0] = nextMsgId >> 8; - buffer[1] = nextMsgId - (buffer[0]<<8); + buffer[0] = (nextMsgId >> 8); + buffer[1] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); buffer[length++] = 0; // Only do QoS 0 subs - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length); + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); } return false; } void PubSubClient::disconnect() { - _client->write(MQTTDISCONNECT); - _client->write((uint8_t)0); + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client->write(buffer,2); _client->stop(); - lastInActivity = millis(); - lastOutActivity = millis(); + lastInActivity = lastOutActivity = millis(); } uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { @@ -319,8 +323,8 @@ uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { buf[pos++] = *idp++; i++; } - buf[pos-i-2] = 0; - buf[pos-i-1] = i; + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); return pos; }