added optional timeout setting so that readByte can be escaped

This commit is contained in:
Victor Aprea 2015-11-18 17:19:41 -05:00
parent efebd2e5e4
commit 6bb06187b7
2 changed files with 53 additions and 9 deletions

View File

@ -12,12 +12,14 @@ PubSubClient::PubSubClient() {
this->_client = NULL; this->_client = NULL;
this->stream = NULL; this->stream = NULL;
setCallback(NULL); setCallback(NULL);
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(Client& client) { PubSubClient::PubSubClient(Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
@ -25,12 +27,14 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
setServer(addr, port); setServer(addr, port);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(addr,port); setServer(addr,port);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -38,6 +42,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -45,6 +50,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
@ -52,12 +58,14 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
setServer(ip, port); setServer(ip, port);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(ip,port); setServer(ip,port);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -65,6 +73,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -72,6 +81,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE,
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
@ -79,12 +89,14 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
setServer(domain,port); setServer(domain,port);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
setServer(domain,port); setServer(domain,port);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -92,6 +104,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
this->stream = NULL; this->stream = NULL;
setReadTimeout(-1);
} }
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
this->_state = MQTT_DISCONNECTED; this->_state = MQTT_DISCONNECTED;
@ -99,6 +112,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
setCallback(callback); setCallback(callback);
setClient(client); setClient(client);
setStream(stream); setStream(stream);
setReadTimeout(-1);
} }
boolean PubSubClient::connect(const char *id) { boolean PubSubClient::connect(const char *id) {
@ -205,14 +219,37 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
return true; return true;
} }
uint8_t PubSubClient::readByte() { // reads a byte into result
while(!_client->available()) {} boolean PubSubClient::readByte(uint8_t * result) {
return _client->read(); boolean ret = false;
uint32_t previousMillis = millis();
while(!_client->available()) {
uint32_t currentMillis = millis();
if(read_timeout_ms > 0){
if(currentMillis - previousMillis >= read_timeout_ms){
return false;
}
}
}
*result = _client->read();
return true;
}
// reads a byte into result[*index] and increments index
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
boolean ret = false;
uint16_t current_index = *index;
uint8_t * write_address = &(result[current_index]);
if(readByte(write_address)){
*index = current_index + 1;
ret = true;
}
return ret;
} }
uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0; uint16_t len = 0;
buffer[len++] = readByte(); if(!readByte(buffer, &len)) return 0;
bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1; uint32_t multiplier = 1;
uint16_t length = 0; uint16_t length = 0;
@ -221,7 +258,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint8_t start = 0; uint8_t start = 0;
do { do {
digit = readByte(); if(!readByte(&digit)) return 0;
buffer[len++] = digit; buffer[len++] = digit;
length += (digit & 127) * multiplier; length += (digit & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
@ -230,8 +267,8 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
if (isPublish) { if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing // Read in topic length to calculate bytes to skip over for Stream writing
buffer[len++] = readByte(); if(!readByte(buffer, &len)) return 0;
buffer[len++] = readByte(); if(!readByte(buffer, &len)) return 0;
skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
start = 2; start = 2;
if (buffer[0]&MQTTQOS1) { if (buffer[0]&MQTTQOS1) {
@ -241,7 +278,7 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
} }
for (uint16_t i = start;i<length;i++) { for (uint16_t i = start;i<length;i++) {
digit = readByte(); if(!readByte(&digit)) return 0;
if (this->stream) { if (this->stream) {
if (isPublish && len-*lengthLength-2>skip) { if (isPublish && len-*lengthLength-2>skip) {
this->stream->write(digit); this->stream->write(digit);
@ -566,6 +603,10 @@ PubSubClient& PubSubClient::setStream(Stream& stream){
return *this; return *this;
} }
void PubSubClient::setReadTimeout(int32_t timeout_ms){
read_timeout_ms = timeout_ms;
}
int PubSubClient::state() { int PubSubClient::state() {
return this->_state; return this->_state;
} }

View File

@ -74,7 +74,8 @@ private:
bool pingOutstanding; bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE; MQTT_CALLBACK_SIGNATURE;
uint16_t readPacket(uint8_t*); uint16_t readPacket(uint8_t*);
uint8_t readByte(); boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length); boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
IPAddress ip; IPAddress ip;
@ -82,6 +83,7 @@ private:
uint16_t port; uint16_t port;
Stream* stream; Stream* stream;
int _state; int _state;
int32_t read_timeout_ms;
public: public:
PubSubClient(); PubSubClient();
PubSubClient(Client& client); PubSubClient(Client& client);
@ -104,6 +106,7 @@ public:
PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE);
PubSubClient& setClient(Client& client); PubSubClient& setClient(Client& client);
PubSubClient& setStream(Stream& stream); PubSubClient& setStream(Stream& stream);
void setReadTimeout(int32_t timeout_ms);
boolean connect(const char* id); boolean connect(const char* id);
boolean connect(const char* id, const char* user, const char* pass); boolean connect(const char* id, const char* user, const char* pass);