diff --git a/src/mqttHandler.js b/src/mqttHandler.js index 094d20b..8b4c218 100644 --- a/src/mqttHandler.js +++ b/src/mqttHandler.js @@ -27,28 +27,39 @@ function start() { client.on('message', (topic, payload) => { payload = payload.toString('UTF-8'); logger.info(`message received on topic ${topic}: ${payload}`); - //if (topic in topicCallbacks) { - // topicCallbacks[topic].forEach((cb) => { cb(topic, payload) }); - //} - Object.keys(topicCallbacks).forEach((subscribedTopic) => { - // logger.warn(`Test: ${subscribedTopic}, ${topic}`); - // console.log(`Test: ${subscribedTopic}, ${topic}`); - if (subscribedTopic == topic) { - // logger.warn('1'); - topicCallbacks[topic].forEach((cb) => { cb(topic, payload) }); - } else if (subscribedTopic.endsWith('#') && - (subscribedTopic.substring(0, subscribedTopic.length-1) == - topic.substring(0, subscribedTopic.length-1))) { - // logger.warn('2'); - // console.log('2'); - topicCallbacks[subscribedTopic].forEach((cb) => { cb(topic, payload) }); - } - }); + processMessage(topic, payload); }); } -function send(topic, payload) { - client.publish(topic, payload); +function processMessage(topic, payload) { + let found = false; + Object.keys(topicCallbacks).forEach((subscribedTopic) => { + // logger.warn(`Test: ${subscribedTopic}, ${topic}`); + // console.log(`Test: ${subscribedTopic}, ${topic}`); + if (subscribedTopic == topic) { + // logger.warn('1'); + topicCallbacks[topic].forEach((cb) => { cb(topic, payload) }); + found = true; + } else if (subscribedTopic.endsWith('#') && + (subscribedTopic.substring(0, subscribedTopic.length-1) == + topic.substring(0, subscribedTopic.length-1))) { + // logger.warn('2'); + // console.log('2'); + topicCallbacks[subscribedTopic].forEach((cb) => { cb(topic, payload) }); + found = false; + } + }); + return found; +} + +function send(topic, payload, internalFirst = false) { + let sent = false; + if (internalFirst) { + sent = processMessage(topic, payload); + } + if (! sent) { + client.publish(topic, payload); + } } function register(topics, cb) {