Files
dispatcher_ng/dist/MqttDispatcher.js
2018-04-06 23:38:08 +02:00

94 lines
3.7 KiB
JavaScript

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const logger = require("./log");
const Mqtt = require("mqtt");
const fs = require("fs");
const config = require("./config");
/**
 * Thin wrapper around an MQTT client that fans incoming messages out to
 * callbacks registered per topic filter.
 *
 * Connection settings are read from `config.dict` (`brokerUrl`, optional
 * `brokerUser`/`brokerPass`, optional `brokerCa`). The connection itself is
 * only opened by `exec()`.
 */
class MqttHandler {
    /**
     * Collects the broker URL and connection options from the configuration.
     * Credentials are only applied when both user and password are set; when a
     * CA file is configured it is read synchronously and certificate
     * verification is enforced.
     */
    constructor() {
        this.mqttOptions = {};
        this.mqttBrokerUrl = config.dict.brokerUrl;
        if (config.dict.brokerUser && config.dict.brokerPass) {
            this.mqttOptions.username = config.dict.brokerUser;
            this.mqttOptions.password = config.dict.brokerPass;
        }
        if (config.dict.brokerCa) {
            this.mqttOptions.ca = fs.readFileSync(config.dict.brokerCa, 'ascii');
            this.mqttOptions.rejectUnauthorized = true;
        }
        this.topicHandlers = [];
        this.startCallbacks = [];
        logger.info("MqttHandler constructed");
    }

    /**
     * Tests whether a concrete topic name is covered by a subscription filter.
     *
     * Matching is done level by level (levels are separated by `/`):
     * `+` matches exactly one level, and `#` as the last level matches that
     * level's parent and everything below it (so `a/#` matches `a`, `a/b`
     * and `a/b/c`). A `#` anywhere but the last level never matches.
     *
     * @param {string} filter - Topic filter as passed to `register()`.
     * @param {string} topic - Topic name of a received message.
     * @returns {boolean} `true` when the filter covers the topic.
     */
    static topicMatchesFilter(filter, topic) {
        if (filter === topic) {
            return true;
        }
        const filterLevels = filter.split('/');
        const topicLevels = topic.split('/');
        for (let i = 0; i < filterLevels.length; i++) {
            const level = filterLevels[i];
            if (level === '#') {
                // Multi-level wildcard swallows the rest of the topic, but is
                // only meaningful as the final level of the filter.
                return i === filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            if (level !== '+' && level !== topicLevels[i]) {
                return false;
            }
        }
        // No `#` was seen, so the topic must not have additional levels.
        return filterLevels.length === topicLevels.length;
    }

    /**
     * Registers a message callback for a list of topic filters and, optionally,
     * a callback to run once the broker connection is established.
     *
     * @param {string[]} topics - Topic filters to subscribe to on connect.
     * @param {function(string, string): void} cb - Invoked with
     *     `(topic, payload)` for every matching message.
     * @param {function(): void} [startCb] - Invoked from the `connect` handler.
     *     Ignored when it is not a function, so that an omitted callback cannot
     *     throw inside the connect handler.
     */
    register(topics, cb, startCb) {
        topics.forEach((topic) => {
            this.topicHandlers.push({ topic: topic, callback: cb });
            logger.info(`Callback registered for ${topic}`);
        });
        if (typeof startCb === 'function') {
            this.startCallbacks.push(startCb);
        }
    }

    /**
     * Opens the broker connection and wires up the client event handlers.
     *
     * On `connect` a status message is published, `dispatcher_ng/cmd` and all
     * registered topic filters are subscribed, and the start callbacks run.
     * NOTE(review): the `connect` handler presumably also fires on reconnects,
     * in which case start callbacks run again — confirm this is intended.
     */
    exec() {
        logger.info(`Connecting to ${this.mqttBrokerUrl}`);
        this.mqttClient = Mqtt.connect(this.mqttBrokerUrl, this.mqttOptions);
        this.mqttClient.on('error', (err) => {
            logger.error(`Error in mqttHandler: ${err}`);
        });
        this.mqttClient.on('connect', () => {
            this.mqttClient.publish('dispatcher_ng/status', 'dispatcher_ng running');
            this.mqttClient.subscribe('dispatcher_ng/cmd');
            this.topicHandlers.forEach((topicHandler) => {
                this.mqttClient.subscribe(topicHandler.topic);
                logger.info(`${topicHandler.topic} subscribed`);
            });
            logger.info('MQTT connection established');
            this.startCallbacks.forEach((cb) => {
                cb();
                logger.info("started");
            });
        });
        this.mqttClient.on('message', (topic, payload, packet) => {
            // Messages flagged as retained are skipped; only live messages
            // are dispatched.
            if (!packet.retain) {
                const payloadStr = payload.toString('UTF-8');
                this.processMessage(topic, payloadStr);
            }
        });
    }

    /**
     * Dispatches a message to every registered handler whose topic filter
     * covers the topic.
     *
     * @param {string} topic - Topic the message was received on.
     * @param {string} payload - Message payload as a string.
     * @returns {boolean} `true` when at least one handler was invoked.
     */
    processMessage(topic, payload) {
        let found = false;
        this.topicHandlers.forEach((topicHandler) => {
            if (MqttHandler.topicMatchesFilter(topicHandler.topic, topic)) {
                topicHandler.callback(topic, payload);
                found = true;
            }
        });
        return found;
    }

    /**
     * Publishes a message to the broker with the retain flag set and QoS 0.
     * Requires `exec()` to have been called, since it uses `this.mqttClient`.
     *
     * @param {string} topic - Topic to publish to.
     * @param {string} payload - Message payload.
     * @param {boolean} [internalFirst=false] - Currently ignored; kept so
     *     existing callers keep working.
     */
    send(topic, payload, internalFirst = false) {
        logger.info(`External sending required: ${topic} ${payload}`);
        const options = { retain: true, qos: 0 };
        this.mqttClient.publish(topic, payload, options);
    }
}
// Shared handler instance exported by this module. It is constructed as a side
// effect of loading the module, so the constructor's configuration lookup (and
// the synchronous CA file read, when one is configured) runs at that point.
exports.mqttHandler = new MqttHandler();
//# sourceMappingURL=MqttDispatcher.js.map