send buffer handling and token stuff

This commit is contained in:
hg 2015-05-09 20:26:31 +02:00
parent 9c4bf9db30
commit f9df2439a8
7 changed files with 76 additions and 51 deletions

View File

@ -32,16 +32,16 @@ Metro secondTick = Metro(1000);
MqttClient::MqttClient(RequestSender *meterBusMaster) : MqttClient::MqttClient(RequestSender *meterBusMaster) :
m_client(), m_meterBusMaster(meterBusMaster), m_mqttClient(MQTT_BROKER, MQTT_PORT, callback, m_client), m_client(), m_meterBusMaster(meterBusMaster), m_mqttClient(MQTT_BROKER, MQTT_PORT, callback, m_client),
m_disconnectState(3), m_disconnectTime(millis()), m_uptime(0) m_disconnectState(3), m_disconnectTime(millis()), m_uptime(0), m_deviceIdx(0)
{ {
for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) { for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) {
m_mbusDevTuple[i] = { 0, 0, 0 }; m_mbusDevTuple[i] = { 0, 0, 0, 0 };
} }
m_mbusDevTuple[0] = { 0x53, 60, 0 }; // light meter m_mbusDevTuple[0] = { 1, 0x53, 60, 0 }; // light meter
} }
void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength) { void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token) {
char strbuf[256]; char strbuf[256];
memset(strbuf, sizeof(strbuf), 0); memset(strbuf, sizeof(strbuf), 0);
PString buf = PString(strbuf, sizeof(strbuf)); PString buf = PString(strbuf, sizeof(strbuf));
@ -49,7 +49,8 @@ void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLe
buf << "{ \"metadata\": { \"device\": \"MeterbusHub\" }, " << buf << "{ \"metadata\": { \"device\": \"MeterbusHub\" }, " <<
"\"data\": {" << "\"data\": {" <<
"\"uptime\": " << m_uptime << ", " "\"token\": " << token << ", " <<
"\"uptime\": " << m_uptime << ", " <<
"\"telegram\": \""; "\"telegram\": \"";
uint16_t i = 0; uint16_t i = 0;
@ -77,7 +78,7 @@ void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLe
// m_server.write(responseBuffer, responseBufferLength); // m_server.write(responseBuffer, responseBufferLength);
} }
void MqttClient::sendError(uint8_t code) { void MqttClient::sendError(uint8_t code, uint8_t token) {
} }
void MqttClient::begin() { void MqttClient::begin() {
@ -130,21 +131,40 @@ void MqttClient::exec() {
Serial << "no MQTT connection, message lost: " << msg << endl; Serial << "no MQTT connection, message lost: " << msg << endl;
} }
for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) { for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) {
if ((m_mbusDevTuple[i].address != 0) && (m_mbusDevTuple[i].timer == 0)) { if ((m_mbusDevTuple[i].address != 0) && (m_mbusDevTuple[i].timer != 0)) {
m_mbusDevTuple[i].timer = m_mbusDevTuple[i].queryPeriod; m_mbusDevTuple[i].timer -= 1;
Serial << "Issue request for device " << m_mbusDevTuple[i].address << endl; if (m_mbusDevTuple[i].timer == 0) {
Serial << "Device " << m_mbusDevTuple[i].token << " ready for request" << endl;
} else {
Serial << "Device " << m_mbusDevTuple[i].token << " not ready for request, timer: " << m_mbusDevTuple[i].timer << endl;
}
}
}
while (m_deviceIdx < NUM_OF_DEVICES) {
if ((m_mbusDevTuple[m_deviceIdx].address != 0) && (m_mbusDevTuple[m_deviceIdx].timer == 0)) {
m_mbusDevTuple[m_deviceIdx].timer = m_mbusDevTuple[m_deviceIdx].queryPeriod;
Serial << "Issue request for device " << m_mbusDevTuple[m_deviceIdx].token << endl;
uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer(); uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer();
if (sendBuffer != 0) {
Serial << "send buffer ready" << endl;
sendBuffer[0] = 0x10; sendBuffer[0] = 0x10;
sendBuffer[1] = 0x5b; sendBuffer[1] = 0x5b;
sendBuffer[2] = m_mbusDevTuple[i].address; sendBuffer[2] = m_mbusDevTuple[m_deviceIdx].address;
sendBuffer[3] = (uint8_t)(sendBuffer[1] + sendBuffer[2]); sendBuffer[3] = (uint8_t)(sendBuffer[1] + sendBuffer[2]);
sendBuffer[4] = 0x16; sendBuffer[4] = 0x16;
m_meterBusMaster->sendBufferReady(5, this); m_meterBusMaster->sendBufferReady(5, m_mbusDevTuple[m_deviceIdx].token, this);
break;
} else {
Serial << "no send buffer ready" << endl;
} }
m_mbusDevTuple[i].timer -= 1; }
m_deviceIdx++;
}
if (m_deviceIdx >= NUM_OF_DEVICES) {
m_deviceIdx = 0;
} }
} }

