Added support for stream the MQTT message to storage via a Stream object

This commit is contained in:
Mark Cheverton 2013-12-02 12:43:34 +00:00
parent fb33abc1c0
commit 8a29081486
2 changed files with 30 additions and 3 deletions

View File

@ -9,6 +9,7 @@
PubSubClient::PubSubClient() { PubSubClient::PubSubClient() {
this->_client = NULL; this->_client = NULL;
this->stream = NULL;
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
@ -17,6 +18,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,ui
this->ip = ip; this->ip = ip;
this->port = port; this->port = port;
this->domain = NULL; this->domain = NULL;
this->stream = NULL;
} }
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
@ -24,6 +26,24 @@ PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,u
this->callback = callback; this->callback = callback;
this->domain = domain; this->domain = domain;
this->port = port; this->port = port;
this->stream = NULL;
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
this->_client = &client;
this->callback = callback;
this->ip = ip;
this->port = port;
this->domain = NULL;
this->stream = stream;
}
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
this->_client = &client;
this->callback = callback;
this->domain = domain;
this->port = port;
this->stream = stream;
} }
boolean PubSubClient::connect(char *id) { boolean PubSubClient::connect(char *id) {
@ -136,15 +156,19 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
*lengthLength = len-1; *lengthLength = len-1;
for (uint16_t i = 0;i<length;i++) for (uint16_t i = 0;i<length;i++)
{ {
digit = readByte();
if(this->stream && buffer[0]&MQTTPUBLISH)
this->stream->write(digit);
if (len < MQTT_MAX_PACKET_SIZE) { if (len < MQTT_MAX_PACKET_SIZE) {
buffer[len++] = readByte(); buffer[len++] = digit;
} else { } else {
readByte();
len = 0; // This will cause the packet to be ignored. len = 0; // This will cause the packet to be ignored.
} }
} }
return len; // If a stream has been provided, indicate that we wrote the whole length,
// else return 0 if the length exceed the max packet size
return this->stream ? length : len;
} }
boolean PubSubClient::loop() { boolean PubSubClient::loop() {

View File

@ -53,10 +53,13 @@ private:
uint8_t *ip; uint8_t *ip;
char* domain; char* domain;
uint16_t port; uint16_t port;
Stream* stream;
public: public:
PubSubClient(); PubSubClient();
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
boolean connect(char *); boolean connect(char *);
boolean connect(char *, char *, char *); boolean connect(char *, char *, char *);
boolean connect(char *, char *, uint8_t, uint8_t, char *); boolean connect(char *, char *, uint8_t, uint8_t, char *);