/*
 * MqttClient.cpp
 *
 *  Created on: 08.05.2015
 *      Author: wn
 */

#include "MqttClient.h"

// NOTE(review): the six angle-bracket header names below were lost from the
// text this file was recovered from. They are reconstructed from the
// identifiers used in this file (String/Print/Serial/millis, atoi,
// strsep/strcpy, operator<< with endl, Metro, PString). Confirm against the
// project's build before committing.
#include <Arduino.h>
#include <stdlib.h>
#include <string.h>
#include <Streaming.h>
#include <Metro.h>
#include <PString.h>

#include "cmd.h"
#include "config.h"
#include "reset.h"


char WATCHDOG_TOPIC[] = "IoT/Watchdog";
char MQTT_BROKER_DEFAULT[] = "192.168.75.1";
const uint16_t MQTT_PORT = 1883;

// millis() timestamp of the most recent message seen on WATCHDOG_TOPIC
uint32_t lastWatchdogMessageReceived = 0;
// after this many ms without a watchdog message an alarm is printed
const uint32_t WATCHDOG_MESSAGE_DELAY = 10000;
// after this many ms without a watchdog message the device is reset
const uint32_t WATCHDOG_MESSAGE_TIMEOUT = 60000;


/**
 * Returns true when s points to a non-empty C string.
 */
static bool hasText(const char *s) {
  return (s != 0) && (*s != 0);
}

/**
 * Splits the next space-separated token off *cursor (via strsep).
 *
 * Returns 0 without touching *cursor when the input is exhausted.
 */
static char *nextToken(char **cursor) {
  if (! hasText(*cursor)) {
    return 0;
  }
  return strsep(cursor, " ");
}


/**
 * Command handler of the MQTT/M-Bus configuration command.
 *
 * @param params  the command line: a sub-command (help, enable, disable,
 *                show, broker, add, del, reset) optionally followed by
 *                space-separated arguments
 * @return "done" on success, otherwise a short failure text
 *
 * Output for the user is written to m_server. The "broker" sub-command does
 * not return: it stores the new address and then spins until the device is
 * reset.
 */
