diff --git a/PubSubClient/CHANGES.txt b/PubSubClient/CHANGES.txt index 0bedae9..a53536e 100644 --- a/PubSubClient/CHANGES.txt +++ b/PubSubClient/CHANGES.txt @@ -1,3 +1,10 @@ +1.8 + * KeepAlive interval is configurable in PubSubClient.h + * Maximum packet size is configurable in PubSubClient.h + * API change: Return boolean rather than int from various functions + * API change: Length parameter in message callback changed + from int to unsigned int + * Various internal tidy-ups around types 1.7 * Improved keepalive handling * Updated to the Arduino-1.0 API diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index bc76edc..2167585 100644 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -11,22 +11,22 @@ PubSubClient::PubSubClient() : _client() { } -PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,int)) : _client() { +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int)) : _client() { this->callback = callback; this->ip = ip; this->port = port; } -int PubSubClient::connect(char *id) { +boolean PubSubClient::connect(char *id) { return connect(id,0,0,0,0); } -int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { +boolean PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { if (!connected()) { if (_client.connect(this->ip, this->port)) { nextMsgId = 1; uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; uint8_t length = 0; - int j; + unsigned int j; for (j = 0;j<9;j++) { buffer[length++] = d[j]; } @@ -35,8 +35,8 @@ int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t wi } else { buffer[length++] = 0x02; } - buffer[length++] = 0; - buffer[length++] = (KEEPALIVE/1000); + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xff); length = writeString(id,buffer,length); if (willTopic) { length = writeString(willTopic,buffer,length); @@ -46,23 +46,23 @@ int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t wi lastOutActivity = millis(); lastInActivity = millis(); while (!_client.available()) { - long t= millis(); - if (t-lastInActivity > KEEPALIVE) { + unsigned long t= millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000) { _client.stop(); - return 0; + return false; } } - uint8_t len = readPacket(); + uint16_t len = readPacket(); if (len == 4 && buffer[3] == 0) { lastInActivity = millis(); pingOutstanding = false; - return 1; + return true; } } _client.stop(); } - return 0; + return false; } uint8_t PubSubClient::readByte() { @@ -70,11 +70,11 @@ uint8_t PubSubClient::readByte() { return _client.read(); } -uint8_t PubSubClient::readPacket() { - uint8_t len = 0; +uint16_t PubSubClient::readPacket() { + uint16_t len = 0; buffer[len++] = readByte(); uint8_t multiplier = 1; - uint8_t length = 0; + uint16_t length = 0; uint8_t digit = 0; do { digit = readByte(); @@ -83,9 +83,9 @@ uint8_t PubSubClient::readPacket() { multiplier *= 128; } while ((digit & 128) != 0); - for (int i = 0;i KEEPALIVE) || (t - lastOutActivity > KEEPALIVE)) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000) || (t - lastOutActivity > MQTT_KEEPALIVE*1000)) { if (pingOutstanding) { _client.stop(); - return 0; + return false; } else { _client.write(MQTTPINGREQ); _client.write((uint8_t)0); @@ -112,15 +112,15 @@ int PubSubClient::loop() { } } if (_client.available()) { - uint8_t len = readPacket(); + uint16_t len = readPacket(); if (len > 0) { lastInActivity = t; uint8_t type = buffer[0]&0xF0; if (type == MQTTPUBLISH) { if (callback) { - uint8_t tl = (buffer[2]<<3)+buffer[3]; + uint16_t tl = (buffer[2]<<8)+buffer[3]; char topic[tl+1]; - for (int i=0;i 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + rc = _client.write(header); + rc += _client.write(lenBuf,llen); + rc += _client.write(buf,length); lastOutActivity = millis(); - return 0; + return (rc == 1+llen+length); } -void PubSubClient::subscribe(char* topic) { +boolean PubSubClient::subscribe(char* topic) { if (connected()) { - uint8_t length = 2; + uint16_t length = 2; nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } buffer[0] = nextMsgId >> 8; buffer[1] = nextMsgId - (buffer[0]<<8); length = writeString(topic, buffer,length); buffer[length++] = 0; // Only do QoS 0 subs - write(MQTTSUBSCRIBE,buffer,length); + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length); } + return false; } void PubSubClient::disconnect() { @@ -196,9 +215,9 @@ void PubSubClient::disconnect() { lastOutActivity = millis(); } -uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) { +uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { char* idp = string; - uint8_t i = 0; + uint16_t i = 0; pos += 2; while (*idp) { buf[pos++] = *idp++; @@ -210,7 +229,7 @@ uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) { } -int PubSubClient::connected() { +boolean PubSubClient::connected() { int rc = (int)_client.connected(); if (!rc) _client.stop(); return rc; diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/PubSubClient.h index 74628e7..9b740e2 100644 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/PubSubClient.h @@ -10,10 +10,12 @@ #include "Ethernet.h" #include "EthernetClient.h" -#define MAX_PACKET_SIZE 128 -#define KEEPALIVE 15000 // max value = 255000 +// MQTT_MAX_PACKET_SIZE : Maximum packet size +#define MQTT_MAX_PACKET_SIZE 128 + +// MQTT_KEEPALIVE : keepAlive interval in Seconds +#define MQTT_KEEPALIVE 15 -// from mqtt-v3r1 #define MQTTPROTOCOLVERSION 3 #define MQTTCONNECT 1 << 4 // Client request to connect to Server #define MQTTCONNACK 2 << 4 // Connect Acknowledgment @@ -31,33 +33,37 @@ #define MQTTDISCONNECT 14 << 4 // Client is Disconnecting #define MQTTReserved 15 << 4 // Reserved +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + class PubSubClient { private: EthernetClient _client; - uint8_t buffer[MAX_PACKET_SIZE]; - uint8_t nextMsgId; - long lastOutActivity; - long lastInActivity; + uint8_t buffer[MQTT_MAX_PACKET_SIZE]; + uint16_t nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; bool pingOutstanding; - void (*callback)(char*,uint8_t*,int); - uint8_t readPacket(); + void (*callback)(char*,uint8_t*,unsigned int); + uint16_t readPacket(); uint8_t readByte(); - int write(uint8_t header, uint8_t* buf, uint8_t length); - uint8_t writeString(char* string, uint8_t* buf, uint8_t pos); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(char* string, uint8_t* buf, uint16_t pos); uint8_t *ip; uint16_t port; public: PubSubClient(); - PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,int)); - int connect(char *); - int connect(char*, char*, uint8_t, uint8_t, char*); + PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int)); + boolean connect(char *); + boolean connect(char*, char*, uint8_t, uint8_t, char*); void disconnect(); - int publish(char *, char *); - int publish(char *, uint8_t *, uint8_t); - int publish(char *, uint8_t *, uint8_t, uint8_t); - void subscribe(char *); - int loop(); - int connected(); + boolean publish(char *, char *); + boolean publish(char *, uint8_t *, unsigned int); + boolean publish(char *, uint8_t *, unsigned int, boolean); + boolean subscribe(char *); + boolean loop(); + boolean connected(); };