Write each packet with a single call to _client->write()

This commit is contained in:
Nicholas O'Leary 2012-10-28 00:14:33 +01:00
parent 97e9614780
commit 569ada9937

View File

@ -51,7 +51,8 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic,
if (result) { if (result) {
nextMsgId = 1; nextMsgId = 1;
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
uint8_t length = 0; // Leave room in the buffer for header and variable length field
uint16_t length = 5;
unsigned int j; unsigned int j;
for (j = 0;j<9;j++) { for (j = 0;j<9;j++) {
buffer[length++] = d[j]; buffer[length++] = d[j];
@ -75,7 +76,7 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic,
buffer[length++] = v; buffer[length++] = v;
buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
buffer[length++] = ((MQTT_KEEPALIVE) & 0xff); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
length = writeString(id,buffer,length); length = writeString(id,buffer,length);
if (willTopic) { if (willTopic) {
length = writeString(willTopic,buffer,length); length = writeString(willTopic,buffer,length);
@ -88,12 +89,13 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic,
length = writeString(pass,buffer,length); length = writeString(pass,buffer,length);
} }
} }
write(MQTTCONNECT,buffer,length); write(MQTTCONNECT,buffer,length-5);
lastOutActivity = millis();
lastInActivity = millis(); lastInActivity = lastOutActivity = millis();
while (!_client->available()) { while (!_client->available()) {
unsigned long t= millis(); unsigned long t = millis();
if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) {
_client->stop(); _client->stop();
return false; return false;
@ -151,8 +153,9 @@ boolean PubSubClient::loop() {
_client->stop(); _client->stop();
return false; return false;
} else { } else {
_client->write(MQTTPINGREQ); buffer[0] = MQTTPINGREQ;
_client->write((uint8_t)0); buffer[1] = 0;
_client->write(buffer,2);
lastOutActivity = t; lastOutActivity = t;
lastInActivity = t; lastInActivity = t;
pingOutstanding = true; pingOutstanding = true;
@ -176,8 +179,9 @@ boolean PubSubClient::loop() {
callback(topic,payload,len-4-tl); callback(topic,payload,len-4-tl);
} }
} else if (type == MQTTPINGREQ) { } else if (type == MQTTPINGREQ) {
_client->write(MQTTPINGRESP); buffer[0] = MQTTPINGRESP;
_client->write((uint8_t)0); buffer[1] = 0;
_client->write(buffer,2);
} else if (type == MQTTPINGRESP) { } else if (type == MQTTPINGRESP) {
pingOutstanding = false; pingOutstanding = false;
} }
@ -198,7 +202,9 @@ boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plengt
boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, boolean retained) { boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, boolean retained) {
if (connected()) { if (connected()) {
uint16_t length = writeString(topic,buffer,false); // Leave room in the buffer for header and variable length field
uint16_t length = 5;
length = writeString(topic,buffer,length);
uint16_t i; uint16_t i;
for (i=0;i<plength;i++) { for (i=0;i<plength;i++) {
buffer[length++] = payload[i]; buffer[length++] = payload[i];
@ -207,13 +213,12 @@ boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plengt
if (retained) { if (retained) {
header |= 1; header |= 1;
} }
return write(header,buffer,length); return write(header,buffer,length-5);
} }
return false; return false;
} }
boolean PubSubClient::publish_P(char* topic, uint8_t* PROGMEM payload, unsigned int plength, boolean retained) { boolean PubSubClient::publish_P(char* topic, uint8_t* PROGMEM payload, unsigned int plength, boolean retained) {
uint8_t lenBuf[4];
uint8_t llen = 0; uint8_t llen = 0;
uint8_t digit; uint8_t digit;
int rc; int rc;
@ -222,44 +227,39 @@ boolean PubSubClient::publish_P(char* topic, uint8_t* PROGMEM payload, unsigned
int i; int i;
uint8_t header; uint8_t header;
unsigned int len; unsigned int len;
if (!connected()) { if (!connected()) {
return false; return false;
} }
tlen = strlen(topic); tlen = strlen(topic);
header = MQTTPUBLISH; header = MQTTPUBLISH;
if (retained) { if (retained) {
header |= 1; header |= 1;
} }
buffer[pos++] = header;
len = plength + 2 + tlen; len = plength + 2 + tlen;
do { do {
digit = len % 128; digit = len % 128;
len = len / 128; len = len / 128;
if (len > 0) { if (len > 0) {
digit |= 0x80; digit |= 0x80;
} }
lenBuf[pos++] = digit; buffer[pos++] = digit;
llen++; llen++;
} while(len>0); } while(len>0);
rc += _client->write(&header, 1); pos = writeString(topic,buffer,pos);
rc += _client->write(lenBuf, llen);
rc += _client->write(buffer,pos);
lenBuf[0] = tlen >> 8; // MSB
lenBuf[1] = tlen & 0xFF; // LSB for (i=0;i<plength;i++) {
rc += _client->write((char)pgm_read_byte_near(payload + i));
rc += _client->write(lenBuf, 2); }
rc += _client->write((uint8_t *)topic, tlen);
lastOutActivity = millis();
for (i=0;i<plength;i++) { return rc == len + 1 + plength;
rc += _client->write((char)pgm_read_byte_near(payload + i));
}
lastOutActivity = millis();
return rc == len + 1 + plength;
} }
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
@ -279,9 +279,12 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
llen++; llen++;
} while(len>0); } while(len>0);
rc = _client->write(header); buf[4-llen] = header;
rc += _client->write(lenBuf,llen); for (int i=0;i<llen;i++) {
rc += _client->write(buf,length); buf[5-llen+i] = lenBuf[i];
}
rc = _client->write(buf+(4-llen),length+1+llen);
lastOutActivity = millis(); lastOutActivity = millis();
return (rc == 1+llen+length); return (rc == 1+llen+length);
} }
@ -289,26 +292,27 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
boolean PubSubClient::subscribe(char* topic) { boolean PubSubClient::subscribe(char* topic) {
if (connected()) { if (connected()) {
uint16_t length = 2; // Leave room in the buffer for header and variable length field
uint16_t length = 7;
nextMsgId++; nextMsgId++;
if (nextMsgId == 0) { if (nextMsgId == 0) {
nextMsgId = 1; nextMsgId = 1;
} }
buffer[0] = nextMsgId >> 8; buffer[0] = (nextMsgId >> 8);
buffer[1] = nextMsgId - (buffer[0]<<8); buffer[1] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length); length = writeString(topic, buffer,length);
buffer[length++] = 0; // Only do QoS 0 subs buffer[length++] = 0; // Only do QoS 0 subs
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length); return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
} }
return false; return false;
} }
void PubSubClient::disconnect() { void PubSubClient::disconnect() {
_client->write(MQTTDISCONNECT); buffer[0] = MQTTDISCONNECT;
_client->write((uint8_t)0); buffer[1] = 0;
_client->write(buffer,2);
_client->stop(); _client->stop();
lastInActivity = millis(); lastInActivity = lastOutActivity = millis();
lastOutActivity = millis();
} }
uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
@ -319,8 +323,8 @@ uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
buf[pos++] = *idp++; buf[pos++] = *idp++;
i++; i++;
} }
buf[pos-i-2] = 0; buf[pos-i-2] = (i >> 8);
buf[pos-i-1] = i; buf[pos-i-1] = (i & 0xFF);
return pos; return pos;
} }