String MqttConfig::exec(String params) {
  String res = "failed";

  Print *out = m_server;
  MqttClient *mc = (MqttClient*)m_mqttClient;

  if (params.equalsIgnoreCase("help")) {
    *out << "help ..... this help page" << endl;
    *out << "disable .. disable query timer" << endl;
    *out << "enable ... enable query timer" << endl;
    *out << "show ..... shows the whole configuration" << endl;
    *out << "broker ... set the broker's address (this needs to be an IP address," << endl;
    *out << " a hostname won't work!! DNS seems to be broken" << endl;
    *out << " followed by a restart, you need to wait" << endl;
    *out << " about a minute" << endl;
    *out << "add ...... add an mbus client, params: token name address period" << endl;
    *out << "del ...... delete an mbus client, params: index" << endl;
    *out << "reset .... reset configuration" << endl;
    *out << " followed by a restart, you need to" << endl;
    *out << " about a minute" << endl;
    *out << endl;
    res = "done";
  } else if (params.equalsIgnoreCase("enable")) {
    mc->m_enabled = true;
    *out << "Query timers enabled" << endl;
    // previously fell through with the default "failed" although it succeeded
    res = "done";
  } else if (params.equalsIgnoreCase("disable")) {
    mc->m_enabled = false;
    *out << "Query timers disabled" << endl;
    res = "done";
  } else if (params.equalsIgnoreCase("show")) {
    // show the whole configuration
    *out << "Clients:" << endl;
    *out << "(index, token, address, period)" << endl;
    for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) {
      *out << i << ": " << mc->m_mbusDevTuple[i].token << ", "
           << mc->m_mbusDevTuple[i].address << ", "
           << mc->m_mbusDevTuple[i].queryPeriod << ", "
           << mc->m_mbusDevTuple[i].name << endl;
    }
    *out << "Broker: " << mc->m_mqttBroker << endl;
    *out << "Watchdog resets: " << WDOG_RSTCNT << endl;
    res = "done";
  } else if (params.startsWith("broker ")) {
    // set the broker's address
    int space = params.indexOf(' ');
    if (space == -1) {
      res = "missing argument";
    } else {
      String broker = params.substring(space + 1);
      if (broker.length() == 0) {
        res = "missing argument";
      } else if (broker.length() >= sizeof(mc->m_mqttBroker)) {
        // would overflow m_mqttBroker (terminator included)
        res = "broker address too long";
      } else {
        strncpy(mc->m_mqttBroker, broker.c_str(), sizeof(mc->m_mqttBroker) - 1);
        mc->m_mqttBroker[sizeof(mc->m_mqttBroker) - 1] = 0;
        configWrite(CONFIG_BROKER, sizeof(mc->m_mqttBroker), (char*)mc->m_mqttBroker);
        *out << "Stopping gateway, wait for reset" << endl;
        // deliberately hang here, the announced reset ends this loop
        while (true);
        res = "done";
      }
    }
  } else if (params.startsWith("add ")) {
    // add an mbus client, params: token name address period
    char paramBuf[64];
    if (params.length() >= sizeof(paramBuf)) {
      // strncpy would silently truncate and leave the buffer unterminated
      *out << "arguments too long" << endl;
      return String("failure");
    }
    strncpy(paramBuf, params.c_str(), sizeof(paramBuf) - 1);
    paramBuf[sizeof(paramBuf) - 1] = 0;

    char *cursor = paramBuf;
    nextToken(&cursor);                  // skip the command word itself
    char *tokenStr = nextToken(&cursor);
    char *nameStr = nextToken(&cursor);
    char *addressStr = nextToken(&cursor);
    char *periodStr = nextToken(&cursor);

    if (hasText(tokenStr) && hasText(addressStr) &&
        hasText(periodStr) && hasText(nameStr)) {
      int token = atoi(tokenStr);
      bool validToken = (token > 0) && (token <= 250);
      bool validName = strlen(nameStr) < MAX_LEN_OF_NAME;
      int address = atoi(addressStr);
      bool validAddress = (address > 0) && (address <= 250);
      int period = atoi(periodStr);
      bool validPeriod = (period >= 10) && (period <= 3600);

      if (validToken && validAddress && validPeriod && validName) {
        // a slot is free when its address is 0; never search past the table
        uint8_t i = 0;
        while ((i < NUM_OF_DEVICES) && (mc->m_mbusDevTuple[i].address != 0)) {
          i++;
        }
        if (i >= NUM_OF_DEVICES) {
          *out << "no free device slot" << endl;
          res = "failure";
        } else {
          mc->m_mbusDevTuple[i].token = (uint8_t)token;
          mc->m_mbusDevTuple[i].address = (uint8_t)address;
          mc->m_mbusDevTuple[i].queryPeriod = (uint16_t)period;
          // length was checked against MAX_LEN_OF_NAME above
          strcpy(mc->m_mbusDevTuple[i].name, nameStr);
          configWrite(CONFIG_DEVICES, sizeof(mc->m_mbusDevTuple), (char*)mc->m_mbusDevTuple);
          res = "done";
        }
      } else {
        if (! validToken) {
          *out << "Invalid token " << tokenStr << endl;
        }
        if (! validAddress) {
          *out << "Invalid address " << addressStr << endl;
        }
        if (! validPeriod) {
          *out << "Invalid period " << periodStr << endl;
        }
        if (! validName) {
          *out << "Invalid name " << nameStr << endl;
        }
        res = "failure";
      }
    } else {
      *out << "not enough arguments" << endl;
      res = "failure";
    }
  } else if (params.startsWith("del ")) {
    // delete an mbus client, params: index
    int space = params.indexOf(' ');
    if (space == -1) {
      res = "missing argument";
    } else {
      String arg0 = params.substring(space);
      const char *digits = arg0.c_str();
      while (*digits == ' ') {
        digits++;
      }
      // atoi() yields 0 for non-numeric text, which would silently delete
      // device 0, so insist on a leading digit
      bool numeric = (*digits >= '0') && (*digits <= '9');
      int idx = numeric ? atoi(digits) : -1;
      if (idx < 0 || idx >= NUM_OF_DEVICES) {
        res = "illegal index";
      } else {
        *out << "Index " << arg0 << " (" << idx << ") to be deleted" << endl;
        mc->m_mbusDevTuple[idx].token = 0;
        strcpy(mc->m_mbusDevTuple[idx].name, "-");
        mc->m_mbusDevTuple[idx].address = 0;
        mc->m_mbusDevTuple[idx].queryPeriod = 0;
        mc->m_mbusDevTuple[idx].timer = 0;
        configWrite(CONFIG_DEVICES, sizeof(mc->m_mbusDevTuple), (char*)mc->m_mbusDevTuple);
        res = "done";
      }
    }
  } else if (params.equalsIgnoreCase("reset")) {
    configReset();
    *out << "Stopping gateway, wait for reset" << endl;
    resetDevice();
    res = "done";
  }

  return res;
}


