From fb5f8de2a4a428a7f49c20a9687618b2fb944963 Mon Sep 17 00:00:00 2001 From: Nicholas O'Leary Date: Tue, 26 Apr 2011 20:52:57 +0100 Subject: [PATCH] Improved keepalive handling --- PubSubClient/CHANGES.txt | 3 +++ PubSubClient/PubSubClient.cpp | 36 +++++++++++++++++++++++++++-------- PubSubClient/PubSubClient.h | 4 +++- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/PubSubClient/CHANGES.txt b/PubSubClient/CHANGES.txt index 5b1d3dd..cb3acc3 100644 --- a/PubSubClient/CHANGES.txt +++ b/PubSubClient/CHANGES.txt @@ -1,3 +1,6 @@ +1.7 + * Improved keepalive handling + 1.6 * Added the ability to publish a retained message diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index 8dabbe6..1d47dd4 100644 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -41,11 +41,20 @@ int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t wi length = writeString(willMessage,buffer,length); } write(MQTTCONNECT,buffer,length); - while (!_client.available()) {} + lastOutActivity = millis(); + lastInActivity = millis(); + while (!_client.available()) { + long t= millis(); + if (t-lastInActivity > KEEPALIVE) { + _client.stop(); + return 0; + } + } uint8_t len = readPacket(); if (len == 4 && buffer[3] == 0) { - lastActivity = millis(); + lastInActivity = millis(); + pingOutstanding = false; return 1; } } @@ -88,14 +97,22 @@ uint8_t PubSubClient::readPacket() { int PubSubClient::loop() { if (connected()) { long t = millis(); - if (t - lastActivity > KEEPALIVE) { - _client.write(MQTTPINGREQ); - _client.write((uint8_t)0); - lastActivity = t; + if ((t - lastInActivity > KEEPALIVE) || (t - lastOutActivity > KEEPALIVE)) { + if (pingOutstanding) { + _client.stop(); + return 0; + } else { + _client.write(MQTTPINGREQ); + _client.write((uint8_t)0); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } } if (_client.available()) { uint8_t len = readPacket(); if (len > 0) { + lastInActivity = t; uint8_t type = buffer[0]&0xF0; if (type == MQTTPUBLISH) { if (callback) { @@ -112,7 +129,8 @@ int PubSubClient::loop() { } else if (type == MQTTPINGREQ) { _client.write(MQTTPINGRESP); _client.write((uint8_t)0); - lastActivity = t; + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; } } } @@ -151,6 +169,7 @@ int PubSubClient::write(uint8_t header, uint8_t* buf, uint8_t length) { _client.write(header); _client.write(length); _client.write(buf,length); + lastOutActivity = millis(); return 0; } @@ -171,7 +190,8 @@ void PubSubClient::disconnect() { _client.write(MQTTDISCONNECT); _client.write((uint8_t)0); _client.stop(); - lastActivity = millis(); + lastInActivity = millis(); + lastOutActivity = millis(); } uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) { diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/PubSubClient.h index 613f91d..fc75a95 100644 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/PubSubClient.h @@ -35,7 +35,9 @@ private: Client _client; uint8_t buffer[MAX_PACKET_SIZE]; uint8_t nextMsgId; - long lastActivity; + long lastOutActivity; + long lastInActivity; + bool pingOutstanding; void (*callback)(char*,uint8_t*,int); uint8_t readPacket(); uint8_t readByte();