let logger = require('./log') logger.info('mqttHandler executed') var mqtt = require('mqtt'); var client = undefined; var topicCallbacks = {}; function start() { client = mqtt.connect('mqtt://172.16.2.16'); client.on('error', (err) => { logger.error(`Error in mqttHandler: ${err}`) }); client.on('connect', () => { client.publish('dispatcher_ng/status', 'dispatcher_ng running'); client.subscribe('dispatcher_ng/cmd'); Object.keys(topicCallbacks).forEach((topic) => { client.subscribe(topic); logger.info(`${topic} subscribed`); }); logger.info('mqtt connection established'); }); client.on('message', (topic, payload) => { payload = payload.toString('UTF-8'); logger.info(`message received on topic ${topic}: ${payload}`); processMessage(topic, payload); }); } function processMessage(topic, payload) { let found = false; Object.keys(topicCallbacks).forEach((subscribedTopic) => { // logger.warn(`Test: ${subscribedTopic}, ${topic}`); // console.log(`Test: ${subscribedTopic}, ${topic}`); if (subscribedTopic == topic) { // logger.warn('1'); topicCallbacks[topic].forEach((cb) => { cb(topic, payload) }); found = true; } else if (subscribedTopic.endsWith('#') && (subscribedTopic.substring(0, subscribedTopic.length-1) == topic.substring(0, subscribedTopic.length-1))) { // logger.warn('2'); // console.log('2'); topicCallbacks[subscribedTopic].forEach((cb) => { cb(topic, payload) }); found = false; } }); return found; } function send(topic, payload, internalFirst = false) { let sent = false; if (internalFirst) { sent = processMessage(topic, payload); } if (! sent) { client.publish(topic, payload); } } function register(topics, cb) { if (! (topics instanceof Array)) { topics = [ topics ]; } topics.forEach((topic) => { if (topic in topicCallbacks) { topicCallbacks[topic].push(cb); logger.info(`additional callback registered for ${topic}`); } else { topicCallbacks[topic] = [ cb ]; logger.info(`first callback registered for ${topic}`); } }) } module.exports = { start, send, register };