/**
 * Callback handed to PubSubClient in MqttClient::begin().
 *
 * Records the arrival time of every message on WATCHDOG_TOPIC. The payload
 * is not evaluated.
 *
 * @param topic    topic of the received message
 * @param payload  message payload (unused)
 * @param length   number of payload bytes (unused); it must not be used as
 *                 the compare length for the topic, an empty payload would
 *                 make every topic match
 */
void callback(char* topic, byte* payload, unsigned int length) {
  (void)payload;
  (void)length;
  if (0 == strcmp(topic, WATCHDOG_TOPIC)) {
    Serial << "Watchdog message received" << endl;
    lastWatchdogMessageReceived = millis();
  }
}


// one-second tick driving uptime, heartbeat and the query timers in exec()
Metro secondTick = Metro(1000);


/**
 * Creates the client in disconnect state 3, so that the first exec() call
 * attempts to connect.
 *
 * @param meterBusMaster  sender used to issue M-Bus requests
 */
MqttClient::MqttClient(RequestSender *meterBusMaster) :
    m_mqttConfig(this), m_client(), m_meterBusMaster(meterBusMaster),
    m_mqttClient(m_client),
    m_disconnectState(3), m_disconnectTime(millis()), m_uptime(0),
    m_deviceIdx(0), m_enabled(true), m_errorCount(0)
{
  // published in the heartbeat before sendError() has ever set it
  m_loopDisabledCount = 0;
}


/**
 * Publishes a received telegram as JSON on "IoT/Measurement/MeterbusHub".
 *
 * The telegram bytes are rendered as two-digit hex values separated by
 * blanks. Without an MQTT connection the message is only printed on Serial.
 *
 * @param responseBuffer        telegram bytes
 * @param responseBufferLength  number of bytes in responseBuffer
 * @param token                 token of the device the telegram belongs to
 * @param name                  name of that device
 */
void MqttClient::sendResponse(uint8_t *responseBuffer, uint16_t responseBufferLength, uint8_t token, char *name) {
  char strbuf[1024];
  // arguments were swapped before (value and size), clearing nothing
  memset(strbuf, 0, sizeof(strbuf));
  PString buf = PString(strbuf, sizeof(strbuf));
  buf << "{ \"metadata\": { \"device\": \"MeterbusHub\", " <<
         "\"token\": " << token << ", " <<
         "\"name\": \"" << String(name) << "\""
         "}, " <<
         "\"data\": {" <<
         "\"uptime\": " << m_uptime << ", " <<
         "\"telegram\": \"";

  // bounded loop: an empty telegram must not read responseBuffer at all
  if (responseBuffer != 0) {
    for (uint16_t i = 0; i < responseBufferLength; i++) {
      if (responseBuffer[i] <= 0x0f) {
        buf.print("0");
      }
      buf.print(responseBuffer[i], HEX);
      buf.print(" ");
    }
  }
  buf << "\"}}";

  if (m_disconnectState == 0) {
    m_mqttClient.publish("IoT/Measurement/MeterbusHub", strbuf);
  } else {
    Serial << "no MQTT connection, message lost: " << endl <<
              strbuf << endl;
  }
}


