From b6239823fba189e7c1bd518dd8121a934c240a61 Mon Sep 17 00:00:00 2001 From: Guy Turcotte Date: Mon, 28 Jan 2019 18:02:37 -0500 Subject: [PATCH 1/5] Allow streaming of large messages These changes are required to allow for the transmission of large messages through a connected stream. The changes do not have an impact on the class interface and habitual behavior. In particular, it will enable the use of OTA through a stream hooked through the setStream() class method. I've designed such a stream to demonstrate the functionality: https://github.com/turgu1/mqtt_ota_example.git Guy --- src/PubSubClient.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 0fa420d..bd934b3 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -197,7 +197,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } } uint8_t llen; - uint16_t len = readPacket(&llen); + uint32_t len = readPacket(&llen); if (len == 4) { if (buffer[3] == 0) { @@ -243,12 +243,12 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ return false; } -uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { +uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; if(!readByte(buffer, &len)) return 0; bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; - uint16_t length = 0; + uint32_t length = 0; uint8_t digit = 0; uint16_t skip = 0; uint8_t start = 0; @@ -279,20 +279,22 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { } } - for (uint16_t i = start;istream) { - if (isPublish && len-*lengthLength-2>skip) { + if (isPublish && idx-*lengthLength-2>skip) { this->stream->write(digit); } } if (len < MQTT_MAX_PACKET_SIZE) { buffer[len] = digit; + len++; } - len++; + idx++; } - if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { + if (!this->stream && idx > MQTT_MAX_PACKET_SIZE) { len = 0; // This will cause the packet to be ignored. } From 373c7d3569061af1ff6cc04564656a531ae21329 Mon Sep 17 00:00:00 2001 From: Guy Turcotte Date: Mon, 28 Jan 2019 18:47:18 -0500 Subject: [PATCH 2/5] Update PubSubClient.h --- src/PubSubClient.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/PubSubClient.h b/src/PubSubClient.h index 2fd6f1d..9fb8bcc 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -94,7 +94,7 @@ private: unsigned long lastInActivity; bool pingOutstanding; MQTT_CALLBACK_SIGNATURE; - uint16_t readPacket(uint8_t*); + uint32_t readPacket(uint8_t*); boolean readByte(uint8_t * result); boolean readByte(uint8_t * result, uint16_t * index); boolean write(uint8_t header, uint8_t* buf, uint16_t length); From e7d9688ca53361d4125ba5983844624b3d11f8e4 Mon Sep 17 00:00:00 2001 From: Guy Turcotte Date: Tue, 29 Jan 2019 08:43:30 -0500 Subject: [PATCH 3/5] Update receive_spec.cpp --- tests/src/receive_spec.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/src/receive_spec.cpp b/tests/src/receive_spec.cpp index 9a18af0..923ce25 100644 --- a/tests/src/receive_spec.cpp +++ b/tests/src/receive_spec.cpp @@ -190,7 +190,7 @@ int test_drop_invalid_remaining_length_message() { int test_receive_oversized_stream_message() { - IT("drops an oversized message"); + IT("receive an oversized streamed message"); reset_callback(); Stream stream; @@ -222,7 +222,7 @@ int test_receive_oversized_stream_message() { IS_TRUE(callback_called); IS_TRUE(strcmp(lastTopic,"topic")==0); - IS_TRUE(lastLength == length-9); + IS_TRUE(lastLength == 0); IS_FALSE(stream.error()); IS_FALSE(shimClient.error()); From 7d6e409b59c30f01b379313475657787692656b3 Mon Sep 17 00:00:00 2001 From: Guy Turcotte Date: Tue, 29 Jan 2019 08:59:35 -0500 Subject: [PATCH 4/5] Update receive_spec.cpp --- tests/src/receive_spec.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/receive_spec.cpp b/tests/src/receive_spec.cpp index 923ce25..199033a 100644 --- a/tests/src/receive_spec.cpp +++ b/tests/src/receive_spec.cpp @@ -222,7 +222,7 @@ int test_receive_oversized_stream_message() { IS_TRUE(callback_called); IS_TRUE(strcmp(lastTopic,"topic")==0); - IS_TRUE(lastLength == 0); + IS_TRUE(lastLength == MQTT_MAX_PACKET_SIZE); IS_FALSE(stream.error()); IS_FALSE(shimClient.error()); From b25040a0d7880457b370bb210496f5545e3ffbbf Mon Sep 17 00:00:00 2001 From: Guy Turcotte Date: Tue, 29 Jan 2019 09:05:26 -0500 Subject: [PATCH 5/5] Update receive_spec.cpp --- tests/src/receive_spec.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/receive_spec.cpp b/tests/src/receive_spec.cpp index 199033a..185f19b 100644 --- a/tests/src/receive_spec.cpp +++ b/tests/src/receive_spec.cpp @@ -222,7 +222,7 @@ int test_receive_oversized_stream_message() { IS_TRUE(callback_called); IS_TRUE(strcmp(lastTopic,"topic")==0); - IS_TRUE(lastLength == MQTT_MAX_PACKET_SIZE); + IS_TRUE(lastLength == MQTT_MAX_PACKET_SIZE-9); IS_FALSE(stream.error()); IS_FALSE(shimClient.error());