Merge pull request #93 from vicatcu/master

Implement timeout behavior in readByte / readPacket
This commit is contained in:
Nick O'Leary 2015-11-21 20:20:35 +00:00
commit 8a1d7fb620
2 changed files with 33 additions and 10 deletions

View File

@ -177,7 +177,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
while (!_client->available()) { while (!_client->available()) {
unsigned long t = millis(); unsigned long t = millis();
if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
_state = MQTT_CONNECTION_TIMEOUT; _state = MQTT_CONNECTION_TIMEOUT;
_client->stop(); _client->stop();
return false; return false;
@ -205,14 +205,33 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
return true; return true;
} }
uint8_t PubSubClient::readByte() { // reads a byte into result
while(!_client->available()) {} boolean PubSubClient::readByte(uint8_t * result) {
return _client->read(); uint32_t previousMillis = millis();
while(!_client->available()) {
uint32_t currentMillis = millis();
if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
return false;
}
}
*result = _client->read();
return true;
}
// reads a byte into result[*index] and increments index
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
uint16_t current_index = *index;
uint8_t * write_address = &(result[current_index]);
if(readByte(write_address)){
*index = current_index + 1;
return true;
}
return false;
} }
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0; uint16_t len = 0;
buffer[len++] = readByte(); if(!readByte(buffer, &len)) return 0;
bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1; uint32_t multiplier = 1;
uint16_t length = 0; uint16_t length = 0;
@ -221,7 +240,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint8_t start = 0; uint8_t start = 0;
do { do {
digit = readByte(); if(!readByte(&digit)) return 0;
buffer[len++] = digit; buffer[len++] = digit;
length += (digit & 127) * multiplier; length += (digit & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
@ -230,8 +249,8 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
if (isPublish) { if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing // Read in topic length to calculate bytes to skip over for Stream writing
buffer[len++] = readByte(); if(!readByte(buffer, &len)) return 0;
buffer[len++] = readByte(); if(!readByte(buffer, &len)) return 0;
skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
start = 2; start = 2;
if (buffer[0]&MQTTQOS1) { if (buffer[0]&MQTTQOS1) {
@ -241,7 +260,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
} }
for (uint16_t i = start;i<length;i++) { for (uint16_t i = start;i<length;i++) {
digit = readByte(); if(!readByte(&digit)) return 0;
if (this->stream) { if (this->stream) {
if (isPublish && len-*lengthLength-2>skip) { if (isPublish && len-*lengthLength-2>skip) {
this->stream->write(digit); this->stream->write(digit);

View File

@ -25,6 +25,9 @@
// MQTT_KEEPALIVE : keepAlive interval in Seconds // MQTT_KEEPALIVE : keepAlive interval in Seconds
#define MQTT_KEEPALIVE 15 #define MQTT_KEEPALIVE 15
// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds
#define MQTT_SOCKET_TIMEOUT 15
// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client // MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client
// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to // in each write call. Needed for the Arduino Wifi Shield. Leave undefined to
// pass the entire MQTT packet in each write call. // pass the entire MQTT packet in each write call.
@ -74,7 +77,8 @@ private:
bool pingOutstanding; bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE; MQTT_CALLBACK_SIGNATURE;
uint16_t readPacket(uint8_t*); uint16_t readPacket(uint8_t*);
uint8_t readByte(); boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length); boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
IPAddress ip; IPAddress ip;