/**
 * Publishes an error report as JSON on "IoT/Failure/MeterbusHub" and stores
 * the counters for the heartbeat message.
 *
 * @param code               error code
 * @param errorCount         error counter, stored in m_errorCount
 * @param loopDisabledCount  counter, stored in m_loopDisabledCount
 * @param token              token of the affected device
 * @param name               name of the affected device
 */
void MqttClient::sendError(uint8_t code, uint16_t errorCount, uint16_t loopDisabledCount, uint8_t token, char *name) {
  // the name is a JSON string and has to be quoted, as in sendResponse()
  String msg = String("{ \"metadata\": { \"device\": \"MeterbusHub\", \"error\": ") + code +
      String(", \"errCnt\": ") + errorCount +
      String(", \"loopDsbld\": ") + loopDisabledCount +
      String(", \"token\": ") + token +
      String(", \"name\": \"") + String(name) +
      String("\" }, \"data\": { \"uptime\": ") + m_uptime +
      String("}}");

  m_errorCount = errorCount;
  m_loopDisabledCount = loopDisabledCount;

  if (m_disconnectState == 0) {
    m_mqttClient.publish("IoT/Failure/MeterbusHub", (char*)msg.c_str());
  } else {
    Serial << "no MQTT connection, message lost: " << msg << endl;
  }
}


/**
 * Registers the configuration command, loads the device table and the
 * broker address from the configuration store and sets up the MQTT client.
 *
 * When the stored configuration is not valid, an empty device table and
 * MQTT_BROKER_DEFAULT are written first.
 *
 * @param cmdServer  command server the configuration command registers at
 */
void MqttClient::begin(CmdServer *cmdServer) {
  m_mqttConfig.registerYourself(cmdServer);

  if (! configIsValid()) {
    for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) {
      m_mbusDevTuple[i].token = 0;
      strcpy(m_mbusDevTuple[i].name, "-");
      m_mbusDevTuple[i].address = 0;
      m_mbusDevTuple[i].queryPeriod = 0;
      m_mbusDevTuple[i].timer = 0;
    }
    configWrite(CONFIG_DEVICES, sizeof(m_mbusDevTuple), (char*)m_mbusDevTuple);

    strcpy(m_mqttBroker, MQTT_BROKER_DEFAULT);
    configWrite(CONFIG_BROKER, sizeof(m_mqttBroker), (char*)m_mqttBroker);
  }

  configRead(CONFIG_DEVICES, sizeof(m_mbusDevTuple), (char*)m_mbusDevTuple);
  configRead(CONFIG_BROKER, sizeof(m_mqttBroker), (char*)m_mqttBroker);
  // whatever came out of the store, it is used as a C string from here on
  m_mqttBroker[sizeof(m_mqttBroker) - 1] = 0;

  m_mqttClient = PubSubClient(m_mqttBroker, MQTT_PORT, callback, m_client);
}


/**
 * Periodic worker, to be called from the main loop.
 *
 * Services the MQTT connection through a small state machine
 * (m_disconnectState: 0 connected, 1 connection lost, 2 waiting before the
 * next attempt, 3 connecting). Once per second it also:
 *  - supervises the watchdog topic,
 *  - publishes the heartbeat,
 *  - counts down the query timers (when enabled),
 *  - looks at one device and issues an M-Bus request when it is due.
 */
