diff --git a/MQTT/PubSubClient.h b/MQTT/PubSubClient.h index a127add..539e411 100755 --- a/MQTT/PubSubClient.h +++ b/MQTT/PubSubClient.h @@ -11,7 +11,7 @@ #include "Client.h" // MQTT_MAX_PACKET_SIZE : Maximum packet size -#define MQTT_MAX_PACKET_SIZE 256 +#define MQTT_MAX_PACKET_SIZE 1024 // MQTT_KEEPALIVE : keepAlive interval in Seconds #define MQTT_KEEPALIVE 15 diff --git a/MqttClient.cpp b/MqttClient.cpp index 6c023b3..54b0bac 100644 --- a/MqttClient.cpp +++ b/MqttClient.cpp @@ -18,12 +18,6 @@ const uint16_t MQTT_PORT = 1883; void callback(char* topic, byte* payload, unsigned int length) { - char strbuf[256]; - memset(strbuf, sizeof(strbuf), 0); - memcpy(strbuf, payload, length); - Serial << "MQTT Message received: " << topic << ", " << strbuf << endl; - - // handle message arrived } @@ -38,18 +32,20 @@ MqttClient::MqttClient(RequestSender *meterBusMaster) : m_mbusDevTuple[i] = { 0, 0, 0, 0 }; } - m_mbusDevTuple[0] = { 1, 0x53, 60, 0 }; // light meter + m_mbusDevTuple[0] = { 1, 0x53, 10, 0 }; // light meter + m_mbusDevTuple[1] = { 2, 32, 10, 0 }; // electrity } void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token) { - char strbuf[256]; + char strbuf[1024]; memset(strbuf, sizeof(strbuf), 0); PString buf = PString(strbuf, sizeof(strbuf)); - buf << "{ \"metadata\": { \"device\": \"MeterbusHub\" }, " << - "\"data\": {" << + buf << "{ \"metadata\": { \"device\": \"MeterbusHub\", " << "\"token\": " << token << ", " << + "}, " << + "\"data\": {" << "\"uptime\": " << m_uptime << ", " << "\"telegram\": \""; @@ -69,26 +65,37 @@ void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLe buf << "\"}}"; if (m_disconnectState == 0) { + Serial << "publishing " << strbuf << endl; + Serial << "length: " << buf.length() << endl; m_mqttClient.publish("MeterbusHub.Measurement", strbuf); } else { Serial << "no MQTT connection, message lost: " << endl << strbuf << endl; } - - // m_server.write(responseBuffer, responseBufferLength); } void MqttClient::sendError(uint8_t code, uint8_t token) { + String msg = String("{ \"metadata\": { \"device\": \"MeterbusHub\", \"error\": ") + + code + String(", \"token\": ") + token + String(" }, \"data\": { \"uptime\": ") + m_uptime + String("}}"); + if (m_disconnectState == 0) { + Serial << "publishing " << msg << endl; + Serial << "length: " << msg.length() << endl; + m_mqttClient.publish("MeterbusHub.Measurement", (char*)msg.c_str()); + } else { + Serial << "no MQTT connection, message lost: " << msg << endl; + } } void MqttClient::begin() { } void MqttClient::exec() { + //Serial << "*** a" << endl; if ((m_disconnectState == 0) && (! m_mqttClient.loop())) { m_disconnectState = 1; } + //Serial << "*** b, " << m_disconnectState << endl; switch (m_disconnectState) { case 0: // Serial.println("discState 0"); @@ -106,9 +113,9 @@ void MqttClient::exec() { } break; case 3: + Serial << "Trying to re-connect" << endl; if (m_mqttClient.connect("MeterbusHub")) { Serial << "MQTT connected" << endl; - m_mqttClient.subscribe("MeterbusHub.Configuration"); m_disconnectTime = millis(); m_disconnectState = 0; } else { @@ -120,16 +127,17 @@ void MqttClient::exec() { break; } - + //Serial << "*** c" << endl; if (secondTick.check() == 1) { m_uptime++; + Serial << "Tick " << m_uptime << endl; - String msg = String("{ \"metadata\": { \"device\": \"MeterbusHub\" }, \"data\": { \"uptime\": ") + m_uptime + String("}}"); - if (m_disconnectState == 0) { - m_mqttClient.publish("MeterbusHub.Heartbeat", (char*)msg.c_str()); - } else { - Serial << "no MQTT connection, message lost: " << msg << endl; - } +// String msg = String("{ \"metadata\": { \"device\": \"MeterbusHub\" }, \"data\": { \"uptime\": ") + m_uptime + String("}}"); +// if (m_disconnectState == 0) { +// m_mqttClient.publish("MeterbusHub.Heartbeat", (char*)msg.c_str()); +// } else { +// Serial << "no MQTT connection, message lost: " << msg << endl; +// } for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) { if ((m_mbusDevTuple[i].address != 0) && (m_mbusDevTuple[i].timer != 0)) { @@ -142,13 +150,14 @@ void MqttClient::exec() { } } - while (m_deviceIdx < NUM_OF_DEVICES) { + //Serial << "while in" << endl; + while (true) { if ((m_mbusDevTuple[m_deviceIdx].address != 0) && (m_mbusDevTuple[m_deviceIdx].timer == 0)) { - m_mbusDevTuple[m_deviceIdx].timer = m_mbusDevTuple[m_deviceIdx].queryPeriod; Serial << "Issue request for device " << m_mbusDevTuple[m_deviceIdx].token << endl; uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer(); if (sendBuffer != 0) { Serial << "send buffer ready" << endl; + m_mbusDevTuple[m_deviceIdx].timer = m_mbusDevTuple[m_deviceIdx].queryPeriod; sendBuffer[0] = 0x10; sendBuffer[1] = 0x5b; @@ -156,34 +165,24 @@ void MqttClient::exec() { sendBuffer[3] = (uint8_t)(sendBuffer[1] + sendBuffer[2]); sendBuffer[4] = 0x16; m_meterBusMaster->sendBufferReady(5, m_mbusDevTuple[m_deviceIdx].token, this); - break; } else { Serial << "no send buffer ready" << endl; } + break; + } else { + Serial << "Trying " << m_deviceIdx << ", " << m_mbusDevTuple[m_deviceIdx].token << endl; + m_deviceIdx++; + if (m_deviceIdx >= NUM_OF_DEVICES) { + m_deviceIdx = 0; + } + break; } - m_deviceIdx++; - } - if (m_deviceIdx >= NUM_OF_DEVICES) { - m_deviceIdx = 0; } + //Serial << "while out" << endl; } + //Serial << "*** d" << endl; - - // m_client = m_server.available(); - // if (m_client) { - // uint16_t sendBufLen = 0; - // uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer(); - // if (sendBuffer != 0) { - // int chi; - // while ((chi = m_client.read()) != -1) { - // char ch = (char) chi; - // *(sendBuffer + sendBufLen) = ch; - // sendBufLen++; - // } - // m_meterBusMaster->sendBufferReady(sendBufLen, this); - // } - // } } diff --git a/NetMeterBusMaster2.cpp b/NetMeterBusMaster2.cpp index c8be948..cfb5ae1 100644 --- a/NetMeterBusMaster2.cpp +++ b/NetMeterBusMaster2.cpp @@ -11,6 +11,7 @@ #include "overCurrentProt.h" #include #include "MqttClient.h" +#include const uint8_t POWER_LED = 4; @@ -58,11 +59,15 @@ void setup() { } void loop() { + //Serial << "*** 1" << endl; cmdServer.exec(); + //Serial << "*** 2" << endl; uptime.exec(); - + //Serial << "*** 3" << endl; overCurrentProt.exec(); + //Serial << "*** 4" << endl; meterBusMaster.exec(); - // meterBusServer.exec(); + //Serial << "*** 5" << endl; mqttClient.exec(); + //Serial << "*** 6" << endl; } diff --git a/meterBusMaster.cpp b/meterBusMaster.cpp index b65469f..7731333 100644 --- a/meterBusMaster.cpp +++ b/meterBusMaster.cpp @@ -169,10 +169,13 @@ void MeterBusMaster::sendBufferReady(uint16_t sendBufLen, uint8_t token, Respons } void MeterBusMaster::prepareResponse(bool err, uint8_t in) { + Serial << "resp in, err: " << err << endl; static int16_t expectedChars = -1; static uint8_t state = 0; + Serial << "r1" << endl; if (err) { + Serial << "r1" << endl; if (m_responseCallback != 0) { m_responseCallback->sendError(1, m_token); } @@ -181,8 +184,10 @@ void MeterBusMaster::prepareResponse(bool err, uint8_t in) { m_expectResponse = false; m_responseCallback = 0; } else { + Serial << "r2" << endl; switch (state) { case 0: + Serial << "r3" << endl; m_recvBufLen = 0; if (in == 0xe5) { state = 2; @@ -196,22 +201,29 @@ void MeterBusMaster::prepareResponse(bool err, uint8_t in) { } break; case 1: + Serial << "r4" << endl; expectedChars = (int16_t)in; expectedChars += 4; state = 2; break; case 2: + Serial << "r5" << endl; expectedChars--; break; } - if (m_recvBufLen >= RECEIVE_BUFFER_SIZE) + if (m_recvBufLen >= RECEIVE_BUFFER_SIZE) { + Serial << "r6, " << m_recvBufLen << ", " << RECEIVE_BUFFER_SIZE << endl; fatal(FATAL_BUFFER_OVERFLOW + 1); + } + Serial << "r7" << endl; m_recvBuffer[m_recvBufLen] = in; m_recvBufLen++; + Serial << "r8" << endl; if (expectedChars == 0) { + Serial << "r9" << endl; if (m_responseCallback != 0) { m_responseCallback->sendResponse(m_recvBuffer, m_recvBufLen, m_token); } @@ -221,6 +233,8 @@ void MeterBusMaster::prepareResponse(bool err, uint8_t in) { m_responseCallback = 0; } } + + Serial << "resp out" << endl; } void MeterBusMaster::sample() { @@ -243,7 +257,7 @@ void MeterBusMaster::exec() { if (m_cmdReadyToSend) { sample(); - // Serial << "MeterBusMaster: sending " << m_sendBufLen << " octets." << endl; + Serial << "MeterBusMaster: sending " << m_sendBufLen << " octets." << endl; Serial3.write(m_sendBuffer, m_sendBufLen); Serial3.flush(); hold(); @@ -269,7 +283,7 @@ void MeterBusMaster::exec() { int serialInChar = Serial3.read(); if (serialInChar != -1) { - // Serial << "Got: " << _HEX(serialInChar) << endl; + Serial << "Got: " << _HEX(serialInChar) << endl; } if ((serialInChar != -1) && m_expectResponse) { prepareResponse(false, (uint8_t)serialInChar); diff --git a/meterBusMaster.h b/meterBusMaster.h index d60bc1e..11b1ff5 100644 --- a/meterBusMaster.h +++ b/meterBusMaster.h @@ -54,7 +54,7 @@ private: const uint8_t SEND_BUFFER_SIZE = 30; -const uint8_t RECEIVE_BUFFER_SIZE = 180; +const uint8_t RECEIVE_BUFFER_SIZE = 250; class MeterBusMaster : public RequestSender { public: