new version of pubsubclient

This commit is contained in:
Wolfgang Hottgenroth 2019-05-30 16:55:39 +02:00
parent 0f21b68845
commit d9426bdb45
Signed by: wn
GPG Key ID: B586EAFCDF2F65F4
2 changed files with 182 additions and 186 deletions

View File

@ -7,54 +7,30 @@
#include "PubSubClient.h" #include "PubSubClient.h"
#include "Arduino.h" #include "Arduino.h"
#ifdef ESP8266
#define INIT_FINGERPRINT() this->fingerprint = NULL;
#else
#define INIT_FINGERPRINT()
#endif
PubSubClient::PubSubClient() { PubSubClient::PubSubClient() {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
this->_client = NULL; this->_client = NULL;
this->stream = NULL; this->stream = NULL;
setCallback(NULL); setCallback(NULL);
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(Client& client) { PubSubClient::PubSubClient(Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
#ifdef ESP8266
PubSubClient::PubSubClient(WiFiClientSecure& client, const char* fingerprint) {
this->_state = MQTT_DISCONNECTED;
setClient(client);
this->stream = NULL;
this->_available = 0;
this->fingerprint = fingerprint;
}
#endif
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(addr, port); setServer(addr, port);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(addr,port); setServer(addr,port);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -62,8 +38,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -71,8 +45,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
@ -80,16 +52,12 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
setServer(ip, port); setServer(ip, port);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(ip,port); setServer(ip,port);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -97,8 +65,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -106,8 +72,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
@ -115,16 +79,12 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
setServer(domain,port); setServer(domain,port);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(domain,port); setServer(domain,port);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -132,8 +92,6 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
this->_available = 0;
INIT_FINGERPRINT()
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -141,23 +99,25 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
this->_available = 0;
INIT_FINGERPRINT()
} }
boolean PubSubClient::connect(const char *id) { boolean PubSubClient::connect(const char *id) {
return connect(id,NULL,NULL,0,0,0,0); return connect(id,NULL,NULL,0,0,0,0,1);
} }
boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
return connect(id,user,pass,0,0,0,0); return connect(id,user,pass,0,0,0,0,1);
} }
boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1);
} }
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1);
}
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
if (!connected()) { if (!connected()) {
int result = 0; int result = 0;
@ -166,33 +126,10 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
} else { } else {
result = _client->connect(this->ip, this->port); result = _client->connect(this->ip, this->port);
} }
#ifdef ESP8266
if (fingerprint != NULL) {
if (domain != NULL) {
// there's only one way to set fingerprint: using the WiFiClientSecure-based constructor, so this cast is safe
if (!static_cast<WiFiClientSecure*>(_client)->verify(fingerprint, domain)) {
_state = MQTT_TLS_BAD_SERVER_CREDENTIALS;
return false;
}
}
else {
char buffer[16]; // IPv4 only (which is what IPAddress supports anyway)
ip.toString().toCharArray(buffer, 16);
if (!static_cast<WiFiClientSecure*>(_client)->verify(fingerprint, buffer)) {
_state = MQTT_TLS_BAD_SERVER_CREDENTIALS;
return false;
}
}
}
#endif
if (result == 1) { if (result == 1) {
nextMsgId = 1; nextMsgId = 1;
// Leave room in the buffer for header and variable length field // Leave room in the buffer for header and variable length field
uint16_t length = 5; uint16_t length = MQTT_MAX_HEADER_SIZE;
unsigned int j; unsigned int j;
#if MQTT_VERSION == MQTT_VERSION_3_1 #if MQTT_VERSION == MQTT_VERSION_3_1
@ -208,9 +145,12 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
uint8_t v; uint8_t v;
if (willTopic) { if (willTopic) {
v = 0x06|(willQos<<3)|(willRetain<<5); v = 0x04|(willQos<<3)|(willRetain<<5);
} else { } else {
v = 0x02; v = 0x00;
}
if (cleanSession) {
v = v|0x02;
} }
if(user != NULL) { if(user != NULL) {
@ -225,24 +165,30 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
CHECK_STRING_LENGTH(length,id)
length = writeString(id,buffer,length); length = writeString(id,buffer,length);
if (willTopic) { if (willTopic) {
CHECK_STRING_LENGTH(length,willTopic)
length = writeString(willTopic,buffer,length); length = writeString(willTopic,buffer,length);
CHECK_STRING_LENGTH(length,willMessage)
length = writeString(willMessage,buffer,length); length = writeString(willMessage,buffer,length);
} }
if(user != NULL) { if(user != NULL) {
CHECK_STRING_LENGTH(length,user)
length = writeString(user,buffer,length); length = writeString(user,buffer,length);
if(pass != NULL) { if(pass != NULL) {
CHECK_STRING_LENGTH(length,pass)
length = writeString(pass,buffer,length); length = writeString(pass,buffer,length);
} }
} }
write(MQTTCONNECT,buffer,length-5); write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);
lastInActivity = lastOutActivity = millis(); lastInActivity = lastOutActivity = millis();
while (!available()) { while (!_client->available()) {
unsigned long t = millis(); unsigned long t = millis();
if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
_state = MQTT_CONNECTION_TIMEOUT; _state = MQTT_CONNECTION_TIMEOUT;
@ -272,26 +218,17 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
return true; return true;
} }
// return and cache the available number of bytes in the client;
// remember to reduce the available count when consuming the buffer
int PubSubClient::available() {
if (_available == 0) {
_available = _client->available();
}
return _available;
}
// reads a byte into result // reads a byte into result
boolean PubSubClient::readByte(uint8_t * result) { boolean PubSubClient::readByte(uint8_t * result) {
uint32_t previousMillis = millis(); uint32_t previousMillis = millis();
while(!available()) { while(!_client->available()) {
yield();
uint32_t currentMillis = millis(); uint32_t currentMillis = millis();
if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
return false; return false;
} }
} }
*result = _client->read(); *result = _client->read();
_available -= 1;
return true; return true;
} }
@ -317,6 +254,12 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint8_t start = 0; uint8_t start = 0;
do { do {
if (len == 5) {
// Invalid remaining length encoding - kill the connection
_state = MQTT_DISCONNECTED;
_client->stop();
return 0;
}
if(!readByte(&digit)) return 0; if(!readByte(&digit)) return 0;
buffer[len++] = digit; buffer[len++] = digit;
length += (digit & 127) * multiplier; length += (digit & 127) * multiplier;
@ -358,67 +301,65 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
boolean PubSubClient::loop() { boolean PubSubClient::loop() {
if (connected()) { if (connected()) {
do { unsigned long t = millis();
unsigned long t = millis(); if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { if (pingOutstanding) {
if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT;
this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop();
_client->stop(); return false;
return false; } else {
} else { buffer[0] = MQTTPINGREQ;
buffer[0] = MQTTPINGREQ; buffer[1] = 0;
_client->write(buffer,2);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
}
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+llen+2;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
_client->write(buffer,4);
lastOutActivity = t;
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0; buffer[1] = 0;
_client->write(buffer,2); _client->write(buffer,2);
lastOutActivity = t; } else if (type == MQTTPINGRESP) {
lastInActivity = t; pingOutstanding = false;
pingOutstanding = true;
} }
} else if (!connected()) {
// readPacket has closed the connection
return false;
} }
}
if (available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+llen+2;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
_client->write(buffer,4);
lastOutActivity = t;
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
}
}
}
} while (_available > 0); // can't leave data in the buffer, or subsequent publish() calls
// may fail (axTLS is only half-duplex, so writes will fail, to
// avoid losing information)
return true; return true;
} }
return false; return false;
@ -438,12 +379,12 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
if (connected()) { if (connected()) {
if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) { if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
// Too long // Too long
return false; return false;
} }
// Leave room in the buffer for header and variable length field // Leave room in the buffer for header and variable length field
uint16_t length = 5; uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length); length = writeString(topic,buffer,length);
uint16_t i; uint16_t i;
for (i=0;i<plength;i++) { for (i=0;i<plength;i++) {
@ -453,11 +394,15 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
if (retained) { if (retained) {
header |= 1; header |= 1;
} }
return write(header,buffer,length-5); return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
} }
return false; return false;
} }
boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
return publish_P(topic, (const uint8_t*)payload, strlen(payload), retained);
}
boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
uint8_t llen = 0; uint8_t llen = 0;
uint8_t digit; uint8_t digit;
@ -503,12 +448,43 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
return rc == tlen + 4 + plength; return rc == tlen + 4 + plength;
} }
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
if (connected()) {
// Send the header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length);
uint16_t i;
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
lastOutActivity = millis();
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
return false;
}
int PubSubClient::endPublish() {
return 1;
}
size_t PubSubClient::write(uint8_t data) {
lastOutActivity = millis();
return _client->write(data);
}
size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
lastOutActivity = millis();
return _client->write(buffer,size);
}
size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
uint8_t lenBuf[4]; uint8_t lenBuf[4];
uint8_t llen = 0; uint8_t llen = 0;
uint8_t digit; uint8_t digit;
uint8_t pos = 0; uint8_t pos = 0;
uint16_t rc;
uint16_t len = length; uint16_t len = length;
do { do {
digit = len % 128; digit = len % 128;
@ -522,12 +498,18 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
buf[4-llen] = header; buf[4-llen] = header;
for (int i=0;i<llen;i++) { for (int i=0;i<llen;i++) {
buf[5-llen+i] = lenBuf[i]; buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
} }
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
}
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint16_t rc;
uint8_t hlen = buildHeader(header, buf, length);
#ifdef MQTT_MAX_TRANSFER_SIZE #ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf+(4-llen); uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
uint16_t bytesRemaining = length+1+llen; //Match the length type uint16_t bytesRemaining = length+hlen; //Match the length type
uint8_t bytesToWrite; uint8_t bytesToWrite;
boolean result = true; boolean result = true;
while((bytesRemaining > 0) && result) { while((bytesRemaining > 0) && result) {
@ -539,9 +521,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
} }
return result; return result;
#else #else
rc = _client->write(buf+(4-llen),length+1+llen); rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
lastOutActivity = millis(); lastOutActivity = millis();
return (rc == 1+llen+length); return (rc == hlen+length);
#endif #endif
} }
@ -550,7 +532,7 @@ boolean PubSubClient::subscribe(const char* topic) {
} }
boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
if (qos < 0 || qos > 1) { if (qos > 1) {
return false; return false;
} }
if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
@ -559,7 +541,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
} }
if (connected()) { if (connected()) {
// Leave room in the buffer for header and variable length field // Leave room in the buffer for header and variable length field
uint16_t length = 5; uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++; nextMsgId++;
if (nextMsgId == 0) { if (nextMsgId == 0) {
nextMsgId = 1; nextMsgId = 1;
@ -568,7 +550,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
buffer[length++] = (nextMsgId & 0xFF); buffer[length++] = (nextMsgId & 0xFF);
length = writeString((char*)topic, buffer,length); length = writeString((char*)topic, buffer,length);
buffer[length++] = qos; buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
} }
return false; return false;
} }
@ -579,7 +561,7 @@ boolean PubSubClient::unsubscribe(const char* topic) {
return false; return false;
} }
if (connected()) { if (connected()) {
uint16_t length = 5; uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++; nextMsgId++;
if (nextMsgId == 0) { if (nextMsgId == 0) {
nextMsgId = 1; nextMsgId = 1;
@ -587,7 +569,7 @@ boolean PubSubClient::unsubscribe(const char* topic) {
buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF); buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length); length = writeString(topic, buffer,length);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
} }
return false; return false;
} }
@ -597,6 +579,7 @@ void PubSubClient::disconnect() {
buffer[1] = 0; buffer[1] = 0;
_client->write(buffer,2); _client->write(buffer,2);
_state = MQTT_DISCONNECTED; _state = MQTT_DISCONNECTED;
_client->flush();
_client->stop(); _client->stop();
lastInActivity = lastOutActivity = millis(); lastInActivity = lastOutActivity = millis();
} }

