Add large-payload API, make max header size a define, not magic number.

This commit is contained in:
Adrian McEwen
2018-08-15 17:15:04 +01:00
parent 54be6e87db
commit 3b3a8da8d2
3 changed files with 81 additions and 16 deletions

View File

@ -125,7 +125,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
unsigned int j;
#if MQTT_VERSION == MQTT_VERSION_3_1
@ -171,7 +171,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
}
}
write(MQTTCONNECT,buffer,length-5);
write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);
lastInActivity = lastOutActivity = millis();
@ -365,12 +365,12 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
if (connected()) {
if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
// Too long
return false;
}
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length);
uint16_t i;
for (i=0;i<plength;i++) {
@ -380,7 +380,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
if (retained) {
header |= 1;
}
return write(header,buffer,length-5);
return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
@ -430,12 +430,43 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
return rc == tlen + 4 + plength;
}
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
if (connected()) {
// Send the header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length);
uint16_t i;
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
lastOutActivity = millis();
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
return false;
}
int PubSubClient::endPublish() {
return 1;
}
size_t PubSubClient::write(uint8_t data) {
lastOutActivity = millis();
return _client->write(data);
}
size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
lastOutActivity = millis();
return _client->write(buffer,size);
}
size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
uint8_t lenBuf[4];
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint16_t rc;
uint16_t len = length;
do {
digit = len % 128;
@ -449,12 +480,18 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
buf[4-llen] = header;
for (int i=0;i<llen;i++) {
buf[5-llen+i] = lenBuf[i];
buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
}
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
}
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint16_t rc;
uint8_t hlen = buildHeader(header, buf, length);
#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf+(4-llen);
uint16_t bytesRemaining = length+1+llen; //Match the length type
uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
uint16_t bytesRemaining = length+hlen; //Match the length type
uint8_t bytesToWrite;
boolean result = true;
while((bytesRemaining > 0) && result) {
@ -466,9 +503,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
}
return result;
#else
rc = _client->write(buf+(4-llen),length+1+llen);
rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
lastOutActivity = millis();
return (rc == 1+llen+length);
return (rc == hlen+length);
#endif
}
@ -486,7 +523,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
}
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
@ -495,7 +532,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
buffer[length++] = (nextMsgId & 0xFF);
length = writeString((char*)topic, buffer,length);
buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
@ -506,7 +543,7 @@ boolean PubSubClient::unsubscribe(const char* topic) {
return false;
}
if (connected()) {
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
@ -514,7 +551,7 @@ boolean PubSubClient::unsubscribe(const char* topic) {
buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}