diff --git a/.gitignore b/.gitignore index 1c3ba18..a42cc40 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ tests/bin +.pioenvs +.piolibdeps +.clang_complete +.gcc-flags.json diff --git a/README.md b/README.md index 69cbb8f..33a3993 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Full API documentation is available here: https://pubsubclient.knolleary.net - It can only publish QoS 0 messages. It can subscribe at QoS 0 or QoS 1. - The maximum message size, including header, is **128 bytes** by default. This - is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h`. + is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h`. Longer messages can also be sent with the `publish_P()` method. - The keepalive interval is set to 15 seconds by default. This is configurable via `MQTT_KEEPALIVE` in `PubSubClient.h`. - The client uses MQTT 3.1.1 by default. It can be changed to use MQTT 3.1 by diff --git a/examples/mqtt_esp8266/mqtt_esp8266.ino b/examples/mqtt_esp8266/mqtt_esp8266.ino index e7357b5..7e6effd 100644 --- a/examples/mqtt_esp8266/mqtt_esp8266.ino +++ b/examples/mqtt_esp8266/mqtt_esp8266.ino @@ -1,26 +1,21 @@ /* Basic ESP8266 MQTT example - This sketch demonstrates the capabilities of the pubsub library in combination with the ESP8266 board/library. - It connects to an MQTT server then: - publishes "hello world" to the topic "outTopic" every two seconds - subscribes to the topic "inTopic", printing out any messages it receives. NB - it assumes the received payloads are strings not binary - If the first character of the topic "inTopic" is an 1, switch ON the ESP Led, else switch it off - It will reconnect to the server if the connection is lost using a blocking reconnect function. See the 'mqtt_reconnect_nonblocking' example for how to achieve the same result without blocking the main loop. - To install the ESP8266 board, (using Arduino 1.6.4+): - Add the following 3rd party board manager under "File -> Preferences -> Additional Boards Manager URLs": http://arduino.esp8266.com/stable/package_esp8266com_index.json - Open the "Tools -> Board -> Board Manager" and click install for the ESP8266" - Select your ESP8266 in "Tools -> Board" - */ #include @@ -34,8 +29,9 @@ const char* mqtt_server = "broker.mqtt-dashboard.com"; WiFiClient espClient; PubSubClient client(espClient); -long lastMsg = 0; -char msg[50]; +unsigned long lastMsg = 0; +#define MSG_BUFFER_SIZE (50) +char msg[MSG_BUFFER_SIZE]; int value = 0; void setup_wifi() { @@ -46,6 +42,7 @@ void setup_wifi() { Serial.print("Connecting to "); Serial.println(ssid); + WiFi.mode(WIFI_STA); WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { @@ -120,11 +117,11 @@ void loop() { } client.loop(); - long now = millis(); + unsigned long now = millis(); if (now - lastMsg > 2000) { lastMsg = now; ++value; - snprintf (msg, 50, "hello world #%ld", value); + snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld", value); Serial.print("Publish message: "); Serial.println(msg); client.publish("outTopic", msg); diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index b850c94..14d661e 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -1,4 +1,5 @@ /* + PubSubClient.cpp - A simple client for MQTT. Nick O'Leary http://knolleary.net @@ -121,11 +122,17 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (!connected()) { int result = 0; - if (domain != NULL) { - result = _client->connect(this->domain, this->port); + + if(_client->connected()) { + result = 1; } else { - result = _client->connect(this->ip, this->port); + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } } + if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field @@ -197,7 +204,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } } uint8_t llen; - uint16_t len = readPacket(&llen); + uint32_t len = readPacket(&llen); if (len == 4) { if (buffer[3] == 0) { @@ -243,12 +250,12 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ return false; } -uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { +uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; if(!readByte(buffer, &len)) return 0; bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; - uint16_t length = 0; + uint32_t length = 0; uint8_t digit = 0; uint16_t skip = 0; uint8_t start = 0; @@ -263,7 +270,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { if(!readByte(&digit)) return 0; buffer[len++] = digit; length += (digit & 127) * multiplier; - multiplier *= 128; + multiplier <<=7; //multiplier *= 128 } while ((digit & 128) != 0); *lengthLength = len-1; @@ -279,20 +286,22 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { } } - for (uint16_t i = start;istream) { - if (isPublish && len-*lengthLength-2>skip) { + if (isPublish && idx-*lengthLength-2>skip) { this->stream->write(digit); } } if (len < MQTT_MAX_PACKET_SIZE) { buffer[len] = digit; + len++; } - len++; + idx++; } - if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { + if (!this->stream && idx > MQTT_MAX_PACKET_SIZE) { len = 0; // This will cause the packet to be ignored. } @@ -366,11 +375,11 @@ boolean PubSubClient::loop() { } boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload,strnlen(payload, MQTT_MAX_PACKET_SIZE),false); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_PACKET_SIZE)) : 0,false); } boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { - return publish(topic,(const uint8_t*)payload,strnlen(payload, MQTT_MAX_PACKET_SIZE),retained); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_PACKET_SIZE)) : 0,retained); } boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { @@ -386,10 +395,14 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne // Leave room in the buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,buffer,length); + + // Add payload uint16_t i; for (i=0;i>= 7; //len = len / 128 if (len > 0) { digit |= 0x80; } @@ -445,7 +459,9 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig lastOutActivity = millis(); - return rc == tlen + 4 + plength; + expectedLength = 1 + llen + 2 + tlen + plength; + + return (rc == expectedLength); } boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { @@ -486,8 +502,9 @@ size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) uint8_t pos = 0; uint16_t len = length; do { - digit = len % 128; - len = len / 128; + + digit = len & 127; //digit = len %128 + len >>= 7; //len = len / 128 if (len > 0) { digit |= 0x80; } @@ -531,10 +548,14 @@ boolean PubSubClient::subscribe(const char* topic) { } boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { + size_t topicLength = strnlen(topic, MQTT_MAX_PACKET_SIZE); + if (topic == 0) { + return false; + } if (qos > 1) { return false; } - if (MQTT_MAX_PACKET_SIZE < 9 + strnlen(topic, MQTT_MAX_PACKET_SIZE)) { + if (MQTT_MAX_PACKET_SIZE < 9 + topicLength) { // Too long return false; } @@ -555,7 +576,11 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { } boolean PubSubClient::unsubscribe(const char* topic) { - if (MQTT_MAX_PACKET_SIZE < 9 + strnlen(topic, MQTT_MAX_PACKET_SIZE)) { + size_t topicLength = strnlen(topic, MQTT_MAX_PACKET_SIZE); + if (topic == 0) { + return false; + } + if (MQTT_MAX_PACKET_SIZE < 9 + topicLength) { // Too long return false; } @@ -609,6 +634,8 @@ boolean PubSubClient::connected() { _client->flush(); _client->stop(); } + } else { + return this->_state == MQTT_CONNECTED; } } return rc; diff --git a/src/PubSubClient.h b/src/PubSubClient.h index 9a4b72f..619e40d 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -94,7 +94,7 @@ private: unsigned long lastInActivity; bool pingOutstanding; MQTT_CALLBACK_SIGNATURE; - uint16_t readPacket(uint8_t*); + uint32_t readPacket(uint8_t*); boolean readByte(uint8_t * result); boolean readByte(uint8_t * result, uint16_t * index); boolean write(uint8_t header, uint8_t* buf, uint16_t length); diff --git a/tests/src/receive_spec.cpp b/tests/src/receive_spec.cpp index 9a18af0..185f19b 100644 --- a/tests/src/receive_spec.cpp +++ b/tests/src/receive_spec.cpp @@ -190,7 +190,7 @@ int test_drop_invalid_remaining_length_message() { int test_receive_oversized_stream_message() { - IT("drops an oversized message"); + IT("receive an oversized streamed message"); reset_callback(); Stream stream; @@ -222,7 +222,7 @@ int test_receive_oversized_stream_message() { IS_TRUE(callback_called); IS_TRUE(strcmp(lastTopic,"topic")==0); - IS_TRUE(lastLength == length-9); + IS_TRUE(lastLength == MQTT_MAX_PACKET_SIZE-9); IS_FALSE(stream.error()); IS_FALSE(shimClient.error());