8 Commits
v2.3 ... 2.4

Author SHA1 Message Date
0bb4efcea5 Update for 2.4 2015-11-21 20:56:32 +00:00
31521085ea Increase rc of write to uint16 to match max possible length
Fixes #85
2015-11-21 20:36:21 +00:00
8a1d7fb620 Merge pull request #93 from vicatcu/master
Implement timeout behavior in readByte / readPacket
2015-11-21 20:20:35 +00:00
803f54b0bd changes to use #define MQTT_SOCKET_TIMEOUT instead of dynamic read timeout interval, per comments from @knolleary on https://github.com/knolleary/pubsubclient/issues/87 2015-11-19 13:58:23 -05:00
6f97ea04f2 minor cleanup 2015-11-19 09:23:40 -05:00
6bb06187b7 added optional timeout setting so that readByte can be escaped 2015-11-18 17:19:41 -05:00
efebd2e5e4 Merge pull request #82 from e-lin/master
Match the length of type for writing data
2015-10-04 19:35:29 +01:00
5cdadf43da Match the length of type for writing data
In MQTT_MAX_TRANSFER_SIZE case, the variable bytesRemaining needs to match the type of data length.
2015-10-04 22:16:27 +09:00
4 changed files with 41 additions and 13 deletions

View File

@ -1,3 +1,8 @@
2.4
* Add MQTT_SOCKET_TIMEOUT to prevent it blocking indefinitely
whilst waiting for inbound data
* Fixed return code when publishing >256 bytes
2.3
* Add publish(topic,payload,retained) function

View File

@ -1,5 +1,5 @@
name=PubSubClient
version=2.3
version=2.4
author=Nick O'Leary <nick.oleary@gmail.com>
maintainer=Nick O'Leary <nick.oleary@gmail.com>
sentence=A client library for MQTT messaging.

View File

@ -177,7 +177,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
while (!_client->available()) {
unsigned long t = millis();
if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
_state = MQTT_CONNECTION_TIMEOUT;
_client->stop();
return false;
@ -205,14 +205,33 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
return true;
}
uint8_t PubSubClient::readByte() {
while(!_client->available()) {}
return _client->read();
// reads a byte into result
boolean PubSubClient::readByte(uint8_t * result) {
uint32_t previousMillis = millis();
while(!_client->available()) {
uint32_t currentMillis = millis();
if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
return false;
}
}
*result = _client->read();
return true;
}
// reads a byte into result[*index] and increments index
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
uint16_t current_index = *index;
uint8_t * write_address = &(result[current_index]);
if(readByte(write_address)){
*index = current_index + 1;
return true;
}
return false;
}
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
buffer[len++] = readByte();
if(!readByte(buffer, &len)) return 0;
bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
uint16_t length = 0;
@ -221,7 +240,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint8_t start = 0;
do {
digit = readByte();
if(!readByte(&digit)) return 0;
buffer[len++] = digit;
length += (digit & 127) * multiplier;
multiplier *= 128;
@ -230,8 +249,8 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing
buffer[len++] = readByte();
buffer[len++] = readByte();
if(!readByte(buffer, &len)) return 0;
if(!readByte(buffer, &len)) return 0;
skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
start = 2;
if (buffer[0]&MQTTQOS1) {
@ -241,7 +260,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
}
for (uint16_t i = start;i<length;i++) {
digit = readByte();
if(!readByte(&digit)) return 0;
if (this->stream) {
if (isPublish && len-*lengthLength-2>skip) {
this->stream->write(digit);
@ -409,7 +428,7 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint8_t rc;
uint16_t rc;
uint16_t len = length;
do {
digit = len % 128;
@ -428,7 +447,7 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf+(4-llen);
uint8_t bytesRemaining = length+1+llen;
uint16_t bytesRemaining = length+1+llen; //Match the length type
uint8_t bytesToWrite;
boolean result = true;
while((bytesRemaining > 0) && result) {

View File

@ -25,6 +25,9 @@
// MQTT_KEEPALIVE : keepAlive interval in Seconds
#define MQTT_KEEPALIVE 15
// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds
#define MQTT_SOCKET_TIMEOUT 15
// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client
// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to
// pass the entire MQTT packet in each write call.
@ -74,7 +77,8 @@ private:
bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE;
uint16_t readPacket(uint8_t*);
uint8_t readByte();
boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
IPAddress ip;