From fa5b7f75d10e4e57cb2c8142fbae0d68947af513 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Fri, 28 Aug 2015 11:21:52 +0100 Subject: [PATCH] Tidy up more api and examples --- PubSubClient/PubSubClient.cpp | 495 ---------------- PubSubClient/examples/mqtt_auth/mqtt_auth.ino | 5 + .../examples/mqtt_basic/mqtt_basic.ino | 56 +- .../mqtt_reconnect_nonblocking.ino | 67 +++ PubSubClient/keywords.txt | 9 +- PubSubClient/library.properties | 9 + PubSubClient/src/PubSubClient.cpp | 538 ++++++++++++++++++ PubSubClient/{ => src}/PubSubClient.h | 28 +- 8 files changed, 696 insertions(+), 511 deletions(-) delete mode 100755 PubSubClient/PubSubClient.cpp create mode 100644 PubSubClient/examples/mqtt_reconnect_nonblocking/mqtt_reconnect_nonblocking.ino create mode 100644 PubSubClient/library.properties create mode 100755 PubSubClient/src/PubSubClient.cpp rename PubSubClient/{ => src}/PubSubClient.h (83%) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp deleted file mode 100755 index e5bf7cc..0000000 --- a/PubSubClient/PubSubClient.cpp +++ /dev/null @@ -1,495 +0,0 @@ -/* - PubSubClient.cpp - A simple client for MQTT. - Nicholas O'Leary - http://knolleary.net -*/ - -#include "PubSubClient.h" - -PubSubClient::PubSubClient() { - this->_client = NULL; - this->stream = NULL; - setCallback(NULL); -} - -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { - setServer(addr, port); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { - setServer(addr,port); - setClient(client); - setStream(stream); -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - setServer(addr, port); - setCallback(callback); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - setServer(addr,port); - setCallback(callback); - setClient(client); - setStream(stream); -} - - -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { - setServer(ip, port); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { - setServer(ip,port); - setClient(client); - setStream(stream); -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - setServer(ip, port); - setCallback(callback); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - setServer(ip,port); - setCallback(callback); - setClient(client); - setStream(stream); -} - -PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { - setServer(domain,port); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { - setServer(domain,port); - setClient(client); - setStream(stream); -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { - setServer(domain,port); - setCallback(callback); - setClient(client); - this->stream = NULL; -} -PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { - setServer(domain,port); - setCallback(callback); - setClient(client); - setStream(stream); -} - -boolean PubSubClient::connect(const char *id) { - return connect(id,NULL,NULL,0,0,0,0); -} - -boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { - return connect(id,user,pass,0,0,0,0); -} - -boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); -} - -boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { - if (!connected()) { - int result = 0; - - if (domain != NULL) { - result = _client->connect(this->domain, this->port); - } else { - result = _client->connect(this->ip, this->port); - } - if (result) { - nextMsgId = 1; - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - unsigned int j; - -#if MQTT_VERSION == MQTT_VERSION_3_1 - uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; -#define MQTT_HEADER_VERSION_LENGTH 9 -#elif MQTT_VERSION == MQTT_VERSION_3_1_1 - uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; -#define MQTT_HEADER_VERSION_LENGTH 7 -#endif - for (j = 0;j>1); - } - } - - buffer[length++] = v; - - buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); - length = writeString(id,buffer,length); - if (willTopic) { - length = writeString(willTopic,buffer,length); - length = writeString(willMessage,buffer,length); - } - - if(user != NULL) { - length = writeString(user,buffer,length); - if(pass != NULL) { - length = writeString(pass,buffer,length); - } - } - - write(MQTTCONNECT,buffer,length-5); - - lastInActivity = lastOutActivity = millis(); - - while (!_client->available()) { - unsigned long t = millis(); - if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { - _client->stop(); - return false; - } - } - uint8_t llen; - uint16_t len = readPacket(&llen); - - if (len == 4 && buffer[3] == 0) { - lastInActivity = millis(); - pingOutstanding = false; - return true; - } - } - _client->stop(); - } - return false; -} - -uint8_t PubSubClient::readByte() { - while(!_client->available()) {} - return _client->read(); -} - -uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { - uint16_t len = 0; - buffer[len++] = readByte(); - bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; - uint32_t multiplier = 1; - uint16_t length = 0; - uint8_t digit = 0; - uint16_t skip = 0; - uint8_t start = 0; - - do { - digit = readByte(); - buffer[len++] = digit; - length += (digit & 127) * multiplier; - multiplier *= 128; - } while ((digit & 128) != 0); - *lengthLength = len-1; - - if (isPublish) { - // Read in topic length to calculate bytes to skip over for Stream writing - buffer[len++] = readByte(); - buffer[len++] = readByte(); - skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; - start = 2; - if (buffer[0]&MQTTQOS1) { - // skip message id - skip += 2; - } - } - - for (uint16_t i = start;istream) { - if (isPublish && len-*lengthLength-2>skip) { - this->stream->write(digit); - } - } - if (len < MQTT_MAX_PACKET_SIZE) { - buffer[len] = digit; - } - len++; - } - - if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { - len = 0; // This will cause the packet to be ignored. - } - - return len; -} - -boolean PubSubClient::loop() { - if (connected()) { - unsigned long t = millis(); - if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { - if (pingOutstanding) { - _client->stop(); - return false; - } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - _client->write(buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; - char topic[tl+1]; - for (uint16_t i=0;i0 - if ((buffer[0]&0x06) == MQTTQOS1) { - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client->write(buffer,4); - lastOutActivity = t; - - } else { - payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; - } - } - } - return true; - } - return false; -} - -boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload,strlen(payload),false); -} - -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { - return publish(topic, payload, plength, false); -} - -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { - if (connected()) { - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - length = writeString(topic,buffer,length); - uint16_t i; - for (i=0;i 0) { - digit |= 0x80; - } - buffer[pos++] = digit; - llen++; - } while(len>0); - - pos = writeString(topic,buffer,pos); - - rc += _client->write(buffer,pos); - - for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); - } - - lastOutActivity = millis(); - - return rc == tlen + 4 + plength; -} - -boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { - uint8_t lenBuf[4]; - uint8_t llen = 0; - uint8_t digit; - uint8_t pos = 0; - uint8_t rc; - uint16_t len = length; - do { - digit = len % 128; - len = len / 128; - if (len > 0) { - digit |= 0x80; - } - lenBuf[pos++] = digit; - llen++; - } while(len>0); - - buf[4-llen] = header; - for (int i=0;iwrite(buf+(4-llen),length+1+llen); - - lastOutActivity = millis(); - return (rc == 1+llen+length); -} - -boolean PubSubClient::subscribe(const char* topic) { - return subscribe(topic, 0); -} - -boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - if (qos < 0 || qos > 1) - return false; - - if (connected()) { - // Leave room in the buffer for header and variable length field - uint16_t length = 5; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, buffer,length); - buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); - } - return false; -} - -boolean PubSubClient::unsubscribe(const char* topic) { - if (connected()) { - uint16_t length = 5; - nextMsgId++; - if (nextMsgId == 0) { - nextMsgId = 1; - } - buffer[length++] = (nextMsgId >> 8); - buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); - } - return false; -} - -void PubSubClient::disconnect() { - buffer[0] = MQTTDISCONNECT; - buffer[1] = 0; - _client->write(buffer,2); - _client->stop(); - lastInActivity = lastOutActivity = millis(); -} - -uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { - const char* idp = string; - uint16_t i = 0; - pos += 2; - while (*idp) { - buf[pos++] = *idp++; - i++; - } - buf[pos-i-2] = (i >> 8); - buf[pos-i-1] = (i & 0xFF); - return pos; -} - - -boolean PubSubClient::connected() { - boolean rc; - if (_client == NULL ) { - rc = false; - } else { - rc = (int)_client->connected(); - if (!rc) _client->stop(); - } - return rc; -} - -void PubSubClient::setServer(uint8_t * ip, uint16_t port) { - IPAddress addr(ip[0],ip[1],ip[2],ip[3]); - setServer(addr,port); -} - -void PubSubClient::setServer(IPAddress ip, uint16_t port) { - this->ip = ip; - this->port = port; - this->domain = NULL; -} - -void PubSubClient::setServer(const char * domain, uint16_t port) { - this->domain = domain; - this->port = port; -} - -void PubSubClient::setCallback(void(*callback)(char*,uint8_t*,unsigned int)){ - this->callback = callback; -} - -void PubSubClient::setClient(Client& client){ - this->_client = &client; -} - -void PubSubClient::setStream(Stream& stream){ - this->stream = &stream; -} diff --git a/PubSubClient/examples/mqtt_auth/mqtt_auth.ino b/PubSubClient/examples/mqtt_auth/mqtt_auth.ino index f4c38f3..e9f7b18 100755 --- a/PubSubClient/examples/mqtt_auth/mqtt_auth.ino +++ b/PubSubClient/examples/mqtt_auth/mqtt_auth.ino @@ -26,6 +26,11 @@ PubSubClient client(server, 1883, callback, ethClient); void setup() { Ethernet.begin(mac, ip); + // Note - the default maximum packet size is 128 bytes. If the + // combined length of clientId, username and password exceed this, + // you will need to increase the value of MQTT_MAX_PACKET_SIZE in + // PubSubClient.h + if (client.connect("arduinoClient", "testuser", "testpass")) { client.publish("outTopic","hello world"); client.subscribe("inTopic"); diff --git a/PubSubClient/examples/mqtt_basic/mqtt_basic.ino b/PubSubClient/examples/mqtt_basic/mqtt_basic.ino index 88e77de..f545ade 100755 --- a/PubSubClient/examples/mqtt_basic/mqtt_basic.ino +++ b/PubSubClient/examples/mqtt_basic/mqtt_basic.ino @@ -1,9 +1,16 @@ /* Basic MQTT example - - connects to an MQTT server + This sketch demonstrates the basic capabilities of the library. + It connects to an MQTT server then: - publishes "hello world" to the topic "outTopic" - - subscribes to the topic "inTopic" + - subscribes to the topic "inTopic", printing out any messages + it receives. NB - it assumes the received payloads are strings not binary + + It will reconnect to the server if the connection is lost using a blocking + reconnect function. See the 'mqtt_reconnect_nonblocking' example for how to + achieve the same result without blocking the main loop. + */ #include @@ -16,22 +23,55 @@ IPAddress ip(172, 16, 0, 100); IPAddress server(172, 16, 0, 2); void callback(char* topic, byte* payload, unsigned int length) { - // handle message arrived + Serial.print("Message arrived ["); + Serial.print(topic); + Serial.print("] "); + for (int i=0;i +#include +#include + +// Update these with values suitable for your hardware/network. +byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED }; +IPAddress ip(172, 16, 0, 100); +IPAddress server(172, 16, 0, 2); + +void callback(char* topic, byte* payload, unsigned int length) { + // handle message arrived +} + +EthernetClient ethClient; +PubSubClient client(ethClient); + +long lastReconnectAttempt = 0; + +boolean reconnect() { + if (client.connect("arduinoClient")) { + // Once connected, publish an announcement... + client.publish("outTopic","hello world"); + // ... and resubscribe + client.subscribe("inTopic"); + } + return client.connected(); +} + +void setup() +{ + client.setServer(server, 1883); + client.setCallback(callback); + + Ethernet.begin(mac, ip); + delay(1500); + lastReconnectAttempt = 0; +} + + +void loop() +{ + if (!client.connected()) { + long now = millis(); + if (now - lastReconnectAttempt > 5000) { + lastReconnectAttempt = now; + // Attempt to reconnect + if (reconnect()) { + lastReconnectAttempt = 0; + } + } + } else { + // Client connected + + client.loop(); + } + +} diff --git a/PubSubClient/keywords.txt b/PubSubClient/keywords.txt index d91582e..b979588 100755 --- a/PubSubClient/keywords.txt +++ b/PubSubClient/keywords.txt @@ -1,5 +1,5 @@ ####################################### -# Syntax Coloring Map For Ultrasound +# Syntax Coloring Map For PubSubClient ####################################### ####################################### @@ -15,11 +15,16 @@ PubSubClient KEYWORD1 connect KEYWORD2 disconnect KEYWORD2 publish KEYWORD2 +publish_P KEYWORD2 subscribe KEYWORD2 +unsubscribe KEYWORD2 loop KEYWORD2 connected KEYWORD2 +setServer KEYWORD2 +setCallback KEYWORD2 +setClient KEYWORD2 +setStream KEYWORD2 ####################################### # Constants (LITERAL1) ####################################### - diff --git a/PubSubClient/library.properties b/PubSubClient/library.properties new file mode 100644 index 0000000..c4e031a --- /dev/null +++ b/PubSubClient/library.properties @@ -0,0 +1,9 @@ +name=PubSubClient +version=2.0 +author=Nick O'Leary +maintainer=Nick O'Leary +sentence=A client library for MQTT messaging. +paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages from a remote server. It supports the latest MQTT 3.1.1 protocol and can be configured to use the older MQTT 3.1 if needed. It supports all Arduino Ethernet Client compatible hardware, including the Intel Galileo/Edison and ESP8266. +category=Communication +url=http://knolleary.net/arduino-client-for-mqtt/ +architectures=* diff --git a/PubSubClient/src/PubSubClient.cpp b/PubSubClient/src/PubSubClient.cpp new file mode 100755 index 0000000..b3313a0 --- /dev/null +++ b/PubSubClient/src/PubSubClient.cpp @@ -0,0 +1,538 @@ +/* + PubSubClient.cpp - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#include "PubSubClient.h" +#include "Arduino.h" + +PubSubClient::PubSubClient() { + this->_state = MQTT_DISCONNECTED; + this->_client = NULL; + this->stream = NULL; + setCallback(NULL); +} + +PubSubClient::PubSubClient(Client& client) { + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; +} + +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setCallback(callback); + setClient(client); + setStream(stream); +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setCallback(callback); + setClient(client); + setStream(stream); +} + +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + setStream(stream); +} + +boolean PubSubClient::connect(const char *id) { + return connect(id,NULL,NULL,0,0,0,0); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { + return connect(id,user,pass,0,0,0,0); +} + +boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + if (!connected()) { + int result = 0; + + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + if (result) { + nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + unsigned int j; + +#if MQTT_VERSION == MQTT_VERSION_3_1 + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 9 +#elif MQTT_VERSION == MQTT_VERSION_3_1_1 + uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 7 +#endif + for (j = 0;j>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } + } + uint8_t llen; + uint16_t len = readPacket(&llen); + + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; + } else { + _state = buffer[3]; + } + } + _client->stop(); + } else { + _state = MQTT_CONNECT_FAILED; + } + return false; + } +} + +uint8_t PubSubClient::readByte() { + while(!_client->available()) {} + return _client->read(); +} + +uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { + uint16_t len = 0; + buffer[len++] = readByte(); + bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint16_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint8_t start = 0; + + do { + digit = readByte(); + buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier *= 128; + } while ((digit & 128) != 0); + *lengthLength = len-1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + buffer[len++] = readByte(); + buffer[len++] = readByte(); + skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; + start = 2; + if (buffer[0]&MQTTQOS1) { + // skip message id + skip += 2; + } + } + + for (uint16_t i = start;istream) { + if (isPublish && len-*lengthLength-2>skip) { + this->stream->write(digit); + } + } + if (len < MQTT_MAX_PACKET_SIZE) { + buffer[len] = digit; + } + len++; + } + + if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { + len = 0; // This will cause the packet to be ignored. + } + + return len; +} + +boolean PubSubClient::loop() { + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; + char topic[tl+1]; + for (uint16_t i=0;i0 + if ((buffer[0]&0x06) == MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + buffer[0] = MQTTPUBACK; + buffer[1] = 2; + buffer[2] = (msgId >> 8); + buffer[3] = (msgId & 0xFF); + _client->write(buffer,4); + lastOutActivity = t; + + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client->write(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +boolean PubSubClient::publish(const char* topic, const char* payload) { + return publish(topic,(const uint8_t*)payload,strlen(payload),false); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + length = writeString(topic,buffer,length); + uint16_t i; + for (i=0;i 0) { + digit |= 0x80; + } + buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,buffer,pos); + + rc += _client->write(buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + + return rc == tlen + 4 + plength; +} + +boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint8_t rc; + uint16_t len = length; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;iwrite(buf+(4-llen),length+1+llen); + + lastOutActivity = millis(); + return (rc == 1+llen+length); +} + +boolean PubSubClient::subscribe(const char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { + if (qos < 0 || qos > 1) + return false; + + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, buffer,length); + buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +boolean PubSubClient::unsubscribe(const char* topic) { + if (connected()) { + uint16_t length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() { + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client->write(buffer,2); + _state = MQTT_DISCONNECTED; + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { + const char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +boolean PubSubClient::connected() { + boolean rc; + if (_client == NULL ) { + rc = false; + } else { + rc = (int)_client->connected(); + if (!rc) { + if (this->_state == MQTT_CONNECTED) { + this->_state = MQTT_CONNECTION_LOST; + } + _client->stop(); + } + } + return rc; +} + +PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { + IPAddress addr(ip[0],ip[1],ip[2],ip[3]); + return setServer(addr,port); +} + +PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { + this->ip = ip; + this->port = port; + this->domain = NULL; + return *this; +} + +PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { + this->domain = domain; + this->port = port; + return *this; +} + +PubSubClient& PubSubClient::setCallback(void(*callback)(char*,uint8_t*,unsigned int)){ + this->callback = callback; + return *this; +} + +PubSubClient& PubSubClient::setClient(Client& client){ + this->_client = &client; + return *this; +} + +PubSubClient& PubSubClient::setStream(Stream& stream){ + this->stream = &stream; + return *this; +} + +int PubSubClient::state() { + return this->_state; +} diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/src/PubSubClient.h similarity index 83% rename from PubSubClient/PubSubClient.h rename to PubSubClient/src/PubSubClient.h index 1f8a3ba..7b04462 100755 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/src/PubSubClient.h @@ -44,6 +44,19 @@ #define MQTTQOS1 (1 << 1) #define MQTTQOS2 (2 << 1) +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 + + + #define MQTT_CALLBACK_SIGNATURE void (*callback)(char*,uint8_t*,unsigned int) class PubSubClient { @@ -63,8 +76,10 @@ private: const char* domain; uint16_t port; Stream* stream; + int _state; public: PubSubClient(); + PubSubClient(Client& client); PubSubClient(IPAddress, uint16_t, Client& client); PubSubClient(IPAddress, uint16_t, Client& client, Stream&); PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); @@ -78,12 +93,12 @@ public: PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - void setServer(IPAddress ip, uint16_t port); - void setServer(uint8_t * ip, uint16_t port); - void setServer(const char * domain, uint16_t port); - void setCallback(MQTT_CALLBACK_SIGNATURE); - void setClient(Client& client); - void setStream(Stream& stream); + PubSubClient& setServer(IPAddress ip, uint16_t port); + PubSubClient& setServer(uint8_t * ip, uint16_t port); + PubSubClient& setServer(const char * domain, uint16_t port); + PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); + PubSubClient& setClient(Client& client); + PubSubClient& setStream(Stream& stream); boolean connect(const char* id); boolean connect(const char* id, const char* user, const char* pass); @@ -99,6 +114,7 @@ public: boolean unsubscribe(const char* topic); boolean loop(); boolean connected(); + int state(); };