let logger = require('./log') logger.info('mqttHandler executed') var mqtt = require('mqtt'); var client = undefined; var topicCallbacks = {}; function start() { client = mqtt.connect('mqtt://172.16.2.16'); client.on('error', (err) => { logger.error(`Error in mqttHandler: ${err}`) }); client.on('connect', () => { client.publish('dispatcher_ng/status', 'dispatcher_ng running'); client.subscribe('dispatcher_ng/cmd'); Object.keys(topicCallbacks).forEach((topic) => { client.subscribe(topic); logger.info(`${topic} subscribed`); }); logger.info('mqtt connection established'); }); client.on('message', (topic, payload) => { payload = payload.toString('UTF-8'); logger.info(`message received on topic ${topic}: ${payload}`); if (topic in topicCallbacks) { topicCallbacks[topic].forEach((cb) => { cb(topic, payload) }); } }); } function send(topic, payload) { client.publish(topic, payload); } function register(topics, cb) { if (! (topics instanceof Array)) { topics = [ topics ]; } topics.forEach((topic) => { if (topic in topicCallbacks) { topicCallbacks[topic].push(cb); logger.info(`additional callback registered for ${topic}`); } else { topicCallbacks[topic] = [ cb ]; logger.info(`first callback registered for ${topic}`); } }) } module.exports = { start, send, register };