Add setSocketTimeout function

This commit is contained in:
Nick O'Leary 2020-05-20 01:33:31 +01:00
parent 77c4e445ea
commit 24510271ff
2 changed files with 24 additions and 4 deletions

View File

@ -16,6 +16,7 @@ PubSubClient::PubSubClient() {
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(Client& client) { PubSubClient::PubSubClient(Client& client) {
@ -25,6 +26,7 @@ PubSubClient::PubSubClient(Client& client) {
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
@ -35,6 +37,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -44,6 +47,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -54,6 +58,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -64,6 +69,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
@ -74,6 +80,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -83,6 +90,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& s
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -93,6 +101,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -103,6 +112,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
@ -113,6 +123,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -122,6 +133,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, St
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -132,6 +144,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -142,6 +155,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
this->bufferSize = 0; this->bufferSize = 0;
setBufferSize(MQTT_MAX_PACKET_SIZE); setBufferSize(MQTT_MAX_PACKET_SIZE);
setKeepAlive(MQTT_KEEPALIVE); setKeepAlive(MQTT_KEEPALIVE);
setSocketTimeout(MQTT_SOCKET_TIMEOUT);
} }
PubSubClient::~PubSubClient() { PubSubClient::~PubSubClient() {
@ -242,7 +256,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
while (!_client->available()) { while (!_client->available()) {
unsigned long t = millis(); unsigned long t = millis();
if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) {
_state = MQTT_CONNECTION_TIMEOUT; _state = MQTT_CONNECTION_TIMEOUT;
_client->stop(); _client->stop();
return false; return false;
@ -276,7 +290,7 @@ boolean PubSubClient::readByte(uint8_t * result) {
while(!_client->available()) { while(!_client->available()) {
yield(); yield();
uint32_t currentMillis = millis(); uint32_t currentMillis = millis();
if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){
return false; return false;
} }
} }
@ -749,3 +763,7 @@ PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
this->keepAlive = keepAlive; this->keepAlive = keepAlive;
return *this; return *this;
} }
PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) {
this->socketTimeout = timeout;
return *this;
}

View File

@ -26,12 +26,12 @@
#define MQTT_MAX_PACKET_SIZE 256 #define MQTT_MAX_PACKET_SIZE 256
#endif #endif
// MQTT_KEEPALIVE : keepAlive interval in Seconds // MQTT_KEEPALIVE : keepAlive interval in Seconds. Override with setKeepAlive()
#ifndef MQTT_KEEPALIVE #ifndef MQTT_KEEPALIVE
#define MQTT_KEEPALIVE 15 #define MQTT_KEEPALIVE 15
#endif #endif
// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds // MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds. Override with setSocketTimeout()
#ifndef MQTT_SOCKET_TIMEOUT #ifndef MQTT_SOCKET_TIMEOUT
#define MQTT_SOCKET_TIMEOUT 15 #define MQTT_SOCKET_TIMEOUT 15
#endif #endif
@ -91,6 +91,7 @@ private:
uint8_t* buffer; uint8_t* buffer;
uint16_t bufferSize; uint16_t bufferSize;
uint16_t keepAlive; uint16_t keepAlive;
uint16_t socketTimeout;
uint16_t nextMsgId; uint16_t nextMsgId;
unsigned long lastOutActivity; unsigned long lastOutActivity;
unsigned long lastInActivity; unsigned long lastInActivity;
@ -136,6 +137,7 @@ public:
PubSubClient& setClient(Client& client); PubSubClient& setClient(Client& client);
PubSubClient& setStream(Stream& stream); PubSubClient& setStream(Stream& stream);
PubSubClient& setKeepAlive(uint16_t keepAlive); PubSubClient& setKeepAlive(uint16_t keepAlive);
PubSubClient& setSocketTimeout(uint16_t timeout);
boolean setBufferSize(uint16_t size); boolean setBufferSize(uint16_t size);
uint16_t getBufferSize(); uint16_t getBufferSize();