diff --git a/.cproject b/.cproject index 1ad3bf8..9c8745a 100644 --- a/.cproject +++ b/.cproject @@ -24,6 +24,9 @@ + + + @@ -34,6 +37,9 @@ + + + @@ -46,10 +52,13 @@ - + + + + diff --git a/.settings/org.eclipse.cdt.core.prefs b/.settings/org.eclipse.cdt.core.prefs index f099a2f..0b8ffc0 100644 --- a/.settings/org.eclipse.cdt.core.prefs +++ b/.settings/org.eclipse.cdt.core.prefs @@ -166,10 +166,10 @@ environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA. environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.DTS/value=0 environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.LOCAL/delimiter=\: environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.LOCAL/operation=replace -environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.LOCAL/value=1477853595 +environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.LOCAL/value=1477917830 environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.UTC/delimiter=\: environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.UTC/operation=replace -environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.UTC/value=1477849995 +environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.UTC/value=1477914230 environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.ZONE/delimiter=\: environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.ZONE/operation=replace environment/project/it.baeyens.arduino.core.toolChain.release.529393341/A.EXTRA.TIME.ZONE/value=3600 diff --git a/MQTT/PubSubClient.cpp b/MQTT/PubSubClient.cpp new file mode 100644 index 0000000..9658c4a --- /dev/null +++ b/MQTT/PubSubClient.cpp @@ -0,0 +1,590 @@ +/* + PubSubClient.cpp - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#include "PubSubClient.h" +#include "Arduino.h" + +PubSubClient::PubSubClient() { + this->_state = MQTT_DISCONNECTED; + this->_client = NULL; + this->stream = NULL; + setCallback(NULL); +} + +PubSubClient::PubSubClient(Client& client) { + this->_state = MQTT_DISCONNECTED; + setClient(client); + this->stream = NULL; +} + +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(addr, port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(addr,port); + setCallback(callback); + setClient(client); + setStream(stream); +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(ip, port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(ip,port); + setCallback(callback); + setClient(client); + setStream(stream); +} + +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setClient(client); + setStream(stream); +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + this->stream = NULL; +} +PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { + this->_state = MQTT_DISCONNECTED; + setServer(domain,port); + setCallback(callback); + setClient(client); + setStream(stream); +} + +boolean PubSubClient::connect(const char *id) { + return connect(id,NULL,NULL,0,0,0,0); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass) { + return connect(id,user,pass,0,0,0,0); +} + +boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) { + if (!connected()) { + int result = 0; + + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + if (result == 1) { + nextMsgId = 1; + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + unsigned int j; + +#if MQTT_VERSION == MQTT_VERSION_3_1 + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 9 +#elif MQTT_VERSION == MQTT_VERSION_3_1_1 + uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION}; +#define MQTT_HEADER_VERSION_LENGTH 7 +#endif + for (j = 0;j>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) { + _state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } + } + uint8_t llen; + uint16_t len = readPacket(&llen); + + if (len == 4) { + if (buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + _state = MQTT_CONNECTED; + return true; + } else { + _state = buffer[3]; + } + } + _client->stop(); + } else { + _state = MQTT_CONNECT_FAILED; + } + return false; + } + return true; +} + +// reads a byte into result +boolean PubSubClient::readByte(uint8_t * result) { + uint32_t previousMillis = millis(); + while(!_client->available()) { + uint32_t currentMillis = millis(); + if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){ + return false; + } + } + *result = _client->read(); + return true; +} + +// reads a byte into result[*index] and increments index +boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ + uint16_t current_index = *index; + uint8_t * write_address = &(result[current_index]); + if(readByte(write_address)){ + *index = current_index + 1; + return true; + } + return false; +} + +uint16_t PubSubClient::readPacket(uint8_t* lengthLength) { + uint16_t len = 0; + if(!readByte(buffer, &len)) return 0; + bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH; + uint32_t multiplier = 1; + uint16_t length = 0; + uint8_t digit = 0; + uint16_t skip = 0; + uint8_t start = 0; + + do { + if(!readByte(&digit)) return 0; + buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier *= 128; + } while ((digit & 128) != 0); + *lengthLength = len-1; + + if (isPublish) { + // Read in topic length to calculate bytes to skip over for Stream writing + if(!readByte(buffer, &len)) return 0; + if(!readByte(buffer, &len)) return 0; + skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2]; + start = 2; + if (buffer[0]&MQTTQOS1) { + // skip message id + skip += 2; + } + } + + for (uint16_t i = start;istream) { + if (isPublish && len-*lengthLength-2>skip) { + this->stream->write(digit); + } + } + if (len < MQTT_MAX_PACKET_SIZE) { + buffer[len] = digit; + } + len++; + } + + if (!this->stream && len > MQTT_MAX_PACKET_SIZE) { + len = 0; // This will cause the packet to be ignored. + } + + return len; +} + +boolean PubSubClient::loop() { + if (connected()) { + unsigned long t = millis(); + if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + this->_state = MQTT_CONNECTION_TIMEOUT; + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint8_t llen; + uint16_t len = readPacket(&llen); + uint16_t msgId = 0; + uint8_t *payload; + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; + char topic[tl+1]; + for (uint16_t i=0;i0 + if ((buffer[0]&0x06) == MQTTQOS1) { + msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; + payload = buffer+llen+3+tl+2; + callback(topic,payload,len-llen-3-tl-2); + + buffer[0] = MQTTPUBACK; + buffer[1] = 2; + buffer[2] = (msgId >> 8); + buffer[3] = (msgId & 0xFF); + _client->write(buffer,4); + lastOutActivity = t; + + } else { + payload = buffer+llen+3+tl; + callback(topic,payload,len-llen-3-tl); + } + } + } else if (type == MQTTPINGREQ) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client->write(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +boolean PubSubClient::publish(const char* topic, const char* payload) { + return publish(topic,(const uint8_t*)payload,strlen(payload),false); +} + +boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { + return publish(topic,(const uint8_t*)payload,strlen(payload),retained); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) { + // Too long + return false; + } + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + length = writeString(topic,buffer,length); + uint16_t i; + for (i=0;i 0) { + digit |= 0x80; + } + buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,buffer,pos); + + rc += _client->write(buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + + return rc == tlen + 4 + plength; +} + +boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint16_t rc; + uint16_t len = length; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;i 0) && result) { + bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining; + rc = _client->write(writeBuf,bytesToWrite); + result = (rc == bytesToWrite); + bytesRemaining -= rc; + writeBuf += rc; + } + return result; +#else + rc = _client->write(buf+(4-llen),length+1+llen); + lastOutActivity = millis(); + return (rc == 1+llen+length); +#endif +} + +boolean PubSubClient::subscribe(const char* topic) { + return subscribe(topic, 0); +} + +boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { + if (qos < 0 || qos > 1) { + return false; + } + if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { + // Too long + return false; + } + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, buffer,length); + buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +boolean PubSubClient::unsubscribe(const char* topic) { + if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) { + // Too long + return false; + } + if (connected()) { + uint16_t length = 5; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[length++] = (nextMsgId >> 8); + buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() { + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client->write(buffer,2); + _state = MQTT_DISCONNECTED; + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { + const char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +boolean PubSubClient::connected() { + boolean rc; + if (_client == NULL ) { + rc = false; + } else { + rc = (int)_client->connected(); + if (!rc) { + if (this->_state == MQTT_CONNECTED) { + this->_state = MQTT_CONNECTION_LOST; + _client->flush(); + _client->stop(); + } + } + } + return rc; +} + +PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) { + IPAddress addr(ip[0],ip[1],ip[2],ip[3]); + return setServer(addr,port); +} + +PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) { + this->ip = ip; + this->port = port; + this->domain = NULL; + return *this; +} + +PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) { + this->domain = domain; + this->port = port; + return *this; +} + +PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) { + this->callback = callback; + return *this; +} + +PubSubClient& PubSubClient::setClient(Client& client){ + this->_client = &client; + return *this; +} + +PubSubClient& PubSubClient::setStream(Stream& stream){ + this->stream = &stream; + return *this; +} + +int PubSubClient::state() { + return this->_state; +} diff --git a/MQTT/PubSubClient.h b/MQTT/PubSubClient.h new file mode 100644 index 0000000..be4bd67 --- /dev/null +++ b/MQTT/PubSubClient.h @@ -0,0 +1,144 @@ +/* + PubSubClient.h - A simple client for MQTT. + Nick O'Leary + http://knolleary.net +*/ + +#ifndef PubSubClient_h +#define PubSubClient_h + +#include +#include "IPAddress.h" +#include "Client.h" +#include "Stream.h" + +#define MQTT_VERSION_3_1 3 +#define MQTT_VERSION_3_1_1 4 + +// MQTT_VERSION : Pick the version +//#define MQTT_VERSION MQTT_VERSION_3_1 +#ifndef MQTT_VERSION +#define MQTT_VERSION MQTT_VERSION_3_1_1 +#endif + +// MQTT_MAX_PACKET_SIZE : Maximum packet size +#ifndef MQTT_MAX_PACKET_SIZE +#define MQTT_MAX_PACKET_SIZE 128 +#endif + +// MQTT_KEEPALIVE : keepAlive interval in Seconds +#ifndef MQTT_KEEPALIVE +#define MQTT_KEEPALIVE 15 +#endif + +// MQTT_SOCKET_TIMEOUT: socket timeout interval in Seconds +#ifndef MQTT_SOCKET_TIMEOUT +#define MQTT_SOCKET_TIMEOUT 15 +#endif + +// MQTT_MAX_TRANSFER_SIZE : limit how much data is passed to the network client +// in each write call. Needed for the Arduino Wifi Shield. Leave undefined to +// pass the entire MQTT packet in each write call. +//#define MQTT_MAX_TRANSFER_SIZE 80 + +// Possible values for client.state() +#define MQTT_CONNECTION_TIMEOUT -4 +#define MQTT_CONNECTION_LOST -3 +#define MQTT_CONNECT_FAILED -2 +#define MQTT_DISCONNECTED -1 +#define MQTT_CONNECTED 0 +#define MQTT_CONNECT_BAD_PROTOCOL 1 +#define MQTT_CONNECT_BAD_CLIENT_ID 2 +#define MQTT_CONNECT_UNAVAILABLE 3 +#define MQTT_CONNECT_BAD_CREDENTIALS 4 +#define MQTT_CONNECT_UNAUTHORIZED 5 + +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +#ifdef ESP8266 +#include +#define MQTT_CALLBACK_SIGNATURE std::function callback +#else +#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) +#endif + +class PubSubClient { +private: + Client* _client; + uint8_t buffer[MQTT_MAX_PACKET_SIZE]; + uint16_t nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + MQTT_CALLBACK_SIGNATURE; + uint16_t readPacket(uint8_t*); + boolean readByte(uint8_t * result); + boolean readByte(uint8_t * result, uint16_t * index); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); + IPAddress ip; + const char* domain; + uint16_t port; + Stream* stream; + int _state; +public: + PubSubClient(); + PubSubClient(Client& client); + PubSubClient(IPAddress, uint16_t, Client& client); + PubSubClient(IPAddress, uint16_t, Client& client, Stream&); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(IPAddress, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, Client& client); + PubSubClient(uint8_t *, uint16_t, Client& client, Stream&); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(uint8_t *, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + PubSubClient(const char*, uint16_t, Client& client); + PubSubClient(const char*, uint16_t, Client& client, Stream&); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); + PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); + + PubSubClient& setServer(IPAddress ip, uint16_t port); + PubSubClient& setServer(uint8_t * ip, uint16_t port); + PubSubClient& setServer(const char * domain, uint16_t port); + PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); + PubSubClient& setClient(Client& client); + PubSubClient& setStream(Stream& stream); + + boolean connect(const char* id); + boolean connect(const char* id, const char* user, const char* pass); + boolean connect(const char* id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + boolean connect(const char* id, const char* user, const char* pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage); + void disconnect(); + boolean publish(const char* topic, const char* payload); + boolean publish(const char* topic, const char* payload, boolean retained); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); + boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean subscribe(const char* topic); + boolean subscribe(const char* topic, uint8_t qos); + boolean unsubscribe(const char* topic); + boolean loop(); + boolean connected(); + int state(); +}; + + +#endif diff --git a/Metro/Metro.cpp b/Metro/Metro.cpp new file mode 100644 index 0000000..3d13747 --- /dev/null +++ b/Metro/Metro.cpp @@ -0,0 +1,73 @@ + +#if defined(ARDUINO) && ARDUINO >= 100 +#include "Arduino.h" +#else +#include "WProgram.h" +#endif +#include "Metro.h" + + +Metro::Metro(unsigned long interval_millis) +{ + this->autoreset = 0; + interval(interval_millis); + reset(); +} + +// New creator so I can use either the original check behavior or benjamin.soelberg's +// suggested one (see below). +// autoreset = 0 is benjamin.soelberg's check behavior +// autoreset != 0 is the original behavior + +Metro::Metro(unsigned long interval_millis, uint8_t autoreset) +{ + this->autoreset = autoreset; // Fix by Paul Bouchier + interval(interval_millis); + reset(); +} + +void Metro::interval(unsigned long interval_millis) +{ + this->interval_millis = interval_millis; +} + +// Benjamin.soelberg's check behavior: +// When a check is true, add the interval to the internal counter. +// This should guarantee a better overall stability. + +// Original check behavior: +// When a check is true, add the interval to the current millis() counter. +// This method can add a certain offset over time. + +char Metro::check() +{ + if (millis() - this->previous_millis >= this->interval_millis) { + // As suggested by benjamin.soelberg@gmail.com, the following line + // this->previous_millis = millis(); + // was changed to + // this->previous_millis += this->interval_millis; + + // If the interval is set to 0 we revert to the original behavior + if (this->interval_millis <= 0 || this->autoreset ) { + this->previous_millis = millis(); + } else { + this->previous_millis += this->interval_millis; + } + + return 1; + } + + + + return 0; + +} + +void Metro::reset() +{ + + this->previous_millis = millis(); + +} + + diff --git a/Metro/Metro.h b/Metro/Metro.h new file mode 100644 index 0000000..cb21776 --- /dev/null +++ b/Metro/Metro.h @@ -0,0 +1,26 @@ + + +#ifndef Metro_h +#define Metro_h + +#include + +class Metro +{ + +public: + Metro(unsigned long interval_millis); + Metro(unsigned long interval_millis, uint8_t autoreset); + void interval(unsigned long interval_millis); + char check(); + void reset(); + +private: + uint8_t autoreset; + unsigned long previous_millis, interval_millis; + +}; + +#endif + + diff --git a/MqttCanGateway.cpp b/MqttCanGateway.cpp index 72fbd05..ab42420 100644 --- a/MqttCanGateway.cpp +++ b/MqttCanGateway.cpp @@ -1,34 +1,22 @@ -// Do not remove the include below #include "MqttCanGateway.h" -// demo: CAN-BUS Shield, send data -#include -#include +#include +#include -// the cs pin of the version after v1.1 is default to D9 -// v0.9b and v1.0 is default D10 -const int SPI_CS_PIN = 10; +#include "mqttclient.h" -MCP_CAN CAN(SPI_CS_PIN); // Set CS pin -void setup() -{ +void setup() { Serial.begin(115200); + Serial << "MqttCanGateway starting ..." << endl; - while (CAN_OK != CAN.begin(CAN_500KBPS)) // init can bus : baudrate = 500k - { - Serial.println("CAN BUS Shield init fail"); - Serial.println(" Init CAN BUS Shield again"); - delay(100); - } - Serial.println("CAN BUS Shield init ok!"); + MqttClientNS::begin(); + + + wdt_enable(WDTO_8S); } -unsigned char stmp[8] = {0, 1, 2, 3, 4, 5, 6, 7}; -void loop() -{ - // send data: id = 0x00, standrad frame, data len = 8, stmp: data buf - CAN.sendMsgBuf(0x00, 0, 8, stmp); - delay(100); // send data per 100ms +void loop() { + MqttClientNS::exec(); } diff --git a/Streaming/Streaming.h b/Streaming/Streaming.h new file mode 100644 index 0000000..9e54ae9 --- /dev/null +++ b/Streaming/Streaming.h @@ -0,0 +1,105 @@ +/* +Streaming.h - Arduino library for supporting the << streaming operator +Copyright (c) 2010-2012 Mikal Hart. All rights reserved. + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef ARDUINO_STREAMING +#define ARDUINO_STREAMING + +#if defined(ARDUINO) && ARDUINO >= 100 +#include "Arduino.h" +#else +#include "WProgram.h" +#endif + +#define STREAMING_LIBRARY_VERSION 5 + +// Generic template +template +inline Print &operator <<(Print &stream, T arg) +{ stream.print(arg); return stream; } + +struct _BASED +{ + long val; + int base; + _BASED(long v, int b): val(v), base(b) + {} +}; + +#if ARDUINO >= 100 + +struct _BYTE_CODE +{ + byte val; + _BYTE_CODE(byte v) : val(v) + {} +}; +#define _BYTE(a) _BYTE_CODE(a) + +inline Print &operator <<(Print &obj, const _BYTE_CODE &arg) +{ obj.write(arg.val); return obj; } + +#else + +#define _BYTE(a) _BASED(a, BYTE) + +#endif + +#define _HEX(a) _BASED(a, HEX) +#define _DEC(a) _BASED(a, DEC) +#define _OCT(a) _BASED(a, OCT) +#define _BIN(a) _BASED(a, BIN) + +// Specialization for class _BASED +// Thanks to Arduino forum user Ben Combee who suggested this +// clever technique to allow for expressions like +// Serial << _HEX(a); + +inline Print &operator <<(Print &obj, const _BASED &arg) +{ obj.print(arg.val, arg.base); return obj; } + +#if ARDUINO >= 18 +// Specialization for class _FLOAT +// Thanks to Michael Margolis for suggesting a way +// to accommodate Arduino 0018's floating point precision +// feature like this: +// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision + +struct _FLOAT +{ + float val; + int digits; + _FLOAT(double v, int d): val(v), digits(d) + {} +}; + +inline Print &operator <<(Print &obj, const _FLOAT &arg) +{ obj.print(arg.val, arg.digits); return obj; } +#endif + +// Specialization for enum _EndLineCode +// Thanks to Arduino forum user Paul V. who suggested this +// clever technique to allow for expressions like +// Serial << "Hello!" << endl; + +enum _EndLineCode { endl }; + +inline Print &operator <<(Print &obj, _EndLineCode arg) +{ obj.println(); return obj; } + +#endif diff --git a/canclient.cpp b/canclient.cpp new file mode 100644 index 0000000..30cf649 --- /dev/null +++ b/canclient.cpp @@ -0,0 +1,49 @@ +/* + * canclient.cpp + * + * Created on: Oct 31, 2016 + * Author: wn + */ + +#include "canclient.h" + +#include +#include +#include + + +const int SPI_CS_PIN = 10; +MCP_CAN CAN(SPI_CS_PIN); // Set CS pin + + +void CanClientNS::begin() { + while (CAN_OK != CAN.begin(CAN_500KBPS)) // init can bus : baudrate = 500k + { + Serial.println("CAN BUS Shield init fail"); + Serial.println(" Init CAN BUS Shield again"); + delay(100); + } + Serial.println("CAN BUS Shield init ok!"); +} + +void CanClientNS::exec() { + +} + +void CanClientNS::sendMessage(uint32_t addr, uint8_t payloadLength, uint8_t *payload) { + // CAN.sendMsgBuf(0x00, 0, 8, stmp); + + Serial << "Once again:" << endl; + Serial << "Address: " << addr << endl; + Serial << "Length: " << (uint16_t)payloadLength << endl; + Serial << "Data: "; + for (uint8_t i = 0; i < payloadLength; i++) { + Serial << (uint16_t)payload[i] << " "; + } + Serial << endl; + +} + + + + diff --git a/canclient.h b/canclient.h new file mode 100644 index 0000000..cd55f52 --- /dev/null +++ b/canclient.h @@ -0,0 +1,21 @@ +/* + * canclient.h + * + * Created on: Oct 31, 2016 + * Author: wn + */ + +#ifndef CANCLIENT_H_ +#define CANCLIENT_H_ + +#include + +namespace CanClientNS { + void begin(); + void exec(); + void sendMessage(uint32_t addr, uint8_t payloadLength, uint8_t *payload); +}; + + + +#endif /* CANCLIENT_H_ */ diff --git a/mqttclient.cpp b/mqttclient.cpp new file mode 100644 index 0000000..cdc368a --- /dev/null +++ b/mqttclient.cpp @@ -0,0 +1,209 @@ +/* + * mqttclient.cpp + * + * Created on: 03.03.2016 + * Author: wn + */ + +#include + +#include "mqttclient.h" + +#include +#include +#include +#include +#include +#include + +#include "canclient.h" + + + +static const char MESSAGE_TOPIC[] = "IoT/MqttCanGateway/Message"; +static const char WATCHDOG_TOPIC[] = "IoT/Watchdog"; + + + +void callback(char* topic, byte* payload, unsigned int length); + + +static uint8_t MAC[] = { 0x90, 0xA2, 0xDA, 0x00, 0x51, 0x09 }; +const static char BROKER[] = "mqttbroker"; +EthernetClient client; +PubSubClient mqttClient = PubSubClient(BROKER, 1883, callback, client); +uint8_t disconnectState = 0; +uint32_t disconnectTime = 0; +Metro minute = Metro(60000); +Metro second = Metro(1000); +uint32_t uptime; + + +void callback(char* topic, byte* payload, unsigned int length) { + const uint8_t BUFSIZE = 128; + if ((length + 1) >= BUFSIZE) { // 1 for terminating NUL + // Serial << "Received message too long, ignore it" << endl; + } else { + char buffer[BUFSIZE]; + memcpy(buffer, payload, length); + *(buffer + length) = 0; + Serial << "Received message: " << length << ", " << String(topic) << ", " << String(buffer) << endl; + // 00000001 08 01 02 03 04 05 06 07 08 + // ^^^^^^^^ Address + // ^^Payload Length + // ^^^^^^^^^^^^^^^^^^^^^^^ Payload + + + if (!(strcmp(topic, MESSAGE_TOPIC))) { + char *paramPtr = buffer; + Serial << paramPtr << endl; + uint32_t canAddress; + uint8_t canLength; + const uint8_t MAX_CAN_MSG_SIZE = 8; + uint8_t canDataBuffer[MAX_CAN_MSG_SIZE]; + uint8_t canDataBufferIdx = 0; + uint8_t state = 0; + int8_t done = 0; + while (done == 0) { + Serial << "State: " << (uint16_t) state << endl; + if ((paramPtr != 0) && (*paramPtr != 0)) { + char *dataPtr = strsep(¶mPtr, " "); + uint32_t data = strtol(dataPtr, NULL, 16); + switch (state) { + case 0: + Serial << "address found: " << data << endl; + canAddress = data; + state++; + break; + case 1: + Serial << "length found: " << data << endl; + if (data > MAX_CAN_MSG_SIZE) { + Serial << "length too large" << endl; + done = -1; + } else { + canLength = data; + state = 2; + } + break; + case 2: + Serial << "payload octet found: " << data << endl; + if (data > 255) { + Serial << "too large for an octet" << endl; + done = -1; + } else { + canDataBuffer[canDataBufferIdx] = data; + canDataBufferIdx++; + if (canDataBufferIdx == canLength) { + done = 1; + } + } + break; + } + + } else { + Serial << "Error in received message, to little data" << endl; + done = -1; + } + } + if (done == 1) { + Serial << "complete message received" << endl; + Serial << "Address: " << canAddress << endl; + Serial << "Length: " << (uint16_t)canLength << endl; + Serial << "Data: "; + for (uint8_t i = 0; i < canLength; i++) { + Serial << (uint16_t)canDataBuffer[i] << " "; + } + Serial << endl; + CanClientNS::sendMessage(canAddress, canLength, canDataBuffer); + } else if (done == -1) { + Serial << "error while evaluating message" << endl; + } + } else if (!strcmp(topic, WATCHDOG_TOPIC)) { + wdt_reset(); + } else { + // Serial << "Strange, unknown topic received" << endl; + } + } +} + +void MqttClientNS::begin() { + Ethernet.begin(MAC); + Serial << "Got IP address: " << Ethernet.localIP() << endl; + disconnectState = 3; + disconnectTime = millis(); +} + +void MqttClientNS::exec() { + if ((disconnectState == 0) && (! mqttClient.loop())) { + disconnectState = 1; + } + + switch (disconnectState) { + case 0: + // Serial.println("discState 0"); + // everything fine + break; + case 1: + // Serial.println("discState 1"); + mqttClient.disconnect(); + disconnectTime = millis(); + disconnectState = 2; + break; + case 2: + // Serial.println("discState 3"); + if (disconnectTime + 2000 < millis()) { + disconnectState = 3; + } + break; + case 3: + // Serial.println("discState 3"); + if (mqttClient.connect("Monitor")) { + mqttClient.subscribe(MESSAGE_TOPIC); + mqttClient.subscribe(WATCHDOG_TOPIC); + disconnectTime = millis(); + mqttClient.publish("IoT/MqttCanGateway/Started", "MqttCanGateway started"); + disconnectState = 0; + } else { + disconnectState = 1; + } + break; + default: + disconnectState = 0; + break; + } + + if (second.check() == 1) { + uptime++; + + // Serial.println("mqtt tick"); + + if (disconnectState == 0) { + String msg = String("{ \"metadata\": { \"device\": \"MqttCanGateway\" }, \"data\": { \"uptime\": ") + uptime + String("}}"); + mqttClient.publish("IoT/MqttCanGateway/Heartbeat", (char*)msg.c_str()); + } + } + +} + +void sendMessage(uint32_t address, uint8_t length, uint8_t *payload) { + const uint8_t BUFSIZE1 = 25; + char buffer1[BUFSIZE1]; + char *bufPtr = buffer1; + for (uint8_t i = 0; i < length; i++) { + int res = sprintf(bufPtr, "%02x ", payload[i]); + bufPtr += res; + } + int l = strlen(buffer1); + buffer1[l-1] = 0; + + const uint8_t BUFSIZE2 = 170; + char buffer2[BUFSIZE2]; + int res = snprintf(buffer2, BUFSIZE2, "{ \"metadata\": { \"device\":\"MqttCanGateway\" }, \"data\": { \"uptime\": %ld, \"address\": \"%08lx\", \"length\": %d, \"payload\": \"%s\" }}", uptime, address, length, buffer1); + if (res >= BUFSIZE2) { + Serial << "too much payload" << endl; + } else { + Serial << "length: " << res << endl; + Serial << "about to send " << buffer2 << endl; + } +} + diff --git a/mqttclient.h b/mqttclient.h new file mode 100644 index 0000000..381e5aa --- /dev/null +++ b/mqttclient.h @@ -0,0 +1,20 @@ +/* + * mqttclient.h + * + * Created on: 03.03.2016 + * Author: wn + */ + +#ifndef MQTTCLIENT_H_ +#define MQTTCLIENT_H_ + +#include + +namespace MqttClientNS { + void begin(); + void exec(); + void sendMessage(uint32_t address, uint8_t length, uint8_t *payload); +}; + + +#endif /* MQTTCLIENT_H_ */