Added support for stream the MQTT message to storage via a Stream object

This commit is contained in:
Mark Cheverton
2013-12-02 12:43:34 +00:00
parent fb33abc1c0
commit 1344cdf1b4
3 changed files with 134 additions and 8 deletions

View File

@ -9,6 +9,7 @@
PubSubClient::PubSubClient() { PubSubClient::PubSubClient() {
this->_client = NULL; this->_client = NULL;
this->stream = NULL;
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
@ -17,6 +18,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,ui
this->ip = ip; this->ip = ip;
this->port = port; this->port = port;
this->domain = NULL; this->domain = NULL;
this->stream = NULL;
} }
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) {
@ -24,6 +26,24 @@ PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,u
this->callback = callback; this->callback = callback;
this->domain = domain; this->domain = domain;
this->port = port; this->port = port;
this->stream = NULL;
}
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
this->_client = &client;
this->callback = callback;
this->ip = ip;
this->port = port;
this->domain = NULL;
this->stream = stream;
}
PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client, Stream *stream) {
this->_client = &client;
this->callback = callback;
this->domain = domain;
this->port = port;
this->stream = stream;
} }
boolean PubSubClient::connect(char *id) { boolean PubSubClient::connect(char *id) {
@ -134,16 +154,32 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
multiplier *= 128; multiplier *= 128;
} while ((digit & 128) != 0); } while ((digit & 128) != 0);
*lengthLength = len-1; *lengthLength = len-1;
for (uint16_t i = 0;i<length;i++)
// Read in topic length to calculate bytes to skip over for Stream writing
buffer[len++] = readByte();
buffer[len++] = readByte();
uint16_t skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
if (buffer[0]&MQTTQOS1) {
// skip message id
skip += 2;
}
for (uint16_t i = 2;i<length;i++)
{ {
digit = readByte();
if(this->stream && ((buffer[0]&0xF0) == MQTTPUBLISH) && len-*lengthLength-2>skip) {
this->stream->write(digit);
}
if (len < MQTT_MAX_PACKET_SIZE) { if (len < MQTT_MAX_PACKET_SIZE) {
buffer[len++] = readByte(); buffer[len++] = digit;
} else { } else {
readByte(); if(!this->stream) len = 0; // This will cause the packet to be ignored.
len = 0; // This will cause the packet to be ignored.
} }
} }
// If a stream has been provided, indicate that we wrote the whole length,
// else return 0 if the length exceed the max packet size
return len; return len;
} }
@ -166,6 +202,8 @@ boolean PubSubClient::loop() {
if (_client->available()) { if (_client->available()) {
uint8_t llen; uint8_t llen;
uint16_t len = readPacket(&llen); uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) { if (len > 0) {
lastInActivity = t; lastInActivity = t;
uint8_t type = buffer[0]&0xF0; uint8_t type = buffer[0]&0xF0;
@ -177,9 +215,16 @@ boolean PubSubClient::loop() {
topic[i] = buffer[llen+3+i]; topic[i] = buffer[llen+3+i];
} }
topic[tl] = 0; topic[tl] = 0;
// ignore msgID - only support QoS 0 subs // msgId only present for QOS>0
uint8_t *payload = buffer+llen+3+tl; if (buffer[0]&MQTTQOS1) {
callback(topic,payload,len-llen-3-tl); msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
puback(msgId);
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
} }
} else if (type == MQTTPINGREQ) { } else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP; buffer[0] = MQTTPINGRESP;
@ -293,6 +338,13 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
} }
boolean PubSubClient::subscribe(char* topic) { boolean PubSubClient::subscribe(char* topic) {
return subscribe(topic, 0);
}
boolean PubSubClient::subscribe(char* topic, uint8_t qos) {
if (qos < 0 || qos > 1)
return false;
if (connected()) { if (connected()) {
// Leave room in the buffer for header and variable length field // Leave room in the buffer for header and variable length field
uint16_t length = 5; uint16_t length = 5;
@ -303,12 +355,23 @@ boolean PubSubClient::subscribe(char* topic) {
buffer[length++] = (nextMsgId >> 8); buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF); buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length); length = writeString(topic, buffer,length);
buffer[length++] = 0; // Only do QoS 0 subs buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
} }
return false; return false;
} }
boolean PubSubClient::puback(uint16_t msgId) {
if(connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
buffer[length++] = (msgId >> 8);
buffer[length++] = (msgId & 0xFF);
return write(MQTTPUBACK,buffer,length-5);
}
return false;
}
boolean PubSubClient::unsubscribe(char* topic) { boolean PubSubClient::unsubscribe(char* topic) {
if (connected()) { if (connected()) {
uint16_t length = 5; uint16_t length = 5;

View File

@ -53,10 +53,13 @@ private:
uint8_t *ip; uint8_t *ip;
char* domain; char* domain;
uint16_t port; uint16_t port;
Stream* stream;
public: public:
PubSubClient(); PubSubClient();
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client);
PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client, Stream*);
boolean connect(char *); boolean connect(char *);
boolean connect(char *, char *, char *); boolean connect(char *, char *, char *);
boolean connect(char *, char *, uint8_t, uint8_t, char *); boolean connect(char *, char *, uint8_t, uint8_t, char *);
@ -67,7 +70,9 @@ public:
boolean publish(char *, uint8_t *, unsigned int, boolean); boolean publish(char *, uint8_t *, unsigned int, boolean);
boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean); boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean);
boolean subscribe(char *); boolean subscribe(char *);
boolean subscribe(char *, uint8_t qos);
boolean unsubscribe(char *); boolean unsubscribe(char *);
boolean puback(uint16_t msgId);
boolean loop(); boolean loop();
boolean connected(); boolean connected();
}; };

View File

@ -0,0 +1,58 @@
/*
Example of using a Stream object to store the message payload
Uses SRAM library: https://github.com/ennui2342/arduino-sram
but could use any Stream based class such as SD
- connects to an MQTT server
- publishes "hello world" to the topic "outTopic"
- subscribes to the topic "inTopic"
*/
#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>
#include <SRAM.h>
// Update these with values suitable for your network.
byte mac[] = { 0xDE, 0xED, 0xBA, 0xFE, 0xFE, 0xED };
byte server[] = { 172, 16, 0, 2 };
byte ip[] = { 172, 16, 0, 100 };
SRAM sram(4, SRAM_1024);
void callback(char* topic, byte* payload, unsigned int length) {
sram.seek(1);
// do something with the message
for(uint8_t i=0; i<length; i++) {
Serial.write(sram.read());
}
Serial.println();
// Reset position for the next message to be stored
sram.seek(1);
}
EthernetClient ethClient;
PubSubClient client(server, 1883, callback, ethClient, &sram);
void setup()
{
Ethernet.begin(mac, ip);
if (client.connect("arduinoClient")) {
client.publish("outTopic","hello world");
client.subscribe("inTopic");
}
sram.begin();
sram.seek(1);
Serial.begin(9600);
}
void loop()
{
client.loop();
}