From 3b3a8da8d25f0d9c21e0699c1edf4e355d53f177 Mon Sep 17 00:00:00 2001 From: Adrian McEwen Date: Wed, 15 Aug 2018 17:15:04 +0100 Subject: [PATCH] Add large-payload API, make max header size a define, not magic number. --- keywords.txt | 3 ++ src/PubSubClient.cpp | 69 ++++++++++++++++++++++++++++++++++---------- src/PubSubClient.h | 25 ++++++++++++++++ 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/keywords.txt b/keywords.txt index b979588..1ee23d0 100755 --- a/keywords.txt +++ b/keywords.txt @@ -16,6 +16,9 @@ connect KEYWORD2 disconnect KEYWORD2 publish KEYWORD2 publish_P KEYWORD2 +beginPublish KEYWORD2 +endPublish KEYWORD2 +write KEYWORD2 subscribe KEYWORD2 unsubscribe KEYWORD2 loop KEYWORD2 diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 29fbbfa..f60d6a0 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -125,7 +125,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; unsigned int j; #if MQTT_VERSION == MQTT_VERSION_3_1 @@ -171,7 +171,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } } - write(MQTTCONNECT,buffer,length-5); + write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); @@ -365,12 +365,12 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { if (connected()) { - if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) { + if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) { // Too long return false; } // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); uint16_t i; for (i=0;iwrite(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + lastOutActivity = millis(); + return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); + } + return false; +} + +int PubSubClient::endPublish() { + return 1; +} + +size_t PubSubClient::write(uint8_t data) { + lastOutActivity = millis(); + return _client->write(data); +} + +size_t PubSubClient::write(const uint8_t *buffer, size_t size) { + lastOutActivity = millis(); + return _client->write(buffer,size); +} + +size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; - uint16_t rc; uint16_t len = length; do { digit = len % 128; @@ -449,12 +480,18 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { buf[4-llen] = header; for (int i=0;i 0) && result) { @@ -466,9 +503,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { } return result; #else - rc = _client->write(buf+(4-llen),length+1+llen); + rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen); lastOutActivity = millis(); - return (rc == 1+llen+length); + return (rc == hlen+length); #endif } @@ -486,7 +523,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { } if (connected()) { // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; @@ -495,7 +532,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { buffer[length++] = (nextMsgId & 0xFF); length = writeString((char*)topic, buffer,length); buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } @@ -506,7 +543,7 @@ boolean PubSubClient::unsubscribe(const char* topic) { return false; } if (connected()) { - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; @@ -514,7 +551,7 @@ boolean PubSubClient::unsubscribe(const char* topic) { buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } diff --git a/src/PubSubClient.h b/src/PubSubClient.h index be4bd67..dc81ff3 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -73,6 +73,9 @@ #define MQTTQOS1 (1 << 1) #define MQTTQOS2 (2 << 1) +// Maximum size of fixed header and variable length size header +#define MQTT_MAX_HEADER_SIZE 5 + #ifdef ESP8266 #include #define MQTT_CALLBACK_SIGNATURE std::function callback @@ -94,6 +97,11 @@ private: boolean readByte(uint8_t * result, uint16_t * index); boolean write(uint8_t header, uint8_t* buf, uint16_t length); uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + // Build up the header ready to send + // Returns the size of the header + // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start + // (MQTT_MAX_HEADER_SIZE - ) bytes into the buffer + size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length); IPAddress ip; const char* domain; uint16_t port; @@ -132,6 +140,23 @@ public: boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + // Start to publish a message. + // This API: + // beginPublish(...) + // one or more calls to write(...) + // endPublish() + // Allows for arbitrarily large payloads to be sent without them having to be copied into + // a new buffer and held in memory at one time + // Returns 1 if the message was started successfully, 0 if there was an error + boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + // Finish off this publish message (started with beginPublish) + // Returns 1 if the packet was sent successfully, 0 if there was an error + int endPublish(); + // Write a single byte of payload (only to be used with beginPublish/endPublish) + size_t write(uint8_t); + // Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish) + // Returns the number of bytes written + size_t write(const uint8_t *buffer, size_t size); boolean subscribe(const char* topic); boolean subscribe(const char* topic, uint8_t qos); boolean unsubscribe(const char* topic);