Merge branch 'master' into strnlen-patch-1

This commit is contained in:
Nick O'Leary 2020-05-09 22:29:59 +01:00 committed by GitHub
commit 98098ede85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 34 deletions

4
.gitignore vendored
View File

@ -1 +1,5 @@
tests/bin
.pioenvs
.piolibdeps
.clang_complete
.gcc-flags.json

View File

@ -14,7 +14,7 @@ Full API documentation is available here: https://pubsubclient.knolleary.net
- It can only publish QoS 0 messages. It can subscribe at QoS 0 or QoS 1.
- The maximum message size, including header, is **128 bytes** by default. This
is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h`.
is configurable via `MQTT_MAX_PACKET_SIZE` in `PubSubClient.h`. Longer messages can also be sent with the `publish_P()` method.
- The keepalive interval is set to 15 seconds by default. This is configurable
via `MQTT_KEEPALIVE` in `PubSubClient.h`.
- The client uses MQTT 3.1.1 by default. It can be changed to use MQTT 3.1 by

View File

@ -1,26 +1,21 @@
/*
Basic ESP8266 MQTT example
This sketch demonstrates the capabilities of the pubsub library in combination
with the ESP8266 board/library.
It connects to an MQTT server then:
- publishes "hello world" to the topic "outTopic" every two seconds
- subscribes to the topic "inTopic", printing out any messages
it receives. NB - it assumes the received payloads are strings not binary
- If the first character of the topic "inTopic" is an 1, switch ON the ESP Led,
else switch it off
It will reconnect to the server if the connection is lost using a blocking
reconnect function. See the 'mqtt_reconnect_nonblocking' example for how to
achieve the same result without blocking the main loop.
To install the ESP8266 board, (using Arduino 1.6.4+):
- Add the following 3rd party board manager under "File -> Preferences -> Additional Boards Manager URLs":
http://arduino.esp8266.com/stable/package_esp8266com_index.json
- Open the "Tools -> Board -> Board Manager" and click install for the ESP8266"
- Select your ESP8266 in "Tools -> Board"
*/
#include <ESP8266WiFi.h>
@ -34,8 +29,9 @@ const char* mqtt_server = "broker.mqtt-dashboard.com";
WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
char msg[50];
unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
void setup_wifi() {
@ -46,6 +42,7 @@ void setup_wifi() {
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
@ -120,11 +117,11 @@ void loop() {
}
client.loop();
long now = millis();
unsigned long now = millis();
if (now - lastMsg > 2000) {
lastMsg = now;
++value;
snprintf (msg, 50, "hello world #%ld", value);
snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
Serial.print("Publish message: ");
Serial.println(msg);
client.publish("outTopic", msg);

View File

@ -1,4 +1,5 @@
/*
PubSubClient.cpp - A simple client for MQTT.
Nick O'Leary
http://knolleary.net
@ -121,11 +122,17 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
if (!connected()) {
int result = 0;
if (domain != NULL) {
result = _client->connect(this->domain, this->port);
if(_client->connected()) {
result = 1;
} else {
result = _client->connect(this->ip, this->port);
if (domain != NULL) {
result = _client->connect(this->domain, this->port);
} else {
result = _client->connect(this->ip, this->port);
}
}
if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
@ -197,7 +204,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
}
}
uint8_t llen;
uint16_t len = readPacket(&llen);
uint32_t len = readPacket(&llen);
if (len == 4) {
if (buffer[3] == 0) {
@ -243,12 +250,12 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
return false;
}
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
if(!readByte(buffer, &len)) return 0;
bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
uint16_t length = 0;
uint32_t length = 0;
uint8_t digit = 0;
uint16_t skip = 0;
uint8_t start = 0;
@ -263,7 +270,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
if(!readByte(&digit)) return 0;
buffer[len++] = digit;
length += (digit & 127) * multiplier;
multiplier *= 128;
multiplier <<=7; //multiplier *= 128
} while ((digit & 128) != 0);
*lengthLength = len-1;
@ -279,20 +286,22 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
}
}
for (uint16_t i = start;i<length;i++) {
uint32_t idx = len;
for (uint32_t i = start;i<length;i++) {
if(!readByte(&digit)) return 0;
if (this->stream) {
if (isPublish && len-*lengthLength-2>skip) {
if (isPublish && idx-*lengthLength-2>skip) {
this->stream->write(digit);
}
}
if (len < MQTT_MAX_PACKET_SIZE) {
buffer[len] = digit;
len++;
}
len++;
idx++;
}
if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
if (!this->stream && idx > MQTT_MAX_PACKET_SIZE) {
len = 0; // This will cause the packet to be ignored.
}
@ -366,11 +375,11 @@ boolean PubSubClient::loop() {
}
boolean PubSubClient::publish(const char* topic, const char* payload) {
return publish(topic,(const uint8_t*)payload,strnlen(payload, MQTT_MAX_PACKET_SIZE),false);
return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_PACKET_SIZE)) : 0,false);
}
boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
return publish(topic,(const uint8_t*)payload,strnlen(payload, MQTT_MAX_PACKET_SIZE),retained);
return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_PACKET_SIZE)) : 0,retained);
}
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
@ -386,10 +395,14 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
// Leave room in the buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length);
// Add payload
uint16_t i;
for (i=0;i<plength;i++) {
buffer[length++] = payload[i];
}
// Write the header
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
@ -400,7 +413,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
}
boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
return publish_P(topic, (const uint8_t*)payload, strnlen(payload, MQTT_MAX_PACKET_SIZE), retained);
return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, MQTT_MAX_PACKET_SIZE) : 0, retained);
}
boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
@ -412,6 +425,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
unsigned int i;
uint8_t header;
unsigned int len;
int expectedLength;
if (!connected()) {
return false;
@ -426,8 +440,8 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
buffer[pos++] = header;
len = plength + 2 + tlen;
do {
digit = len % 128;
len = len / 128;
digit = len & 127; //digit = len %128
len >>= 7; //len = len / 128
if (len > 0) {
digit |= 0x80;
}
@ -445,7 +459,9 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
lastOutActivity = millis();
return rc == tlen + 4 + plength;
expectedLength = 1 + llen + 2 + tlen + plength;
return (rc == expectedLength);
}
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
@ -486,8 +502,9 @@ size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length)
uint8_t pos = 0;
uint16_t len = length;
do {
digit = len % 128;
len = len / 128;
digit = len & 127; //digit = len %128
len >>= 7; //len = len / 128
if (len > 0) {
digit |= 0x80;
}
@ -531,10 +548,14 @@ boolean PubSubClient::subscribe(const char* topic) {
}
boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
size_t topicLength = strnlen(topic, MQTT_MAX_PACKET_SIZE);
if (topic == 0) {
return false;
}
if (qos > 1) {
return false;
}
if (MQTT_MAX_PACKET_SIZE < 9 + strnlen(topic, MQTT_MAX_PACKET_SIZE)) {
if (MQTT_MAX_PACKET_SIZE < 9 + topicLength) {
// Too long
return false;
}
@ -555,7 +576,11 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
}
boolean PubSubClient::unsubscribe(const char* topic) {
if (MQTT_MAX_PACKET_SIZE < 9 + strnlen(topic, MQTT_MAX_PACKET_SIZE)) {
size_t topicLength = strnlen(topic, MQTT_MAX_PACKET_SIZE);
if (topic == 0) {
return false;
}
if (MQTT_MAX_PACKET_SIZE < 9 + topicLength) {
// Too long
return false;
}
@ -609,6 +634,8 @@ boolean PubSubClient::connected() {
_client->flush();
_client->stop();
}
} else {
return this->_state == MQTT_CONNECTED;
}
}
return rc;

View File

@ -94,7 +94,7 @@ private:
unsigned long lastInActivity;
bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE;
uint16_t readPacket(uint8_t*);
uint32_t readPacket(uint8_t*);
boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length);

View File

@ -190,7 +190,7 @@ int test_drop_invalid_remaining_length_message() {
int test_receive_oversized_stream_message() {
IT("drops an oversized message");
IT("receive an oversized streamed message");
reset_callback();
Stream stream;
@ -222,7 +222,7 @@ int test_receive_oversized_stream_message() {
IS_TRUE(callback_called);
IS_TRUE(strcmp(lastTopic,"topic")==0);
IS_TRUE(lastLength == length-9);
IS_TRUE(lastLength == MQTT_MAX_PACKET_SIZE-9);
IS_FALSE(stream.error());
IS_FALSE(shimClient.error());