initial
This commit is contained in:
59
src/mqttHandler.js
Normal file
59
src/mqttHandler.js
Normal file
@ -0,0 +1,59 @@
|
||||
let logger = require('./log')
|
||||
|
||||
logger.info('mqttHandler executed')
|
||||
|
||||
|
||||
var mqtt = require('mqtt');
|
||||
|
||||
var client = undefined;
|
||||
var topicCallbacks = {};
|
||||
|
||||
|
||||
function start() {
|
||||
client = mqtt.connect('mqtt://172.16.2.16');
|
||||
|
||||
client.on('error', (err) => {
|
||||
logger.error(`Error in mqttHandler: ${err}`)
|
||||
});
|
||||
client.on('connect', () => {
|
||||
client.publish('dispatcher_ng/status', 'dispatcher_ng running');
|
||||
client.subscribe('dispatcher_ng/cmd');
|
||||
Object.keys(topicCallbacks).forEach((topic) => {
|
||||
client.subscribe(topic);
|
||||
logger.info(`${topic} subscribed`);
|
||||
});
|
||||
logger.info('mqtt connection established');
|
||||
});
|
||||
client.on('message', (topic, payload) => {
|
||||
logger.info(`message received on topic ${topic}: ${payload}`);
|
||||
if (topic in topicCallbacks) {
|
||||
topicCallbacks[topic].forEach((cb) => { cb(topic, payload) });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function send(topic, payload) {
|
||||
client.publish(topic, payload);
|
||||
}
|
||||
|
||||
function register(topics, cb) {
|
||||
if (! (topics instanceof Array)) {
|
||||
topics = [ topics ];
|
||||
}
|
||||
topics.forEach((topic) => {
|
||||
if (topic in topicCallbacks) {
|
||||
topicCallbacks[topic].push(cb);
|
||||
logger.info(`additional callback registered for ${topic}`);
|
||||
} else {
|
||||
topicCallbacks[topic] = [ cb ];
|
||||
logger.info(`first callback registered for ${topic}`);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
start,
|
||||
send,
|
||||
register
|
||||
};
|
||||
|
Reference in New Issue
Block a user