View File

@ -17,6 +17,7 @@
#define NUM_OF_DEVICES 10 #define NUM_OF_DEVICES 10
typedef struct { typedef struct {
uint8_t token;
uint8_t address; uint8_t address;
uint16_t queryPeriod; uint16_t queryPeriod;
uint16_t timer; uint16_t timer;
@ -27,8 +28,8 @@ public:
MqttClient(RequestSender *meterBusMaster); MqttClient(RequestSender *meterBusMaster);
void begin(); void begin();
void exec(); void exec();
virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength); virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token);
virtual void sendError(uint8_t code); virtual void sendError(uint8_t code, uint8_t token);
private: private:
EthernetClient m_client; EthernetClient m_client;
RequestSender *m_meterBusMaster; RequestSender *m_meterBusMaster;
@ -36,6 +37,7 @@ private:
uint8_t m_disconnectState; uint8_t m_disconnectState;
uint32_t m_disconnectTime; uint32_t m_disconnectTime;
uint32_t m_uptime; uint32_t m_uptime;
uint8_t m_deviceIdx;
mbusDevTuple_t m_mbusDevTuple[NUM_OF_DEVICES]; mbusDevTuple_t m_mbusDevTuple[NUM_OF_DEVICES];
}; };

View File

@ -6,14 +6,14 @@
class ResponseCallback { class ResponseCallback {
public: public:
virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength) =0; virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token) =0;
virtual void sendError(uint8_t code) = 0; virtual void sendError(uint8_t code, uint8_t token) = 0;
}; };
class RequestSender { class RequestSender {
public: public:
virtual uint8_t *getSendBuffer() =0; virtual uint8_t *getSendBuffer() =0;
virtual void sendBufferReady(uint16_t bufLen, ResponseCallback *responseCallback) =0; virtual void sendBufferReady(uint16_t bufLen, uint8_t token, ResponseCallback *responseCallback) =0;
}; };

View File

