diff --git a/libraries/pubsubclient/PubSubClient.cpp b/libraries/pubsubclient/PubSubClient.cpp index 89d0f05..0fa420d 100644 --- a/libraries/pubsubclient/PubSubClient.cpp +++ b/libraries/pubsubclient/PubSubClient.cpp @@ -7,54 +7,30 @@ #include "PubSubClient.h" #include "Arduino.h" -#ifdef ESP8266 - #define INIT_FINGERPRINT() this->fingerprint = NULL; -#else - #define INIT_FINGERPRINT() -#endif - PubSubClient::PubSubClient() { this->_state = MQTT_DISCONNECTED; this->_client = NULL; this->stream = NULL; setCallback(NULL); - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } -#ifdef ESP8266 -PubSubClient::PubSubClient(WiFiClientSecure& client, const char* fingerprint) { - this->_state = MQTT_DISCONNECTED; - setClient(client); - this->stream = NULL; - this->_available = 0; - this->fingerprint = fingerprint; -} -#endif - PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { this->_state = MQTT_DISCONNECTED; setServer(addr, port); setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(addr,port); setClient(client); setStream(stream); - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -62,8 +38,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -71,8 +45,6 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); setStream(stream); - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { @@ -80,16 +52,12 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { setServer(ip, port); setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(ip,port); setClient(client); setStream(stream); - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -97,8 +65,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -106,8 +72,6 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); setStream(stream); - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { @@ -115,16 +79,12 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { setServer(domain,port); setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; setServer(domain,port); setClient(client); setStream(stream); - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -132,8 +92,6 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); this->stream = NULL; - this->_available = 0; - INIT_FINGERPRINT() } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -141,23 +99,25 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); setStream(stream); - this->_available = 0; - INIT_FINGERPRINT() } boolean PubSubClient::connect(const char *id) { - return connect(id,NULL,NULL,0,0,0,0); + return connect(id,NULL,NULL,0,0,0,0,1); } boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { - return connect(id,user,pass,0,0,0,0); + return connect(id,user,pass,0,0,0,0,1); } boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage,1); } boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,user,pass,willTopic,willQos,willRetain,willMessage,1); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) { if (!connected()) { int result = 0; @@ -166,33 +126,10 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } else { result = _client->connect(this->ip, this->port); } - -#ifdef ESP8266 - if (fingerprint != NULL) { - if (domain != NULL) { - // there's only one way to set fingerprint: using the WiFiClientSecure-based constructor, so this cast is safe - if (!static_cast(_client)->verify(fingerprint, domain)) { - _state = MQTT_TLS_BAD_SERVER_CREDENTIALS; - return false; - } - } - else { - char buffer[16]; // IPv4 only (which is what IPAddress supports anyway) - - ip.toString().toCharArray(buffer, 16); - - if (!static_cast(_client)->verify(fingerprint, buffer)) { - _state = MQTT_TLS_BAD_SERVER_CREDENTIALS; - return false; - } - } - } -#endif - if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; unsigned int j; #if MQTT_VERSION == MQTT_VERSION_3_1 @@ -208,9 +145,12 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass uint8_t v; if (willTopic) { - v = 0x06|(willQos<<3)|(willRetain<<5); + v = 0x04|(willQos<<3)|(willRetain<<5); } else { - v = 0x02; + v = 0x00; + } + if (cleanSession) { + v = v|0x02; } if(user != NULL) { @@ -225,24 +165,30 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass buffer[length++] = ((MQTT_KEEPALIVE) >> 8); buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + + CHECK_STRING_LENGTH(length,id) length = writeString(id,buffer,length); if (willTopic) { + CHECK_STRING_LENGTH(length,willTopic) length = writeString(willTopic,buffer,length); + CHECK_STRING_LENGTH(length,willMessage) length = writeString(willMessage,buffer,length); } if(user != NULL) { + CHECK_STRING_LENGTH(length,user) length = writeString(user,buffer,length); if(pass != NULL) { + CHECK_STRING_LENGTH(length,pass) length = writeString(pass,buffer,length); } } - write(MQTTCONNECT,buffer,length-5); + write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); - while (!available()) { + while (!_client->available()) { unsigned long t = millis(); if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { _state = MQTT_CONNECTION_TIMEOUT; @@ -272,26 +218,17 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass return true; } -// return and cache the available number of bytes in the client; -// remember to reduce the available count when consuming the buffer -int PubSubClient::available() { - if (_available == 0) { - _available = _client->available(); - } - return _available; -} - // reads a byte into result boolean PubSubClient::readByte(uint8_t * result) { uint32_t previousMillis = millis(); - while(!available()) { + while(!_client->available()) { + yield(); uint32_t currentMillis = millis(); if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ return false; } } *result = _client->read(); - _available -= 1; return true; } @@ -317,6 +254,12 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { uint8_t start = 0; do { + if (len == 5) { + // Invalid remaining length encoding - kill the connection + _state = MQTT_DISCONNECTED; + _client->stop(); + return 0; + } if(!readByte(&digit)) return 0; buffer[len++] = digit; length += (digit & 127) * multiplier; @@ -358,67 +301,65 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { boolean PubSubClient::loop() { if (connected()) { - do { - unsigned long t = millis(); - if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { - if (pingOutstanding) { - this->_state = MQTT_CONNECTION_TIMEOUT; - _client->stop(); - return false; - } else { - buffer[0] = MQTTPINGREQ; + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ + memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ + buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) buffer+llen+2; + // msgId only present for QOS>0 + if ((buffer[0]&0x06) == MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + buffer[0] = MQTTPUBACK; + buffer[1] = 2; + buffer[2] = (msgId >> 8); + buffer[3] = (msgId & 0xFF); + _client->write(buffer,4); + lastOutActivity = t; + + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; buffer[1] = 0; _client->write(buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; } + } else if (!connected()) { + // readPacket has closed the connection + return false; } - - if (available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */ - memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) buffer+llen+2; - // msgId only present for QOS>0 - if ((buffer[0]&0x06) == MQTTQOS1) { - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client->write(buffer,4); - lastOutActivity = t; - - } else { - payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; - } - } - } - } while (_available > 0); // can't leave data in the buffer, or subsequent publish() calls - // may fail (axTLS is only half-duplex, so writes will fail, to - // avoid losing information) + } return true; } return false; @@ -438,12 +379,12 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { if (connected()) { - if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) { + if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) { // Too long return false; } // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); uint16_t i; for (i=0;iwrite(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + lastOutActivity = millis(); + return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); + } + return false; +} + +int PubSubClient::endPublish() { + return 1; +} + +size_t PubSubClient::write(uint8_t data) { + lastOutActivity = millis(); + return _client->write(data); +} + +size_t PubSubClient::write(const uint8_t *buffer, size_t size) { + lastOutActivity = millis(); + return _client->write(buffer,size); +} + +size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; - uint16_t rc; uint16_t len = length; do { digit = len % 128; @@ -522,12 +498,18 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { buf[4-llen] = header; for (int i=0;i 0) && result) { @@ -539,9 +521,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { } return result; #else - rc = _client->write(buf+(4-llen),length+1+llen); + rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen); lastOutActivity = millis(); - return (rc == 1+llen+length); + return (rc == hlen+length); #endif } @@ -550,7 +532,7 @@ boolean PubSubClient::subscribe(const char* topic) { } boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - if (qos < 0 || qos > 1) { + if (qos > 1) { return false; } if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { @@ -559,7 +541,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { } if (connected()) { // Leave room in the buffer for header and variable length field - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; @@ -568,7 +550,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { buffer[length++] = (nextMsgId & 0xFF); length = writeString((char*)topic, buffer,length); buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } @@ -579,7 +561,7 @@ boolean PubSubClient::unsubscribe(const char* topic) { return false; } if (connected()) { - uint16_t length = 5; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; @@ -587,7 +569,7 @@ boolean PubSubClient::unsubscribe(const char* topic) { buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId & 0xFF); length = writeString(topic, buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } @@ -597,6 +579,7 @@ void PubSubClient::disconnect() { buffer[1] = 0; _client->write(buffer,2); _state = MQTT_DISCONNECTED; + _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); } diff --git a/libraries/pubsubclient/PubSubClient.h b/libraries/pubsubclient/PubSubClient.h index 16bf8d3..2fd6f1d 100644 --- a/libraries/pubsubclient/PubSubClient.h +++ b/libraries/pubsubclient/PubSubClient.h @@ -12,10 +12,6 @@ #include "Client.h" #include "Stream.h" -#ifdef ESP8266 -#include "WiFiClientSecure.h" -#endif - #define MQTT_VERSION_3_1 3 #define MQTT_VERSION_3_1_1 4 @@ -46,17 +42,16 @@ //#define MQTT_MAX_TRANSFER_SIZE 80 // Possible values for client.state() -#define MQTT_TLS_BAD_SERVER_CREDENTIALS -5 -#define MQTT_CONNECTION_TIMEOUT -4 -#define MQTT_CONNECTION_LOST -3 -#define MQTT_CONNECT_FAILED -2 -#define MQTT_DISCONNECTED -1 -#define MQTT_CONNECTED 0 -#define MQTT_CONNECT_BAD_PROTOCOL 1 -#define MQTT_CONNECT_BAD_CLIENT_ID 2 -#define MQTT_CONNECT_UNAVAILABLE 3 -#define MQTT_CONNECT_BAD_CREDENTIALS 4 -#define MQTT_CONNECT_UNAUTHORIZED 5 +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 #define MQTTCONNECT 1 << 4 // Client request to connect to Server #define MQTTCONNACK 2 << 4 // Connect Acknowledgment @@ -78,39 +73,42 @@ #define MQTTQOS1 (1 << 1) #define MQTTQOS2 (2 << 1) -#ifdef ESP8266 +// Maximum size of fixed header and variable length size header +#define MQTT_MAX_HEADER_SIZE 5 + +#if defined(ESP8266) || defined(ESP32) #include #define MQTT_CALLBACK_SIGNATURE std::function callback #else #define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) #endif -class PubSubClient { +#define CHECK_STRING_LENGTH(l,s) if (l+2+strlen(s) > MQTT_MAX_PACKET_SIZE) {_client->stop();return false;} + +class PubSubClient : public Print { private: Client* _client; uint8_t buffer[MQTT_MAX_PACKET_SIZE]; uint16_t nextMsgId; unsigned long lastOutActivity; unsigned long lastInActivity; - int _available; bool pingOutstanding; MQTT_CALLBACK_SIGNATURE; - int available(); uint16_t readPacket(uint8_t*); boolean readByte(uint8_t * result); boolean readByte(uint8_t * result, uint16_t * index); boolean write(uint8_t header, uint8_t* buf, uint16_t length); uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + // Build up the header ready to send + // Returns the size of the header + // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start + // (MQTT_MAX_HEADER_SIZE - ) bytes into the buffer + size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length); IPAddress ip; const char* domain; uint16_t port; Stream* stream; int _state; - -#ifdef ESP8266 - const char* fingerprint; -#endif - public: PubSubClient(); PubSubClient(Client& client); @@ -126,10 +124,6 @@ public: PubSubClient(const char*, uint16_t, Client& client, Stream&); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - -#ifdef ESP8266 - PubSubClient(WiFiClientSecure& client, const char* fingerprint); -#endif PubSubClient& setServer(IPAddress ip, uint16_t port); PubSubClient& setServer(uint8_t * ip, uint16_t port); @@ -142,12 +136,31 @@ public: boolean connect(const char* id, const char* user, const char* pass); boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession); void disconnect(); boolean publish(const char* topic, const char* payload); boolean publish(const char* topic, const char* payload, boolean retained); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const char* payload, boolean retained); boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + // Start to publish a message. + // This API: + // beginPublish(...) + // one or more calls to write(...) + // endPublish() + // Allows for arbitrarily large payloads to be sent without them having to be copied into + // a new buffer and held in memory at one time + // Returns 1 if the message was started successfully, 0 if there was an error + boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + // Finish off this publish message (started with beginPublish) + // Returns 1 if the packet was sent successfully, 0 if there was an error + int endPublish(); + // Write a single byte of payload (only to be used with beginPublish/endPublish) + virtual size_t write(uint8_t); + // Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish) + // Returns the number of bytes written + virtual size_t write(const uint8_t *buffer, size_t size); boolean subscribe(const char* topic); boolean subscribe(const char* topic, uint8_t qos); boolean unsubscribe(const char* topic);