View File

@ -12,10 +12,6 @@
#include "Client.h" #include "Client.h"
#include "Stream.h" #include "Stream.h"
#ifdef ESP8266
#include "WiFiClientSecure.h"
#endif
#define MQTT_VERSION_3_1 3 #define MQTT_VERSION_3_1 3
#define MQTT_VERSION_3_1_1 4 #define MQTT_VERSION_3_1_1 4
@ -46,17 +42,16 @@
//#define MQTT_MAX_TRANSFER_SIZE 80 //#define MQTT_MAX_TRANSFER_SIZE 80
// Possible values for client.state() // Possible values for client.state()
#define MQTT_TLS_BAD_SERVER_CREDENTIALS -5 #define MQTT_CONNECTION_TIMEOUT -4
#define MQTT_CONNECTION_TIMEOUT -4 #define MQTT_CONNECTION_LOST -3
#define MQTT_CONNECTION_LOST -3 #define MQTT_CONNECT_FAILED -2
#define MQTT_CONNECT_FAILED -2 #define MQTT_DISCONNECTED -1
#define MQTT_DISCONNECTED -1 #define MQTT_CONNECTED 0
#define MQTT_CONNECTED 0 #define MQTT_CONNECT_BAD_PROTOCOL 1
#define MQTT_CONNECT_BAD_PROTOCOL 1 #define MQTT_CONNECT_BAD_CLIENT_ID 2
#define MQTT_CONNECT_BAD_CLIENT_ID 2 #define MQTT_CONNECT_UNAVAILABLE 3
#define MQTT_CONNECT_UNAVAILABLE 3 #define MQTT_CONNECT_BAD_CREDENTIALS 4
#define MQTT_CONNECT_BAD_CREDENTIALS 4 #define MQTT_CONNECT_UNAUTHORIZED 5
#define MQTT_CONNECT_UNAUTHORIZED 5
#define MQTTCONNECT 1 << 4 // Client request to connect to Server #define MQTTCONNECT 1 << 4 // Client request to connect to Server
#define MQTTCONNACK 2 << 4 // Connect Acknowledgment #define MQTTCONNACK 2 << 4 // Connect Acknowledgment
@ -78,39 +73,42 @@
#define MQTTQOS1 (1 << 1) #define MQTTQOS1 (1 << 1)
#define MQTTQOS2 (2 << 1) #define MQTTQOS2 (2 << 1)
#ifdef ESP8266 // Maximum size of fixed header and variable length size header
#define MQTT_MAX_HEADER_SIZE 5
#if defined(ESP8266) || defined(ESP32)
#include <functional> #include <functional>
#define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback #define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback
#else #else
#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) #define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int)
#endif #endif
class PubSubClient { #define CHECK_STRING_LENGTH(l,s) if (l+2+strlen(s) > MQTT_MAX_PACKET_SIZE) {_client->stop();return false;}
class PubSubClient : public Print {
private: private:
Client* _client; Client* _client;
uint8_t buffer[MQTT_MAX_PACKET_SIZE]; uint8_t buffer[MQTT_MAX_PACKET_SIZE];
uint16_t nextMsgId; uint16_t nextMsgId;
unsigned long lastOutActivity; unsigned long lastOutActivity;
unsigned long lastInActivity; unsigned long lastInActivity;
int _available;
bool pingOutstanding; bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE; MQTT_CALLBACK_SIGNATURE;
int available();
uint16_t readPacket(uint8_t*); uint16_t readPacket(uint8_t*);
boolean readByte(uint8_t * result); boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index); boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length); boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
// Build up the header ready to send
// Returns the size of the header
// Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start
// (MQTT_MAX_HEADER_SIZE - <returned size>) bytes into the buffer
size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length);
IPAddress ip; IPAddress ip;
const char* domain; const char* domain;
uint16_t port; uint16_t port;
Stream* stream; Stream* stream;
int _state; int _state;
#ifdef ESP8266
const char* fingerprint;
#endif
public: public:
PubSubClient(); PubSubClient();
PubSubClient(Client& client); PubSubClient(Client& client);
@ -127,10 +125,6 @@ public:
PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client);
PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&);
#ifdef ESP8266
PubSubClient(WiFiClientSecure& client, const char* fingerprint);
#endif
PubSubClient& setServer(IPAddress ip, uint16_t port); PubSubClient& setServer(IPAddress ip, uint16_t port);
PubSubClient& setServer(uint8_t * ip, uint16_t port); PubSubClient& setServer(uint8_t * ip, uint16_t port);
PubSubClient& setServer(const char * domain, uint16_t port); PubSubClient& setServer(const char * domain, uint16_t port);
@ -142,12 +136,31 @@ public:
boolean connect(const char* id, const char* user, const char* pass); boolean connect(const char* id, const char* user, const char* pass);
boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage);
boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession);
void disconnect(); void disconnect();
boolean publish(const char* topic, const char* payload); boolean publish(const char* topic, const char* payload);
boolean publish(const char* topic, const char* payload, boolean retained); boolean publish(const char* topic, const char* payload, boolean retained);
boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength);
boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained);
boolean publish_P(const char* topic, const char* payload, boolean retained);
boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained);
// Start to publish a message.
// This API:
// beginPublish(...)
// one or more calls to write(...)
// endPublish()
// Allows for arbitrarily large payloads to be sent without them having to be copied into
// a new buffer and held in memory at one time
// Returns 1 if the message was started successfully, 0 if there was an error
boolean beginPublish(const char* topic, unsigned int plength, boolean retained);
// Finish off this publish message (started with beginPublish)
// Returns 1 if the packet was sent successfully, 0 if there was an error
int endPublish();
// Write a single byte of payload (only to be used with beginPublish/endPublish)
virtual size_t write(uint8_t);
// Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish)
// Returns the number of bytes written
virtual size_t write(const uint8_t *buffer, size_t size);
boolean subscribe(const char* topic); boolean subscribe(const char* topic);
boolean subscribe(const char* topic, uint8_t qos); boolean subscribe(const char* topic, uint8_t qos);
boolean unsubscribe(const char* topic); boolean unsubscribe(const char* topic);