From 67632ef43e6cdcdf4e5e50e24a38c97156c2a1f7 Mon Sep 17 00:00:00 2001 From: hg Date: Wed, 6 May 2015 19:08:30 +0200 Subject: [PATCH] add PubSubClient MQTT library --- .cproject | 5 +- .settings/org.eclipse.cdt.core.prefs | 4 +- MQTT/PubSubClient.cpp | 337 +++++++++++++++++++++++++++ MQTT/PubSubClient.h | 75 ++++++ WiModbusGateway.cpp | 67 +++--- WiModbusGateway.h | 2 +- 6 files changed, 450 insertions(+), 40 deletions(-) create mode 100755 MQTT/PubSubClient.cpp create mode 100755 MQTT/PubSubClient.h diff --git a/.cproject b/.cproject index 013029a..bea4146 100644 --- a/.cproject +++ b/.cproject @@ -24,6 +24,7 @@ + @@ -31,6 +32,7 @@ @@ -43,8 +45,9 @@ - + + diff --git a/.settings/org.eclipse.cdt.core.prefs b/.settings/org.eclipse.cdt.core.prefs index 6389e33..56f6456 100644 --- a/.settings/org.eclipse.cdt.core.prefs +++ b/.settings/org.eclipse.cdt.core.prefs @@ -151,10 +151,10 @@ environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.DTS/value=3600 environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.LOCAL/delimiter=\: environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.LOCAL/operation=replace -environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.LOCAL/value=1430496628 +environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.LOCAL/value=1430902339 environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.UTC/delimiter=\: environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.UTC/operation=replace -environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.UTC/value=1430489428 +environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.UTC/value=1430895139 environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.ZONE/delimiter=\: environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.ZONE/operation=replace environment/project/it.baeyens.arduino.core.toolChain.release.1600398586/A.EXTRA.TIME.ZONE/value=3600 diff --git a/MQTT/PubSubClient.cpp b/MQTT/PubSubClient.cpp new file mode 100755 index 0000000..1f8a19c --- /dev/null +++ b/MQTT/PubSubClient.cpp @@ -0,0 +1,337 @@ +/* + PubSubClient.cpp - A simple client for MQTT. + Nicholas O'Leary + http://knolleary.net +*/ + +#include "PubSubClient.h" +#include + +PubSubClient::PubSubClient(Client& client) { + this->_client = &client; +} + +PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { + this->_client = &client; + this->callback = callback; + this->ip = ip; + this->port = port; +} + +PubSubClient::PubSubClient(char* domain, uint16_t port, void (*callback)(char*,uint8_t*,unsigned int), Client& client) { + this->_client = &client; + this->callback = callback; + this->domain = domain; + this->port = port; +} + +boolean PubSubClient::connect(char *id) { + return connect(id,NULL,NULL,0,0,0,0); +} + +boolean PubSubClient::connect(char *id, char *user, char *pass) { + return connect(id,user,pass,0,0,0,0); +} + +boolean PubSubClient::connect(char *id, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) +{ + return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage); +} + +boolean PubSubClient::connect(char *id, char *user, char *pass, char* willTopic, uint8_t willQos, uint8_t willRetain, char* willMessage) { + if (!connected()) { + int result = 0; + + if (domain != NULL) { + result = _client->connect(this->domain, this->port); + } else { + result = _client->connect(this->ip, this->port); + } + + if (result) { + nextMsgId = 1; + uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p',MQTTPROTOCOLVERSION}; + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + unsigned int j; + for (j = 0;j<9;j++) { + buffer[length++] = d[j]; + } + + uint8_t v; + if (willTopic) { + v = 0x06|(willQos<<3)|(willRetain<<5); + } else { + v = 0x02; + } + + if(user != NULL) { + v = v|0x80; + + if(pass != NULL) { + v = v|(0x80>>1); + } + } + + buffer[length++] = v; + + buffer[length++] = ((MQTT_KEEPALIVE) >> 8); + buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + length = writeString(id,buffer,length); + if (willTopic) { + length = writeString(willTopic,buffer,length); + length = writeString(willMessage,buffer,length); + } + + if(user != NULL) { + length = writeString(user,buffer,length); + if(pass != NULL) { + length = writeString(pass,buffer,length); + } + } + + write(MQTTCONNECT,buffer,length-5); + + lastInActivity = lastOutActivity = millis(); + + while (!_client->available()) { + unsigned long t = millis(); + if (t-lastInActivity > MQTT_KEEPALIVE*1000UL) { + _client->stop(); + return false; + } + } + uint16_t len = readPacket(); + + if (len == 4 && buffer[3] == 0) { + lastInActivity = millis(); + pingOutstanding = false; + return true; + } + } + _client->stop(); + } + return false; +} + +uint8_t PubSubClient::readByte() { + while(!_client->available()) {} + return _client->read(); +} + +uint16_t PubSubClient::readPacket() { + uint16_t len = 0; + buffer[len++] = readByte(); + uint8_t multiplier = 1; + uint16_t length = 0; + uint8_t digit = 0; + do { + digit = readByte(); + buffer[len++] = digit; + length += (digit & 127) * multiplier; + multiplier *= 128; + } while ((digit & 128) != 0); + + for (uint16_t i = 0;i MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if (pingOutstanding) { + _client->stop(); + return false; + } else { + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + _client->write(buffer,2); + lastOutActivity = t; + lastInActivity = t; + pingOutstanding = true; + } + } + if (_client->available()) { + uint16_t len = readPacket(); + if (len > 0) { + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + if (type == MQTTPUBLISH) { + if (callback) { + uint16_t tl = (buffer[2]<<8)+buffer[3]; + char topic[tl+1]; + for (uint16_t i=0;iwrite(buffer,2); + } else if (type == MQTTPINGRESP) { + pingOutstanding = false; + } + } + } + return true; + } + return false; +} + +boolean PubSubClient::publish(char* topic, char* payload) { + return publish(topic,(uint8_t*)payload,strlen(payload),false); +} + +boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength) { + return publish(topic, payload, plength, false); +} + +boolean PubSubClient::publish(char* topic, uint8_t* payload, unsigned int plength, boolean retained) { + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 5; + length = writeString(topic,buffer,length); + uint16_t i; + for (i=0;i 0) { + digit |= 0x80; + } + buffer[pos++] = digit; + llen++; + } while(len>0); + + pos = writeString(topic,buffer,pos); + + rc += _client->write(buffer,pos); + + for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); + } + + lastOutActivity = millis(); + return rc == len + 1 + plength; +} + +boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { + uint8_t lenBuf[4]; + uint8_t llen = 0; + uint8_t digit; + uint8_t pos = 0; + uint8_t rc; + uint8_t len = length; + do { + digit = len % 128; + len = len / 128; + if (len > 0) { + digit |= 0x80; + } + lenBuf[pos++] = digit; + llen++; + } while(len>0); + + buf[4-llen] = header; + for (int i=0;iwrite(buf+(4-llen),length+1+llen); + + lastOutActivity = millis(); + return (rc == 1+llen+length); +} + + +boolean PubSubClient::subscribe(char* topic) { + if (connected()) { + // Leave room in the buffer for header and variable length field + uint16_t length = 7; + nextMsgId++; + if (nextMsgId == 0) { + nextMsgId = 1; + } + buffer[0] = (nextMsgId >> 8); + buffer[1] = (nextMsgId & 0xFF); + length = writeString(topic, buffer,length); + buffer[length++] = 0; // Only do QoS 0 subs + return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5); + } + return false; +} + +void PubSubClient::disconnect() { + buffer[0] = MQTTDISCONNECT; + buffer[1] = 0; + _client->write(buffer,2); + _client->stop(); + lastInActivity = lastOutActivity = millis(); +} + +uint16_t PubSubClient::writeString(char* string, uint8_t* buf, uint16_t pos) { + char* idp = string; + uint16_t i = 0; + pos += 2; + while (*idp) { + buf[pos++] = *idp++; + i++; + } + buf[pos-i-2] = (i >> 8); + buf[pos-i-1] = (i & 0xFF); + return pos; +} + + +boolean PubSubClient::connected() { + int rc = (int)_client->connected(); + if (!rc) _client->stop(); + return rc; +} + diff --git a/MQTT/PubSubClient.h b/MQTT/PubSubClient.h new file mode 100755 index 0000000..a127add --- /dev/null +++ b/MQTT/PubSubClient.h @@ -0,0 +1,75 @@ +/* + PubSubClient.h - A simple client for MQTT. + Nicholas O'Leary + http://knolleary.net +*/ + +#ifndef PubSubClient_h +#define PubSubClient_h + +#include +#include "Client.h" + +// MQTT_MAX_PACKET_SIZE : Maximum packet size +#define MQTT_MAX_PACKET_SIZE 256 + +// MQTT_KEEPALIVE : keepAlive interval in Seconds +#define MQTT_KEEPALIVE 15 + +#define MQTTPROTOCOLVERSION 3 +#define MQTTCONNECT 1 << 4 // Client request to connect to Server +#define MQTTCONNACK 2 << 4 // Connect Acknowledgment +#define MQTTPUBLISH 3 << 4 // Publish message +#define MQTTPUBACK 4 << 4 // Publish Acknowledgment +#define MQTTPUBREC 5 << 4 // Publish Received (assured delivery part 1) +#define MQTTPUBREL 6 << 4 // Publish Release (assured delivery part 2) +#define MQTTPUBCOMP 7 << 4 // Publish Complete (assured delivery part 3) +#define MQTTSUBSCRIBE 8 << 4 // Client Subscribe request +#define MQTTSUBACK 9 << 4 // Subscribe Acknowledgment +#define MQTTUNSUBSCRIBE 10 << 4 // Client Unsubscribe request +#define MQTTUNSUBACK 11 << 4 // Unsubscribe Acknowledgment +#define MQTTPINGREQ 12 << 4 // PING Request +#define MQTTPINGRESP 13 << 4 // PING Response +#define MQTTDISCONNECT 14 << 4 // Client is Disconnecting +#define MQTTReserved 15 << 4 // Reserved + +#define MQTTQOS0 (0 << 1) +#define MQTTQOS1 (1 << 1) +#define MQTTQOS2 (2 << 1) + +class PubSubClient { +private: + Client* _client; + uint8_t buffer[MQTT_MAX_PACKET_SIZE]; + uint16_t nextMsgId; + unsigned long lastOutActivity; + unsigned long lastInActivity; + bool pingOutstanding; + void (*callback)(char*,uint8_t*,unsigned int); + uint16_t readPacket(); + uint8_t readByte(); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(char* string, uint8_t* buf, uint16_t pos); + uint8_t *ip; + char* domain; + uint16_t port; +public: + PubSubClient(Client& client); + PubSubClient(uint8_t *, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + PubSubClient(char*, uint16_t, void(*)(char*,uint8_t*,unsigned int),Client& client); + boolean connect(char *); + boolean connect(char *, char *, char *); + boolean connect(char *, char *, uint8_t, uint8_t, char *); + boolean connect(char *, char *, char *, char *, uint8_t, uint8_t, char*); + void disconnect(); + boolean publish(char *, char *); + boolean publish(char *, uint8_t *, unsigned int); + boolean publish(char *, uint8_t *, unsigned int, boolean); + boolean publish_P(char *, uint8_t PROGMEM *, unsigned int, boolean); + boolean subscribe(char *); + boolean loop(); + boolean connected(); +}; + + +#endif diff --git a/WiModbusGateway.cpp b/WiModbusGateway.cpp index 2bed4a5..91384e8 100644 --- a/WiModbusGateway.cpp +++ b/WiModbusGateway.cpp @@ -1,3 +1,5 @@ +#include "WiModbusGateway.h" + #include #include #include @@ -8,6 +10,7 @@ #include "ModbusApp.h" #include "PString.h" #include "Streaming.h" +#include "PubSubClient.h" @@ -16,18 +19,26 @@ LiquidCrystal lcd = LiquidCrystal(A0, A1, A2, A3, A4, A5); // Mudbus Mb; -WiFiUDP udpSock; +// WiFiUDP udpSock; +WiFiClient wifiClient; +byte server[] = { 192, 168, 87, 100 }; +PubSubClient client(server, 1883, callback, wifiClient); -//char ssid[] = "Kinderland"; // your network SSID (name) -//char pass[] = "test1234"; // your network password -char ssid[] = "MessWLAN"; -char pass[] = "UNVmpwbr6heQnMQ7ykXT"; +char ssid[] = "Kinderland"; // your network SSID (name) +char pass[] = "test1234"; // your network password +//char ssid[] = "MessWLAN"; +//char pass[] = "UNVmpwbr6heQnMQ7ykXT"; Metro tick = Metro(10000); Metro second = Metro(1000); uint32_t uptime; +void callback(char* topic, byte* payload, unsigned int length) { + // handle message arrived +} + + void printWifiStatus() { // print the SSID of the network you're attached to: Serial.print("SSID: "); @@ -89,10 +100,14 @@ void setup() { lcd.setCursor(0, 2); lcd.print("Connected."); + + + printWifiStatus(); - delay(10000); + + delay(1000); lcd.home(); lcd.clear(); @@ -105,6 +120,7 @@ void setup() { // modbusAppBegin(&Mb); modbusAppBegin(); + client.connect("arduinoClient"); } @@ -149,6 +165,8 @@ void updateDisplay() { void loop() { modbusAppExec(); // Mb.Run(); + client.loop(); + if (second.check() == 1) { uptime++; @@ -156,45 +174,22 @@ void loop() { char strbuf[256]; memset(strbuf, sizeof(strbuf), 0); PString buf = PString(strbuf, sizeof(strbuf)); - - udpSock.beginPacket("172.16.2.16", 9999); - buf << "{ \"metadata\": { \"table\": \"WiFiPowerMeter\" }, \"data\": {"; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "\"voltage\": " << getVoltage() << ", "; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "\"current\": " << getCurrent() << ", "; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "\"frequency\": " << getFrequency() << ", "; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "\"power\": " << getPower() << ", "; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "\"energy\": " << getEnergy() << ", "; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "\"uptime\": " << uptime; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "}"; - udpSock.write(strbuf, buf.length()); - - buf.begin(); buf << "}" << endl; - udpSock.write(strbuf, buf.length()); - - udpSock.endPacket(); +// uint8_t x = uptime % 128; +// for(uint8_t y = 1; y < x; y++) { +// buf << "a"; +// } +// Serial.print("buflen: "); Serial.println(buf.length()); + client.publish("outTopic", strbuf); +// client.publish("outTopic", "tick"); } diff --git a/WiModbusGateway.h b/WiModbusGateway.h index a4d7389..9f8b9d7 100644 --- a/WiModbusGateway.h +++ b/WiModbusGateway.h @@ -22,7 +22,7 @@ void setup(); //add your function definitions for the project ModbusGateway here - +void callback(char* topic, byte* payload, unsigned int length); //Do not add code below this line