@ -99,12 +99,12 @@ String SendOctets::exec(String params) {
if (err) { if (err) {
return "error"; return "error";
} else { } else {
m_meterBusMaster->sendBufferReady(sendBufLen, this); m_meterBusMaster->sendBufferReady(sendBufLen, 1, this);
return "success"; return "success";
} }
} }
void SendOctets::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength) { void SendOctets::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token) {
Print *out = m_server; Print *out = m_server;
out->print("SO RESP: "); out->print("SO RESP: ");
uint16_t i = 0; uint16_t i = 0;
@ -122,7 +122,7 @@ void SendOctets::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLe
out->println(""); out->println("");
} }
void SendOctets::sendError(uint8_t code) { void SendOctets::sendError(uint8_t code, uint8_t token) {
switch (code) { switch (code) {
case 1: case 1:
m_server->println("SO RESP: no resp."); m_server->println("SO RESP: no resp.");
@ -139,8 +139,8 @@ void SendOctets::sendError(uint8_t code) {
MeterBusMaster::MeterBusMaster() : m_sendOctets(this), m_measureCurrent(this), MeterBusMaster::MeterBusMaster() : m_sendOctets(this), m_measureCurrent(this),
m_calibrationSupport(this), m_calibrationSupport(this),
m_cmdReadyToSend(false), m_cmdReadyFromRecv(false), m_expectResponse(false), m_cmdReadyToSend(false), m_cmdReadyFromRecv(false), m_expectResponse(false),
m_sendBufLen(0), m_recvBufLen(0), m_retransmitCount(0), m_responseCallback(0), m_sendBufLen(0), m_recvBufLen(0), m_retransmitCount(0), m_token(0),
m_sampling(true), m_calibration(false) { m_responseCallback(0), m_sampling(true), m_calibration(false) {
pinMode(RX_EN_PIN, OUTPUT); pinMode(RX_EN_PIN, OUTPUT);
digitalWrite(RX_EN_PIN, RX_DISABLE); digitalWrite(RX_EN_PIN, RX_DISABLE);
Serial3.begin(2400); Serial3.begin(2400);
@ -160,11 +160,12 @@ uint8_t *MeterBusMaster::getSendBuffer() {
return m_expectResponse ? 0 : m_sendBuffer; return m_expectResponse ? 0 : m_sendBuffer;
} }
void MeterBusMaster::sendBufferReady(uint16_t sendBufLen, ResponseCallback *responseCallback) { void MeterBusMaster::sendBufferReady(uint16_t sendBufLen, uint8_t token, ResponseCallback *responseCallback) {
m_cmdReadyToSend = true; m_cmdReadyToSend = true;
m_retransmitCount = 0; m_retransmitCount = 0;
m_sendBufLen = sendBufLen; m_sendBufLen = sendBufLen;
m_responseCallback = responseCallback; m_responseCallback = responseCallback;
m_token = token;
} }
void MeterBusMaster::prepareResponse(bool err, uint8_t in) { void MeterBusMaster::prepareResponse(bool err, uint8_t in) {
@ -173,7 +174,7 @@ void MeterBusMaster::prepareResponse(bool err, uint8_t in) {
if (err) { if (err) {
if (m_responseCallback != 0) { if (m_responseCallback != 0) {
m_responseCallback->sendError(1); m_responseCallback->sendError(1, m_token);
} }
expectedChars = 0; expectedChars = 0;
state = 0; state = 0;
@ -212,9 +213,10 @@ void MeterBusMaster::prepareResponse(bool err, uint8_t in) {
if (expectedChars == 0) { if (expectedChars == 0) {
if (m_responseCallback != 0) { if (m_responseCallback != 0) {
m_responseCallback->sendResponse(m_recvBuffer, m_recvBufLen); m_responseCallback->sendResponse(m_recvBuffer, m_recvBufLen, m_token);
} }
m_expectResponse = false; m_expectResponse = false;
m_token = 0;
state = 0; state = 0;
m_responseCallback = 0; m_responseCallback = 0;
} }

View File

@ -24,8 +24,8 @@ public:
virtual String getCmdName() { return "SO"; } virtual String getCmdName() { return "SO"; }
virtual String getHelp() { return "Send octets"; } virtual String getHelp() { return "Send octets"; }
virtual String exec(String params); virtual String exec(String params);
virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength); virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token);
virtual void sendError(uint8_t code); virtual void sendError(uint8_t code, uint8_t token);
private: private:
RequestSender *m_meterBusMaster; RequestSender *m_meterBusMaster;
}; };
@ -62,7 +62,7 @@ public:
void begin(CmdServer *cmdServer); void begin(CmdServer *cmdServer);
void exec(); void exec();
uint8_t *getSendBuffer(); uint8_t *getSendBuffer();
void sendBufferReady(uint16_t bufLen, ResponseCallback *responseCallback); void sendBufferReady(uint16_t bufLen, uint8_t token, ResponseCallback *responseCallback);
friend class CalibrationSupport; friend class CalibrationSupport;
private: private:
SendOctets m_sendOctets; SendOctets m_sendOctets;
@ -74,6 +74,7 @@ private:
uint16_t m_sendBufLen; uint16_t m_sendBufLen;
uint16_t m_recvBufLen; uint16_t m_recvBufLen;
uint8_t m_retransmitCount; uint8_t m_retransmitCount;
uint8_t m_token;
ResponseCallback *m_responseCallback; ResponseCallback *m_responseCallback;
uint8_t m_sendBuffer[SEND_BUFFER_SIZE]; uint8_t m_sendBuffer[SEND_BUFFER_SIZE];
uint8_t m_recvBuffer[RECEIVE_BUFFER_SIZE]; uint8_t m_recvBuffer[RECEIVE_BUFFER_SIZE];

View File

@ -7,7 +7,7 @@ MeterBusServer::MeterBusServer(uint16_t port, RequestSender *meterBusMaster) :
m_server(port), m_client(255), m_meterBusMaster(meterBusMaster) { m_server(port), m_client(255), m_meterBusMaster(meterBusMaster) {
} }
void MeterBusServer::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength) { void MeterBusServer::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token) {
m_server.write(responseBuffer, responseBufferLength); m_server.write(responseBuffer, responseBufferLength);
} }
@ -31,7 +31,7 @@ void MeterBusServer::exec() {
*(sendBuffer + sendBufLen) = ch; *(sendBuffer + sendBufLen) = ch;
sendBufLen++; sendBufLen++;
} }
m_meterBusMaster->sendBufferReady(sendBufLen, this); m_meterBusMaster->sendBufferReady(sendBufLen, 1, this);
} }
} }
} }

View File

@ -18,7 +18,7 @@ public:
MeterBusServer(uint16_t port, RequestSender *meterBusMaster); MeterBusServer(uint16_t port, RequestSender *meterBusMaster);
void begin(); void begin();
void exec(); void exec();
virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength); virtual void sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token);
virtual void sendError(uint8_t code); virtual void sendError(uint8_t code);
private: private:
EthernetServer m_server; EthernetServer m_server;