diff --git a/.cproject b/.cproject index f0c1847..de0336b 100644 --- a/.cproject +++ b/.cproject @@ -25,6 +25,8 @@ + + @@ -43,9 +45,11 @@ - + + + diff --git a/.hgignore b/.hgignore new file mode 100644 index 0000000..b16f296 --- /dev/null +++ b/.hgignore @@ -0,0 +1,3 @@ + +syntax: regexp +^Release$ \ No newline at end of file diff --git a/MQTT/PubSubClient.cpp b/MQTT/PubSubClient.cpp new file mode 100755 index 0000000..d931b8a --- /dev/null +++ b/MQTT/PubSubClient.cpp @@ -0,0 +1,365 @@ +/* + PubSubClient.cpp - A simple client for MQTT. + Nicholas O'Leary + http://knolleary.net +*/ + +#include "PubSubClient.h" +#include + +PubSubClient::PubSubClient(Client& client) { + this->_client = &client; +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { + this->_client = &client; + this->callback = callback; + this->ip = ip; + this->port = port; +} + +PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { + this->_client = &client; + this->callback = callback; + this->domain = domain; + this->port = port; +} + +boolean PubSubClient::connect(char *id) { + return connect(id,NULL,NULL,0,0,0,0); +} + +boolean PubSubClient::connect(char *id, char *user, char *pass) { + return connect(id,user,pass,0,0,0,0); +} + +boolean PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) +{ + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { +// if (!connected()) { + if (true) { + int result = 0; + + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + + if (result) { + nextMsgId = 1; + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + unsigned int j; + for (j = 0;j<9;j++) { + buffer[length++] = d[j]; + } + + uint8_t v; + if (willTopic) { + v = 0x06|(willQos<<3)|(willRetain<<5); + } else { + v = 0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _client->stop(); + return false; + } + } + uint16_t len = readPacket(); + + if (len == 4 && buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + return true; + } + } + _client->stop(); + } + return false; +} + +uint8_t PubSubClient::readByte() { + while(!_client->available()) {} + return _client->read(); +} + +uint16_t PubSubClient::readPacket() { + uint16_t len = 0; + buffer[len++] = readByte(); + uint8_t multiplier = 1; + uint16_t length = 0; + uint8_t digit = 0; + do { + digit = readByte(); + buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier *= 128; + } while ((digit & 128) != 0); + + for (uint16_t i = 0;i MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint16_t len = readPacket(); + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[2]<<8)+buffer[3]; + char topic[tl+1]; + for (uint16_t i=0;iwrite(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +boolean PubSubClient::publish(char* topic, char* payload) { + return publish(topic,(uint8_t*)payload,strlen(payload),false); +} + +boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + length = writeString(topic,buffer,length); + uint16_t i; + for (i=0;i 0) { + digit |= 0x80; + } + buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,buffer,pos); + + rc += _client->write(buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + return rc == len + 1 + plength; +} + +boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint8_t rc; + uint8_t len = length; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;iwrite(buf+(4-llen),length+1+llen); + + const size_t SEND_AT_ONCE = 64; + size_t remains = length + 1 + llen; + // Serial.print("write len: "); Serial.println(remains); + const uint8_t *writebuf = buf + (4 - llen); + bool result = true; + while ((remains > 0) && result) { + size_t actuallySendChars = (remains > SEND_AT_ONCE) ? SEND_AT_ONCE : remains; + // Serial.print("tbs: "); Serial.println(actuallySendChars); + size_t sentChars = _client->write(writebuf, actuallySendChars); + result = sentChars == actuallySendChars; + remains -= sentChars; + writebuf += sentChars; + } + + + lastOutActivity = millis(); +// return (rc == 1+llen+length); + return result; +} + + +boolean PubSubClient::subscribe(char* topic) { + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 7; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[0] = (nextMsgId >> 8); + buffer[1] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + buffer[length++] = 0; // Only do QoS 0 subs + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() { + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client->write(buffer,2); + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { + char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +boolean PubSubClient::connected() { + int rc = (int)_client->connected(); + //Serial.print("rc: "); Serial.println(rc); + if (!rc) { + //Serial.println("would stop"); + _client->stop(); + // while (true); + } + return rc; +} + diff --git a/MQTT/PubSubClient.h b/MQTT/PubSubClient.h new file mode 100755 index 0000000..a127add --- /dev/null +++ b/MQTT/PubSubClient.h @@ -0,0 +1,75 @@ +/* + PubSubClient.h - A simple client for MQTT. + Nicholas O'Leary + http://knolleary.net +*/ + +#ifndef PubSubClient_h +#define PubSubClient_h + +#include +#include "Client.h" + +// MQTT_MAX_PACKET_SIZE : Maximum packet size +#define MQTT_MAX_PACKET_SIZE 256 + +// MQTT_KEEPALIVE : keepAlive interval in Seconds +#define MQTT_KEEPALIVE 15 + +#define MQTTPROTOCOLVERSION 3 +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +class PubSubClient { +private: + Client* _client; + uint8_t buffer[MQTT_MAX_PACKET_SIZE]; + uint16_t nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + void (*callback)(char*,uint8_t*,unsigned int); + uint16_t readPacket(); + uint8_t readByte(); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(char* string, uint8_t* buf, uint16_t pos); + uint8_t *ip; + char* domain; + uint16_t port; +public: + PubSubClient(Client& client); + PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + boolean connect(char *); + boolean connect(char *, char *, char *); + boolean connect(char *, char *, uint8_t, uint8_t, char *); + boolean connect(char *, char *, char *, char *, uint8_t, uint8_t, char*); + void disconnect(); + boolean publish(char *, char *); + boolean publish(char *, uint8_t *, unsigned int); + boolean publish(char *, uint8_t *, unsigned int, boolean); + boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean); + boolean subscribe(char *); + boolean loop(); + boolean connected(); +}; + + +#endif diff --git a/MqttClient.cpp b/MqttClient.cpp new file mode 100644 index 0000000..2abf4f2 --- /dev/null +++ b/MqttClient.cpp @@ -0,0 +1,99 @@ +/* + * MqttClient.cpp + * + * Created on: 08.05.2015 + * Author: wn + */ + +#include "MqttClient.h" + +#include + + +void callback(char* topic, byte* payload, unsigned int length) { + // handle message arrived +} + + +MqttClient::MqttClient(RequestSender *meterBusMaster) : + m_client(255), m_meterBusMaster(meterBusMaster), m_mqttClient(MQTT_BROKER, MQTT_PORT, callback, m_client), + m_disconnectState(3), m_disconnectTime(millis()) +{ +} + +void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength) { + char strbuf[256]; + memset(strbuf, sizeof(strbuf), 0); + PString buf = PString(strbuf, sizeof(strbuf)); + buf << "{ \"metadata\": { \"device\": \"MeterbusHub\" }, " << + "\"data\": {" << + "}" << + "}" << endl; + if (m_disconnectState == 0) { + m_mqttClient.publish("MeterbusHub.Measurement", strbuf); + } + + // m_server.write(responseBuffer, responseBufferLength); +} + +void MqttClient::sendError(uint8_t code) { +} + +void MqttClient::begin() { +} + +void MqttClient::exec() { + if ((m_disconnectState == 0) && (! m_mqttClient.loop())) { + m_disconnectState = 1; + } + + switch (m_disconnectState) { + case 0: + // Serial.println("discState 0"); + // everything fine + break; + case 1: + Serial.println("discState 1"); + m_mqttClient.disconnect(); + m_disconnectTime = millis(); + m_disconnectState = 2; + break; + case 2: + Serial.println("discState 3"); + if (m_disconnectTime + 2000 < millis()) { + m_disconnectState = 3; + } + break; + case 3: + Serial.println("discState 3"); + if (m_mqttClient.connect("MeterbusHub")) { + m_disconnectTime = millis(); + m_disconnectState = 0; + } else { + m_disconnectState = 1; + } + break; + default: + m_disconnectState = 0; + break; + } + + +// m_client = m_server.available(); +// if (m_client) { +// uint16_t sendBufLen = 0; +// uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer(); +// if (sendBuffer != 0) { +// int chi; +// while ((chi = m_client.read()) != -1) { +// char ch = (char) chi; +// *(sendBuffer + sendBufLen) = ch; +// sendBufLen++; +// } +// m_meterBusMaster->sendBufferReady(sendBufLen, this); +// } +// } +} + + + diff --git a/MqttClient.h b/MqttClient.h new file mode 100644 index 0000000..b44bba6 --- /dev/null +++ b/MqttClient.h @@ -0,0 +1,38 @@ +/* + * MqttClient.h + * + * Created on: 08.05.2015 + * Author: wn + */ + +#ifndef MQTTCLIENT_H_ +#define MQTTCLIENT_H_ + +#include +#include +#include "mBusDialog.h" + + +const String MQTT_BROKER = "172.16.2.16"; +const uint16_t MQTT_PORT = 1883; + +class MqttClient : public ResponseCallback { +public: + MqttClient(RequestSender *meterBusMaster); + void begin(); + void exec(); + virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength); + virtual void sendError(uint8_t code); +private: + EthernetClient m_client; + PubSubClient m_mqttClient; + RequestSender *m_meterBusMaster; + uint8_t m_disconnectState; + uint32_t m_disconnectTime; + +}; + + + + +#endif /* MQTTCLIENT_H_ */ diff --git a/NetMeterBusMaster2.cpp b/NetMeterBusMaster2.cpp index e688957..cb4d016 100644 --- a/NetMeterBusMaster2.cpp +++ b/NetMeterBusMaster2.cpp @@ -6,7 +6,7 @@ #include "cmd.h" #include "test.h" #include "uptime.h" -#include "meterBusServer.h" +// #include "meterBusServer.h" #include "meterBusMaster.h" #include "overCurrentProt.h" #include @@ -23,7 +23,7 @@ static TestCmd testCmd; static Uptime uptime; static MeterBusMaster meterBusMaster; -static MeterBusServer meterBusServer(2001, &meterBusMaster); +// static MeterBusServer meterBusServer(2001, &meterBusMaster); static OverCurrentProt overCurrentProt; @@ -49,7 +49,7 @@ void setup() { overCurrentProt.begin(&cmdServer); meterBusMaster.begin(&cmdServer); - meterBusServer.begin(); + // meterBusServer.begin(); digitalWrite(POWER_LED, HIGH); } @@ -60,5 +60,5 @@ void loop() { overCurrentProt.exec(); meterBusMaster.exec(); - meterBusServer.exec(); + // meterBusServer.exec(); } diff --git a/PString/PString.cpp b/PString/PString.cpp new file mode 100644 index 0000000..e9a3044 --- /dev/null +++ b/PString/PString.cpp @@ -0,0 +1,60 @@ +/* + PString.cpp - Lightweight printable string class + Copyright (c) 2009-2012 Mikal Hart. All right reserved. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "PString.h" + +void PString::begin() +{ + _cur = _buf; + if (_size > 0) + _buf[0] = '\0'; +} + +#if defined(ARDUINO) && ARDUINO >= 100 +size_t PString::write(uint8_t b) +#else +void PString::write(uint8_t b) +#endif +{ + if (_cur + 1 < _buf + _size) + { + *_cur++ = (char)b; + *_cur = '\0'; +#if defined(ARDUINO) && ARDUINO >= 100 + return 1; +#endif + } + +#if defined(ARDUINO) && ARDUINO >= 100 + return 0; +#endif +} + +int PString::format(char *str, ...) +{ + va_list argptr; + va_start(argptr, str); + int ret = vsnprintf(_cur, _size - (_cur - _buf), str, argptr); + if (_size) + while (*_cur) + ++_cur; + return ret; +} + + diff --git a/PString/PString.h b/PString/PString.h new file mode 100644 index 0000000..5be6f06 --- /dev/null +++ b/PString/PString.h @@ -0,0 +1,87 @@ +/* + PString.h - Lightweight printable string class + Copyright (c) 2009-2012 Mikal Hart. All right reserved. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef PString_h +#define PString_h + +#include "Print.h" +#include +#include +#include +#include + +#define PSTRING_LIBRARY_VERSION 3 + +class PString : public Print +{ +private: + char *_buf, *_cur; + size_t _size; +public: +#if defined(ARDUINO) && ARDUINO >= 100 + virtual size_t write(uint8_t); +#else + virtual void write(uint8_t); +#endif + +public: + + // Basic constructor requires a preallocated buffer + PString(char *buf, size_t size) : _buf(buf), _size(size) + { begin(); } + + // templated constructors allow inline renderings of this type: PString(buf, size, myfloat[, modifier]); + template PString(char *buf, size_t size, T arg) : _buf(buf), _size(size) + { begin(); print(arg); } + + template PString(char *buf, size_t size, T arg, int modifier) : _buf(buf), _size(size) + { begin(); print(arg, modifier); } + + // returns the length of the current string, not counting the 0 terminator + inline const size_t length() + { return _cur - _buf; } + + // returns the capacity of the string + inline const size_t capacity() + { return _size; } + + // gives access to the internal string + inline operator const char *() + { return _buf; } + + // compare to another string + bool operator==(const char *str) + { return _size > 0 && !strcmp(_buf, str); } + + // call this to re-use an existing string + void begin(); + + // This function allows assignment to an arbitrary scalar value like str = myfloat; + template inline PString &operator =(T arg) + { begin(); print(arg); return *this; } + + // Concatenation str += myfloat; + template inline PString &operator +=(T arg) + { print(arg); return *this; } + + // Safe access to sprintf-like formatting, e.g. str.format("Hi, my name is %s and I'm %d years old", name, age); + int format(char *str, ...); +}; + +#endif \ No newline at end of file