Merge pull request #568 from turgu1/patch-1

Allow streaming of large messages
This commit is contained in:
Nick O'Leary 2019-11-25 11:34:25 +00:00 committed by GitHub
commit cff1fc7bdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 10 deletions

View File

@ -204,7 +204,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
} }
} }
uint8_t llen; uint8_t llen;
uint16_t len = readPacket(&llen); uint32_t len = readPacket(&llen);
if (len == 4) { if (len == 4) {
if (buffer[3] == 0) { if (buffer[3] == 0) {
@ -250,12 +250,12 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
return false; return false;
} }
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0; uint16_t len = 0;
if(!readByte(buffer, &len)) return 0; if(!readByte(buffer, &len)) return 0;
bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1; uint32_t multiplier = 1;
uint16_t length = 0; uint32_t length = 0;
uint8_t digit = 0; uint8_t digit = 0;
uint16_t skip = 0; uint16_t skip = 0;
uint8_t start = 0; uint8_t start = 0;
@ -286,20 +286,22 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
} }
} }
for (uint16_t i = start;i<length;i++) { uint32_t idx = len;
for (uint32_t i = start;i<length;i++) {
if(!readByte(&digit)) return 0; if(!readByte(&digit)) return 0;
if (this->stream) { if (this->stream) {
if (isPublish && len-*lengthLength-2>skip) { if (isPublish && idx-*lengthLength-2>skip) {
this->stream->write(digit); this->stream->write(digit);
} }
} }
if (len < MQTT_MAX_PACKET_SIZE) { if (len < MQTT_MAX_PACKET_SIZE) {
buffer[len] = digit; buffer[len] = digit;
}
len++; len++;
} }
idx++;
}
if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { if (!this->stream && idx > MQTT_MAX_PACKET_SIZE) {
len = 0; // This will cause the packet to be ignored. len = 0; // This will cause the packet to be ignored.
} }

View File

@ -94,7 +94,7 @@ private:
unsigned long lastInActivity; unsigned long lastInActivity;
bool pingOutstanding; bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE; MQTT_CALLBACK_SIGNATURE;
uint16_t readPacket(uint8_t*); uint32_t readPacket(uint8_t*);
boolean readByte(uint8_t * result); boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index); boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length); boolean write(uint8_t header, uint8_t* buf, uint16_t length);

View File

@ -190,7 +190,7 @@ int test_drop_invalid_remaining_length_message() {
int test_receive_oversized_stream_message() { int test_receive_oversized_stream_message() {
IT("drops an oversized message"); IT("receive an oversized streamed message");
reset_callback(); reset_callback();
Stream stream; Stream stream;
@ -222,7 +222,7 @@ int test_receive_oversized_stream_message() {
IS_TRUE(callback_called); IS_TRUE(callback_called);
IS_TRUE(strcmp(lastTopic,"topic")==0); IS_TRUE(strcmp(lastTopic,"topic")==0);
IS_TRUE(lastLength == length-9); IS_TRUE(lastLength == MQTT_MAX_PACKET_SIZE-9);
IS_FALSE(stream.error()); IS_FALSE(stream.error());
IS_FALSE(shimClient.error()); IS_FALSE(shimClient.error());