Add setKeepAlive function and tests

This commit is contained in:
Nick O'Leary 2020-05-20 01:18:02 +01:00
parent 64e981190b
commit 77c4e445ea
3 changed files with 57 additions and 3 deletions

View File

@ -15,6 +15,7 @@ PubSubClient::PubSubClient() {
setCallback(NULL); setCallback(NULL);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(Client& client) { PubSubClient::PubSubClient(Client& client) {
@ -23,6 +24,7 @@ PubSubClient::PubSubClient(Client& client) {
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
@ -32,6 +34,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -40,6 +43,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream
setStream(stream); setStream(stream);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -49,6 +53,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -58,6 +63,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
setStream(stream); setStream(stream);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
@ -67,6 +73,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -75,6 +82,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& s
setStream(stream); setStream(stream);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -84,6 +92,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -93,6 +102,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
setStream(stream); setStream(stream);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
@ -102,6 +112,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -110,6 +121,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, St
setStream(stream); setStream(stream);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -119,6 +131,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
this->stream = NULL; this->stream = NULL;
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -128,6 +141,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
setStream(stream); setStream(stream);
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE);
} }
PubSubClient::~PubSubClient() { PubSubClient::~PubSubClient() {
@ -201,8 +215,8 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
} }
this->buffer[length++] = v; this->buffer[length++] = v;
this->buffer[length++] = ((MQTT_KEEPALIVE) >> 8); this->buffer[length++] = ((this->keepAlive) >> 8);
this->buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); this->buffer[length++] = ((this->keepAlive) & 0xFF);
CHECK_STRING_LENGTH(length,id) CHECK_STRING_LENGTH(length,id)
length = writeString(id,this->buffer,length); length = writeString(id,this->buffer,length);
@ -342,7 +356,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
boolean PubSubClient::loop() { boolean PubSubClient::loop() {
if (connected()) { if (connected()) {
unsigned long t = millis(); unsigned long t = millis();
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
if (pingOutstanding) { if (pingOutstanding) {
this->_state = MQTT_CONNECTION_TIMEOUT; this->_state = MQTT_CONNECTION_TIMEOUT;
_client->stop(); _client->stop();
@ -731,3 +745,7 @@ boolean PubSubClient::setBufferSize(uint16_t size) {
uint16_t PubSubClient::getBufferSize() { uint16_t PubSubClient::getBufferSize() {
return this->bufferSize; return this->bufferSize;
} }
PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
this->keepAlive = keepAlive;
return *this;
}

View File

@ -90,6 +90,7 @@ private:
Client* _client; Client* _client;
uint8_t* buffer; uint8_t* buffer;
uint16_t bufferSize; uint16_t bufferSize;
uint16_t keepAlive;
uint16_t nextMsgId; uint16_t nextMsgId;
unsigned long lastOutActivity; unsigned long lastOutActivity;
unsigned long lastInActivity; unsigned long lastInActivity;
@ -134,6 +135,7 @@ public:
PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE);
PubSubClient& setClient(Client& client); PubSubClient& setClient(Client& client);
PubSubClient& setStream(Stream& stream); PubSubClient& setStream(Stream& stream);
PubSubClient& setKeepAlive(uint16_t keepAlive);
boolean setBufferSize(uint16_t size); boolean setBufferSize(uint16_t size);
uint16_t getBufferSize(); uint16_t getBufferSize();

View File

@ -280,6 +280,38 @@ int test_connect_disconnect_connect() {
END_IT END_IT
} }
int test_connect_custom_keepalive() {
IT("sends a properly formatted connect packet with custom keepalive value");
ShimClient shimClient;
shimClient.setAllowConnect(true);
byte expectServer[] = { 172, 16, 0, 2 };
shimClient.expectConnect(expectServer,1883);
// Set keepalive to 300secs == 0x01 0x2c
byte connect[] = {0x10,0x18,0x0,0x4,0x4d,0x51,0x54,0x54,0x4,0x2,0x01,0x2c,0x0,0xc,0x63,0x6c,0x69,0x65,0x6e,0x74,0x5f,0x74,0x65,0x73,0x74,0x31};
byte connack[] = { 0x20, 0x02, 0x00, 0x00 };
shimClient.expect(connect,26);
shimClient.respond(connack,4);
PubSubClient client(server, 1883, callback, shimClient);
int state = client.state();
IS_TRUE(state == MQTT_DISCONNECTED);
client.setKeepAlive(300);
int rc = client.connect((char*)"client_test1");
IS_TRUE(rc);
IS_FALSE(shimClient.error());
state = client.state();
IS_TRUE(state == MQTT_CONNECTED);
END_IT
}
int main() int main()
{ {
SUITE("Connect"); SUITE("Connect");
@ -298,5 +330,7 @@ int main()
test_connect_with_will(); test_connect_with_will();
test_connect_with_will_username_password(); test_connect_with_will_username_password();
test_connect_disconnect_connect(); test_connect_disconnect_connect();
test_connect_custom_keepalive();
FINISH FINISH
} }