void MqttClient::exec() {
  if ((m_disconnectState == 0) && (! m_mqttClient.loop())) {
    m_disconnectState = 1;
  }

  switch (m_disconnectState) {
  case 0:
    // everything fine
    break;
  case 1:
    Serial.println("MQTT Connection lost");
    m_mqttClient.disconnect();
    m_disconnectTime = millis();
    m_disconnectState = 2;
    break;
  case 2:
    // elapsed-time form stays correct when millis() wraps around
    if ((uint32_t)(millis() - m_disconnectTime) > 2000) {
      m_disconnectState = 3;
    }
    break;
  case 3:
    Serial << "Trying to re-connect: <" << m_mqttBroker << ">" << endl;
    if (m_mqttClient.connect("MeterbusHub")) {
      Serial << "MQTT connected" << endl;
      m_mqttClient.subscribe(WATCHDOG_TOPIC);
      m_disconnectTime = millis();
      m_disconnectState = 0;
    } else {
      Serial << "no success" << endl;
      m_disconnectState = 1;
    }
    break;
  default:
    m_disconnectState = 0;
    break;
  }

  if (secondTick.check() == 1) {
    m_uptime++;

    // elapsed-time form stays correct when millis() wraps around
    uint32_t sinceWatchdog = millis() - lastWatchdogMessageReceived;
    if (sinceWatchdog > WATCHDOG_MESSAGE_DELAY) {
      Serial << "Alarm due to missing watchdog message, " << endl;
    }
    if (sinceWatchdog > WATCHDOG_MESSAGE_TIMEOUT) {
      Serial << "Resetting device because of too long missing watchdog message" << endl;
      resetDevice();
    }

    byte wdogCnt = WDOG_RSTCNT;
    String msg = String("{ \"metadata\": { \"device\": \"MeterbusHub\" }, \"data\": { \"uptime\": ") + m_uptime +
        String(", \"errCnt\": ") + m_errorCount +
        String(", \"loopDsbld\": ") + m_loopDisabledCount +
        String(", \"watchdogCnt\": ") + wdogCnt +
        String("}}");
    if (m_disconnectState == 0) {
      m_mqttClient.publish("IoT/Heartbeat/MeterbusHub", (char*)msg.c_str());
    } else {
      Serial << "no MQTT connection, message lost: " << msg << endl;
    }

    if (m_enabled) {
      // count down the timers of all configured devices that are not yet due
      for (uint8_t i = 0; i < NUM_OF_DEVICES; i++) {
        if ((m_mbusDevTuple[i].address != 0) && (m_mbusDevTuple[i].timer != 0)) {
          m_mbusDevTuple[i].timer -= 1;
        }
      }
    } else {
      Serial << "Query timers disabled" << endl;
    }

    // Exactly one device is looked at per tick: either it is due and gets a
    // request (the index stays, so it is retried when no send buffer was
    // available), or the index moves on to the next slot.
    if ((m_mbusDevTuple[m_deviceIdx].address != 0) && (m_mbusDevTuple[m_deviceIdx].timer == 0)) {
      Serial << "Issue request for device " << m_mbusDevTuple[m_deviceIdx].token << endl;
      uint8_t *sendBuffer = m_meterBusMaster->getSendBuffer();
      if (sendBuffer != 0) {
        Serial << "send buffer ready" << endl;
        m_mbusDevTuple[m_deviceIdx].timer = m_mbusDevTuple[m_deviceIdx].queryPeriod;

        // five byte frame: 0x10, 0x5b, address, sum of bytes 1 and 2, 0x16
        sendBuffer[0] = 0x10;
        sendBuffer[1] = 0x5b;
        sendBuffer[2] = m_mbusDevTuple[m_deviceIdx].address;
        sendBuffer[3] = (uint8_t)(sendBuffer[1] + sendBuffer[2]);
        sendBuffer[4] = 0x16;
        m_meterBusMaster->sendBufferReady(5, m_mbusDevTuple[m_deviceIdx].token,
                                          m_mbusDevTuple[m_deviceIdx].name, this);
      } else {
        Serial << "no send buffer ready" << endl;
      }
    } else {
      m_deviceIdx++;
      if (m_deviceIdx >= NUM_OF_DEVICES) {
        m_deviceIdx = 0;
      }
    }
  }
}