Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
e4fdabf4db | |||
4ad0d5f4ac | |||
fb5f8de2a4 |
@ -1,3 +1,6 @@
|
|||||||
|
1.7
|
||||||
|
* Improved keepalive handling
|
||||||
|
* Updated to the Arduino-1.0 API
|
||||||
1.6
|
1.6
|
||||||
* Added the ability to publish a retained message
|
* Added the ability to publish a retained message
|
||||||
|
|
||||||
|
@ -5,14 +5,16 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "PubSubClient.h"
|
#include "PubSubClient.h"
|
||||||
#include "Client.h"
|
#include "EthernetClient.h"
|
||||||
#include "string.h"
|
#include "string.h"
|
||||||
|
|
||||||
PubSubClient::PubSubClient() : _client(0) {
|
PubSubClient::PubSubClient() : _client() {
|
||||||
}
|
}
|
||||||
|
|
||||||
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,int)) : _client(ip,port) {
|
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,int)) : _client() {
|
||||||
this->callback = callback;
|
this->callback = callback;
|
||||||
|
this->ip = ip;
|
||||||
|
this->port = port;
|
||||||
}
|
}
|
||||||
int PubSubClient::connect(char *id) {
|
int PubSubClient::connect(char *id) {
|
||||||
return connect(id,0,0,0,0);
|
return connect(id,0,0,0,0);
|
||||||
@ -20,7 +22,7 @@ int PubSubClient::connect(char *id) {
|
|||||||
|
|
||||||
int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
|
int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) {
|
||||||
if (!connected()) {
|
if (!connected()) {
|
||||||
if (_client.connect()) {
|
if (_client.connect(this->ip, this->port)) {
|
||||||
nextMsgId = 1;
|
nextMsgId = 1;
|
||||||
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
|
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION};
|
||||||
uint8_t length = 0;
|
uint8_t length = 0;
|
||||||
@ -41,11 +43,20 @@ int PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t wi
|
|||||||
length = writeString(willMessage,buffer,length);
|
length = writeString(willMessage,buffer,length);
|
||||||
}
|
}
|
||||||
write(MQTTCONNECT,buffer,length);
|
write(MQTTCONNECT,buffer,length);
|
||||||
while (!_client.available()) {}
|
lastOutActivity = millis();
|
||||||
|
lastInActivity = millis();
|
||||||
|
while (!_client.available()) {
|
||||||
|
long t= millis();
|
||||||
|
if (t-lastInActivity > KEEPALIVE) {
|
||||||
|
_client.stop();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
uint8_t len = readPacket();
|
uint8_t len = readPacket();
|
||||||
|
|
||||||
if (len == 4 && buffer[3] == 0) {
|
if (len == 4 && buffer[3] == 0) {
|
||||||
lastActivity = millis();
|
lastInActivity = millis();
|
||||||
|
pingOutstanding = false;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -88,14 +99,22 @@ uint8_t PubSubClient::readPacket() {
|
|||||||
int PubSubClient::loop() {
|
int PubSubClient::loop() {
|
||||||
if (connected()) {
|
if (connected()) {
|
||||||
long t = millis();
|
long t = millis();
|
||||||
if (t - lastActivity > KEEPALIVE) {
|
if ((t - lastInActivity > KEEPALIVE) || (t - lastOutActivity > KEEPALIVE)) {
|
||||||
_client.write(MQTTPINGREQ);
|
if (pingOutstanding) {
|
||||||
_client.write((uint8_t)0);
|
_client.stop();
|
||||||
lastActivity = t;
|
return 0;
|
||||||
|
} else {
|
||||||
|
_client.write(MQTTPINGREQ);
|
||||||
|
_client.write((uint8_t)0);
|
||||||
|
lastOutActivity = t;
|
||||||
|
lastInActivity = t;
|
||||||
|
pingOutstanding = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (_client.available()) {
|
if (_client.available()) {
|
||||||
uint8_t len = readPacket();
|
uint8_t len = readPacket();
|
||||||
if (len > 0) {
|
if (len > 0) {
|
||||||
|
lastInActivity = t;
|
||||||
uint8_t type = buffer[0]&0xF0;
|
uint8_t type = buffer[0]&0xF0;
|
||||||
if (type == MQTTPUBLISH) {
|
if (type == MQTTPUBLISH) {
|
||||||
if (callback) {
|
if (callback) {
|
||||||
@ -112,7 +131,8 @@ int PubSubClient::loop() {
|
|||||||
} else if (type == MQTTPINGREQ) {
|
} else if (type == MQTTPINGREQ) {
|
||||||
_client.write(MQTTPINGRESP);
|
_client.write(MQTTPINGRESP);
|
||||||
_client.write((uint8_t)0);
|
_client.write((uint8_t)0);
|
||||||
lastActivity = t;
|
} else if (type == MQTTPINGRESP) {
|
||||||
|
pingOutstanding = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -151,6 +171,7 @@ int PubSubClient::write(uint8_t header, uint8_t* buf, uint8_t length) {
|
|||||||
_client.write(header);
|
_client.write(header);
|
||||||
_client.write(length);
|
_client.write(length);
|
||||||
_client.write(buf,length);
|
_client.write(buf,length);
|
||||||
|
lastOutActivity = millis();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,7 +192,8 @@ void PubSubClient::disconnect() {
|
|||||||
_client.write(MQTTDISCONNECT);
|
_client.write(MQTTDISCONNECT);
|
||||||
_client.write((uint8_t)0);
|
_client.write((uint8_t)0);
|
||||||
_client.stop();
|
_client.stop();
|
||||||
lastActivity = millis();
|
lastInActivity = millis();
|
||||||
|
lastOutActivity = millis();
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) {
|
uint8_t PubSubClient::writeString(char* string, uint8_t* buf, uint8_t pos) {
|
||||||
|
@ -7,7 +7,8 @@
|
|||||||
#ifndef PubSubClient_h
|
#ifndef PubSubClient_h
|
||||||
#define PubSubClient_h
|
#define PubSubClient_h
|
||||||
|
|
||||||
#include "Client.h"
|
#include "Ethernet.h"
|
||||||
|
#include "EthernetClient.h"
|
||||||
|
|
||||||
#define MAX_PACKET_SIZE 128
|
#define MAX_PACKET_SIZE 128
|
||||||
#define KEEPALIVE 15000 // max value = 255000
|
#define KEEPALIVE 15000 // max value = 255000
|
||||||
@ -32,15 +33,19 @@
|
|||||||
|
|
||||||
class PubSubClient {
|
class PubSubClient {
|
||||||
private:
|
private:
|
||||||
Client _client;
|
EthernetClient _client;
|
||||||
uint8_t buffer[MAX_PACKET_SIZE];
|
uint8_t buffer[MAX_PACKET_SIZE];
|
||||||
uint8_t nextMsgId;
|
uint8_t nextMsgId;
|
||||||
long lastActivity;
|
long lastOutActivity;
|
||||||
|
long lastInActivity;
|
||||||
|
bool pingOutstanding;
|
||||||
void (*callback)(char*,uint8_t*,int);
|
void (*callback)(char*,uint8_t*,int);
|
||||||
uint8_t readPacket();
|
uint8_t readPacket();
|
||||||
uint8_t readByte();
|
uint8_t readByte();
|
||||||
int write(uint8_t header, uint8_t* buf, uint8_t length);
|
int write(uint8_t header, uint8_t* buf, uint8_t length);
|
||||||
uint8_t writeString(char* string, uint8_t* buf, uint8_t pos);
|
uint8_t writeString(char* string, uint8_t* buf, uint8_t pos);
|
||||||
|
uint8_t *ip;
|
||||||
|
uint16_t port;
|
||||||
public:
|
public:
|
||||||
PubSubClient();
|
PubSubClient();
|
||||||
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,int));
|
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,int));
|
||||||
|
Reference in New Issue
Block a user