Allow any instance of Client to be passed in.

Added new example for publishing in callback.
This commit is contained in:
Nicholas O'Leary
2012-09-13 22:35:48 +01:00
parent 2d43044338
commit a971ed4a2c
6 changed files with 109 additions and 45 deletions

View File

@ -5,19 +5,21 @@
*/
#include "PubSubClient.h"
#include <EthernetClient.h>
#include <string.h>
PubSubClient::PubSubClient() : _client() {
PubSubClient::PubSubClient(Client& client) {
this->_client = &client;
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int)) : _client() {
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
this->_client = &client;
this->callback = callback;
this->ip = ip;
this->port = port;
}
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int)) : _client() {
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
this->_client = &client;
this->callback = callback;
this->domain = domain;
this->port = port;
@ -41,9 +43,9 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic,
int result = 0;
if (domain != NULL) {
result = _client.connect(this->domain, this->port);
result = _client->connect(this->domain, this->port);
} else {
result = _client.connect(this->ip, this->port);
result = _client->connect(this->ip, this->port);
}
if (result) {
@ -90,10 +92,10 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic,
write(MQTTCONNECT,buffer,length);
lastOutActivity = millis();
lastInActivity = millis();
while (!_client.available()) {
while (!_client->available()) {
unsigned long t= millis();
if (t-lastInActivity > MQTT_KEEPALIVE*1000) {
_client.stop();
_client->stop();
return false;
}
}
@ -105,14 +107,14 @@ boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic,
return true;
}
}
_client.stop();
_client->stop();
}
return false;
}
uint8_t PubSubClient::readByte() {
while(!_client.available()) {}
return _client.read();
while(!_client->available()) {}
return _client->read();
}
uint16_t PubSubClient::readPacket() {
@ -146,17 +148,17 @@ boolean PubSubClient::loop() {
unsigned long t = millis();
if ((t - lastInActivity > MQTT_KEEPALIVE*1000) || (t - lastOutActivity > MQTT_KEEPALIVE*1000)) {
if (pingOutstanding) {
_client.stop();
_client->stop();
return false;
} else {
_client.write(MQTTPINGREQ);
_client.write((uint8_t)0);
_client->write(MQTTPINGREQ);
_client->write((uint8_t)0);
lastOutActivity = t;
lastInActivity = t;
pingOutstanding = true;
}
}
if (_client.available()) {
if (_client->available()) {
uint16_t len = readPacket();
if (len > 0) {
lastInActivity = t;
@ -174,8 +176,8 @@ boolean PubSubClient::loop() {
callback(topic,payload,len-4-tl);
}
} else if (type == MQTTPINGREQ) {
_client.write(MQTTPINGRESP);
_client.write((uint8_t)0);
_client->write(MQTTPINGRESP);
_client->write((uint8_t)0);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
}
@ -221,14 +223,16 @@ boolean PubSubClient::publish_P(char* topic, uint8_t* PROGMEM payload, unsigned
uint8_t header;
unsigned int len;
if (!connected())
if (!connected()) {
return false;
}
tlen = strlen(topic);
header = MQTTPUBLISH;
if (retained)
if (retained) {
header |= 1;
}
len = plength + 2 + tlen;
do {
@ -241,17 +245,18 @@ boolean PubSubClient::publish_P(char* topic, uint8_t* PROGMEM payload, unsigned
llen++;
} while(len>0);
rc += _client.write(&header, 1);
rc += _client.write(lenBuf, llen);
rc += _client->write(&header, 1);
rc += _client->write(lenBuf, llen);
lenBuf[0] = tlen >> 8; // MSB
lenBuf[1] = tlen & 0xFF; // LSB
rc += _client.write(lenBuf, 2);
rc += _client.write((uint8_t *)topic, tlen);
rc += _client->write(lenBuf, 2);
rc += _client->write((uint8_t *)topic, tlen);
for (i=0;i<plength;i++)
rc += _client.write((char)pgm_read_byte_near(payload + i));
for (i=0;i<plength;i++) {
rc += _client->write((char)pgm_read_byte_near(payload + i));
}
lastOutActivity = millis();
return rc == len + 1 + plength;
@ -274,9 +279,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
llen++;
} while(len>0);
rc = _client.write(header);
rc += _client.write(lenBuf,llen);
rc += _client.write(buf,length);
rc = _client->write(header);
rc += _client->write(lenBuf,llen);
rc += _client->write(buf,length);
lastOutActivity = millis();
return (rc == 1+llen+length);
}
@ -299,9 +304,9 @@ boolean PubSubClient::subscribe(char* topic) {
}
void PubSubClient::disconnect() {
_client.write(MQTTDISCONNECT);
_client.write((uint8_t)0);
_client.stop();
_client->write(MQTTDISCONNECT);
_client->write((uint8_t)0);
_client->stop();
lastInActivity = millis();
lastOutActivity = millis();
}
@ -321,10 +326,8 @@ uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) {
boolean PubSubClient::connected() {
int rc = (int)_client.connected();
if (!rc) _client.stop();
int rc = (int)_client->connected();
if (!rc) _client->stop();
return rc;
}