From 8a2908148676cae555efc59a7052577c3dbb8435 Mon Sep 17 00:00:00 2001 From: Mark Cheverton Date: Mon, 2 Dec 2013 12:43:34 +0000 Subject: [PATCH] Added support for stream the MQTT message to storage via a Stream object --- PubSubClient/PubSubClient.cpp | 30 +++++++++++++++++++++++++++--- PubSubClient/PubSubClient.h | 3 +++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/PubSubClient/PubSubClient.cpp b/PubSubClient/PubSubClient.cpp index ca682e6..5222f2e 100755 --- a/PubSubClient/PubSubClient.cpp +++ b/PubSubClient/PubSubClient.cpp @@ -9,6 +9,7 @@ PubSubClient::PubSubClient() { this->_client = NULL; + this->stream = NULL; } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { @@ -17,6 +18,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,ui this->ip = ip; this->port = port; this->domain = NULL; + this->stream = NULL; } PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { @@ -24,6 +26,24 @@ PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,u this->callback = callback; this->domain = domain; this->port = port; + this->stream = NULL; +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) { + this->_client = &client; + this->callback = callback; + this->ip = ip; + this->port = port; + this->domain = NULL; + this->stream = stream; +} + +PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) { + this->_client = &client; + this->callback = callback; + this->domain = domain; + this->port = port; + this->stream = stream; } boolean PubSubClient::connect(char *id) { @@ -136,15 +156,19 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { *lengthLength = len-1; for (uint16_t i = 0;istream && buffer[0]&MQTTPUBLISH) + this->stream->write(digit); if (len < MQTT_MAX_PACKET_SIZE) { - buffer[len++] = readByte(); + buffer[len++] = digit; } else { - readByte(); len = 0; // This will cause the packet to be ignored. } } - return len; + // If a stream has been provided, indicate that we wrote the whole length, + // else return 0 if the length exceed the max packet size + return this->stream ? length : len; } boolean PubSubClient::loop() { diff --git a/PubSubClient/PubSubClient.h b/PubSubClient/PubSubClient.h index 3032446..4e05607 100755 --- a/PubSubClient/PubSubClient.h +++ b/PubSubClient/PubSubClient.h @@ -53,10 +53,13 @@ private: uint8_t *ip; char* domain; uint16_t port; + Stream* stream; public: PubSubClient(); PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*); PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*); boolean connect(char *); boolean connect(char *, char *, char *); boolean connect(char *, char *, uint8_t, uint8_t, char *);