Merge pull request #32 from ennui2342/stream

Stream support for large packets
This commit is contained in:
Nick O'Leary 2014-01-26 06:51:06 -08:00
commit bcead02903
3 changed files with 101 additions and 4 deletions

View File

@ -9,6 +9,7 @@
PubSubClient::PubSubClient() {
this->_client = NULL;
this->stream = NULL;
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
@ -17,6 +18,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,ui
this->ip = ip;
this->port = port;
this->domain = NULL;
this->stream = NULL;
}
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
@ -24,6 +26,24 @@ PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,u
this->callback = callback;
this->domain = domain;
this->port = port;
this->stream = NULL;
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
this->_client = &client;
this->callback = callback;
this->ip = ip;
this->port = port;
this->domain = NULL;
this->stream = stream;
}
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
this->_client = &client;
this->callback = callback;
this->domain = domain;
this->port = port;
this->stream = stream;
}
boolean PubSubClient::connect(char *id) {
@ -134,16 +154,32 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
multiplier *= 128;
} while ((digit & 128) != 0);
*lengthLength = len-1;
for (uint16_t i = 0;i<length;i++)
// Read in topic length to calculate bytes to skip over for Stream writing
buffer[len++] = readByte();
buffer[len++] = readByte();
uint16_t skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
if (buffer[0]&MQTTQOS1) {
// skip message id
skip += 2;
}
for (uint16_t i = 2;i<length;i++)
{
digit = readByte();
if(this->stream && ((buffer[0]&0xF0) == MQTTPUBLISH) && len-*lengthLength-2>skip) {
this->stream->write(digit);
}
if (len < MQTT_MAX_PACKET_SIZE) {
buffer[len++] = readByte();
buffer[len++] = digit;
} else {
readByte();
len = 0; // This will cause the packet to be ignored.
if(!this->stream) len = 0; // This will cause the packet to be ignored.
}
}
// If a stream has been provided, indicate that we wrote the whole length,
// else return 0 if the length exceed the max packet size
return len;
}

View File

@ -53,10 +53,13 @@ private:
uint8_t *ip;
char* domain;
uint16_t port;
Stream* stream;
public:
PubSubClient();
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
boolean connect(char *);
boolean connect(char *, char *, char *);
boolean connect(char *, char *, uint8_t, uint8_t, char *);

View File

@ -0,0 +1,58 @@
/*
Example of using a Stream object to store the message payload
Uses SRAM library: https://github.com/ennui2342/arduino-sram
but could use any Stream based class such as SD
- connects to an MQTT server
- publishes "hello world" to the topic "outTopic"
- subscribes to the topic "inTopic"
*/
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
#include <SRAM.h>
// Update these with values suitable for your network.
byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
byte server[] = { 172, 16, 0, 2 };
byte ip[] = { 172, 16, 0, 100 };
SRAM sram(4, SRAM_1024);
void callback(char* topic, byte* payload, unsigned int length) {
sram.seek(1);
// do something with the message
for(uint8_t i=0; i<length; i++) {
Serial.write(sram.read());
}
Serial.println();
// Reset position for the next message to be stored
sram.seek(1);
}
EthernetClient ethClient;
PubSubClient client(server, 1883, callback, ethClient, &sram);
void setup()
{
Ethernet.begin(mac, ip);
if (client.connect("arduinoClient")) {
client.publish("outTopic","hello world");
client.subscribe("inTopic");
}
sram.begin();
sram.seek(1);
Serial.begin(9600);
}
void loop()
{
client.loop();
}