/* * MqttClient.cpp * * Created on: 08.05.2015 * Author: wn */ #include "MqttClient.h" #include #include #include #include byte MQTT_BROKER[] = { 192, 168, 75, 1 }; const uint16_t MQTT_PORT = 1883; void callback(char* topic, byte* payload, unsigned int length) { char strbuf[256]; memset(strbuf, sizeof(strbuf), 0); memcpy(strbuf, payload, length); Serial << "MQTT Message received: " << topic << ", " << strbuf << endl; // handle message arrived } Metro secondTick = Metro(1000); MqttClient::MqttClient(RequestSender *meterBusMaster) : m_client(), m_meterBusMaster(meterBusMaster), m_mqttClient(MQTT_BROKER, MQTT_PORT, callback, m_client), m_disconnectState(3), m_disconnectTime(millis()), m_uptime(0) { for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) { m_mbusDevTuple[i] = { 0, 0, 0 }; } m_mbusDevTuple[0] = { 0x53, 60, 0 }; // light meter } void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength) { char strbuf[256]; memset(strbuf, sizeof(strbuf), 0); PString buf = PString(strbuf, sizeof(strbuf)); buf << "{ \"metadata\": { \"device\": \"MeterbusHub\" }, " << "\"data\": {" << "\"uptime\": " << m_uptime << ", " "\"telegram\": \""; uint16_t i = 0; while (true) { if (responseBuffer[i] <= 0x0f) { buf.print("0"); } buf.print(responseBuffer[i], HEX); buf.print(" "); i++; if (i == responseBufferLength) { break; } } buf << "\"}}"; if (m_disconnectState == 0) { m_mqttClient.publish("MeterbusHub.Measurement", strbuf); } else { Serial << "no MQTT connection, message lost: " << endl << strbuf << endl; } // m_server.write(responseBuffer, responseBufferLength); } void MqttClient::sendError(uint8_t code) { } void MqttClient::begin() { } void MqttClient::exec() { if ((m_disconnectState == 0) && (! m_mqttClient.loop())) { m_disconnectState = 1; } switch (m_disconnectState) { case 0: // Serial.println("discState 0"); // everything fine break; case 1: Serial.println("MQTT Connection lost"); m_mqttClient.disconnect(); m_disconnectTime = millis(); m_disconnectState = 2; break; case 2: if (m_disconnectTime + 2000 < millis()) { m_disconnectState = 3; } break; case 3: if (m_mqttClient.connect("MeterbusHub")) { Serial << "MQTT connected" << endl; m_mqttClient.subscribe("MeterbusHub.Configuration"); m_disconnectTime = millis(); m_disconnectState = 0; } else { m_disconnectState = 1; } break; default: m_disconnectState = 0; break; } if (secondTick.check() == 1) { m_uptime++; String msg = String("{ \"metadata\": { \"device\": \"MeterbusHub\" }, \"data\": { \"uptime\": ") + m_uptime + String("}}"); if (m_disconnectState == 0) { m_mqttClient.publish("MeterbusHub.Heartbeat", (char*)msg.c_str()); } else { Serial << "no MQTT connection, message lost: " << msg << endl; } for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) { if ((m_mbusDevTuple[i].address != 0) && (m_mbusDevTuple[i].timer == 0)) { m_mbusDevTuple[i].timer = m_mbusDevTuple[i].queryPeriod; Serial << "Issue request for device " << m_mbusDevTuple[i].address << endl; uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer(); sendBuffer[0] = 0x10; sendBuffer[1] = 0x5b; sendBuffer[2] = m_mbusDevTuple[i].address; sendBuffer[3] = (uint8_t)(sendBuffer[1] + sendBuffer[2]); sendBuffer[4] = 0x16; m_meterBusMaster->sendBufferReady(5, this); } m_mbusDevTuple[i].timer -= 1; } } // m_client = m_server.available(); // if (m_client) { // uint16_t sendBufLen = 0; // uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer(); // if (sendBuffer != 0) { // int chi; // while ((chi = m_client.read()) != -1) { // char ch = (char) chi; // *(sendBuffer + sendBufLen) = ch; // sendBufLen++; // } // m_meterBusMaster->sendBufferReady(sendBufLen, this); // } // } }