Improved keepalive handling
This commit is contained in:
parent
fca33c7f5a
commit
fb5f8de2a4
@ -1,3 +1,6 @@
|
||||
1.7
|
||||
* Improved keepalive handling
|
||||
|
||||
1.6
|
||||
* Added the ability to publish a retained message
|
||||
|
||||
|
@ -41,11 +41,20 @@ int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t wi
|
||||
length = writeString(willMessage,buffer,length);
|
||||
}
|
||||
write(MQTTCONNECT,buffer,length);
|
||||
while (!_client.available()) {}
|
||||
lastOutActivity = millis();
|
||||
lastInActivity = millis();
|
||||
while (!_client.available()) {
|
||||
long t= millis();
|
||||
if (t-lastInActivity > KEEPALIVE) {
|
||||
_client.stop();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
uint8_t len = readPacket();
|
||||
|
||||
if (len == 4 && buffer[3] == 0) {
|
||||
lastActivity = millis();
|
||||
lastInActivity = millis();
|
||||
pingOutstanding = false;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
@ -88,14 +97,22 @@ uint8_t PubSubClient::readPacket() {
|
||||
int PubSubClient::loop() {
|
||||
if (connected()) {
|
||||
long t = millis();
|
||||
if (t - lastActivity > KEEPALIVE) {
|
||||
if ((t - lastInActivity > KEEPALIVE) || (t - lastOutActivity > KEEPALIVE)) {
|
||||
if (pingOutstanding) {
|
||||
_client.stop();
|
||||
return 0;
|
||||
} else {
|
||||
_client.write(MQTTPINGREQ);
|
||||
_client.write((uint8_t)0);
|
||||
lastActivity = t;
|
||||
lastOutActivity = t;
|
||||
lastInActivity = t;
|
||||
pingOutstanding = true;
|
||||
}
|
||||
}
|
||||
if (_client.available()) {
|
||||
uint8_t len = readPacket();
|
||||
if (len > 0) {
|
||||
lastInActivity = t;
|
||||
uint8_t type = buffer[0]&0xF0;
|
||||
if (type == MQTTPUBLISH) {
|
||||
if (callback) {
|
||||
@ -112,7 +129,8 @@ int PubSubClient::loop() {
|
||||
} else if (type == MQTTPINGREQ) {
|
||||
_client.write(MQTTPINGRESP);
|
||||
_client.write((uint8_t)0);
|
||||
lastActivity = t;
|
||||
} else if (type == MQTTPINGRESP) {
|
||||
pingOutstanding = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -151,6 +169,7 @@ int PubSubClient::write(uint8_t header, uint8_t* buf, uint8_t length) {
|
||||
_client.write(header);
|
||||
_client.write(length);
|
||||
_client.write(buf,length);
|
||||
lastOutActivity = millis();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -171,7 +190,8 @@ void PubSubClient::disconnect() {
|
||||
_client.write(MQTTDISCONNECT);
|
||||
_client.write((uint8_t)0);
|
||||
_client.stop();
|
||||
lastActivity = millis();
|
||||
lastInActivity = millis();
|
||||
lastOutActivity = millis();
|
||||
}
|
||||
|
||||
uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) {
|
||||
|
@ -35,7 +35,9 @@ private:
|
||||
Client _client;
|
||||
uint8_t buffer[MAX_PACKET_SIZE];
|
||||
uint8_t nextMsgId;
|
||||
long lastActivity;
|
||||
long lastOutActivity;
|
||||
long lastInActivity;
|
||||
bool pingOutstanding;
|
||||
void (*callback)(char*,uint8_t*,int);
|
||||
uint8_t readPacket();
|
||||
uint8_t readByte();
|
||||
|
Loading…
x
Reference in New Issue
Block a user