95 lines
3.7 KiB
JavaScript
95 lines
3.7 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
const logger = require("./log");
|
|
const Mqtt = require("mqtt");
|
|
const fs = require("fs");
|
|
const config = require("./config");
|
|
/**
 * Wrapper around a single MQTT client connection that routes incoming
 * messages to callbacks registered per topic filter.
 *
 * Typical use: call `register()` any number of times, then `exec()` once to
 * connect; `send()` publishes outgoing messages.
 */
class MqttHandler {
    /**
     * Builds the connection options from `config.dict`. No connection is
     * opened here; that happens in `exec()`.
     *
     * Credentials are only set when both `brokerUser` and `brokerPass` are
     * configured. When `brokerCa` is configured, that file is read
     * synchronously and certificate verification is enforced.
     */
    constructor() {
        this.mqttOptions = {};
        this.mqttBrokerUrl = config.dict.brokerUrl;
        this.mqttOptions.clientId = 'dispatcher';
        if (config.dict.brokerUser && config.dict.brokerPass) {
            this.mqttOptions.username = config.dict.brokerUser;
            this.mqttOptions.password = config.dict.brokerPass;
        }
        if (config.dict.brokerCa) {
            this.mqttOptions.ca = fs.readFileSync(config.dict.brokerCa, 'ascii');
            this.mqttOptions.rejectUnauthorized = true;
        }
        this.topicHandlers = [];
        this.startCallbacks = [];
        logger.info("MqttHandler constructed");
    }

    /**
     * Tests whether a concrete topic name is matched by an MQTT topic filter.
     *
     * Matching is done level by level (levels are separated by `/`):
     * `+` matches exactly one level, and a trailing `#` matches the parent
     * level plus any number of levels below it (so `foo/#` matches `foo`,
     * `foo/` and `foo/bar/baz`). A `#` that is not the last level never
     * matches.
     *
     * @param {string} filter - Topic filter, possibly containing `+` / `#`.
     * @param {string} topic - Concrete topic name of a received message.
     * @returns {boolean} `true` when the filter matches the topic.
     */
    static topicMatchesFilter(filter, topic) {
        if (filter === topic) {
            return true;
        }
        const filterLevels = filter.split('/');
        const topicLevels = topic.split('/');
        for (let i = 0; i < filterLevels.length; i++) {
            const level = filterLevels[i];
            if (level === '#') {
                // Multi-level wildcard: only meaningful as the final level.
                return i === filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                // Filter has more levels than the topic.
                return false;
            }
            if (level !== '+' && level !== topicLevels[i]) {
                return false;
            }
        }
        // All filter levels matched; the topic must not have extra levels.
        return filterLevels.length === topicLevels.length;
    }

    /**
     * Registers a message callback for each of the given topic filters and,
     * optionally, a callback to run whenever the broker connection is
     * established.
     *
     * @param {string[]} topics - Topic filters to subscribe to on connect.
     * @param {function(string, string): void} cb - Invoked with
     *     `(topic, payload)` for every matching message.
     * @param {function(): void} [startCb] - Invoked from the `connect`
     *     handler; ignored when it is not a function.
     */
    register(topics, cb, startCb) {
        topics.forEach((topic) => {
            this.topicHandlers.push({ topic: topic, callback: cb });
            logger.info(`Callback registered for ${topic}`);
        });
        // Storing a missing start callback would make the connect handler
        // throw when it tries to call it, so only keep real functions.
        if (typeof startCb === 'function') {
            this.startCallbacks.push(startCb);
        }
    }

    /**
     * Connects to the configured broker and installs the `error`, `connect`
     * and `message` event handlers.
     *
     * On every `connect` event a status message is published, the command
     * topic and all registered topic filters are subscribed, and all start
     * callbacks are run.
     */
    exec() {
        logger.info(`Connecting to ${this.mqttBrokerUrl}`);
        this.mqttClient = Mqtt.connect(this.mqttBrokerUrl, this.mqttOptions);
        this.mqttClient.on('error', (err) => {
            logger.error(`Error in mqttHandler: ${err}`);
        });
        this.mqttClient.on('connect', () => {
            this.mqttClient.publish('dispatcher_ng/status', 'dispatcher_ng running');
            this.mqttClient.subscribe('dispatcher_ng/cmd');
            this.topicHandlers.forEach((topicHandler) => {
                this.mqttClient.subscribe(topicHandler.topic);
                logger.info(`${topicHandler.topic} subscribed`);
            });
            logger.info('MQTT connection established');
            this.startCallbacks.forEach((cb) => {
                cb();
                logger.info("started");
            });
        });
        // Every received message is dispatched; no filtering on the packet's
        // retain flag is applied here.
        this.mqttClient.on('message', (topic, payload, packet) => {
            const payloadStr = payload.toString('UTF-8');
            this.processMessage(topic, payloadStr);
        });
    }

    /**
     * Dispatches a message to every registered handler whose topic filter
     * matches the message topic.
     *
     * @param {string} topic - Topic the message was received on.
     * @param {string} payload - Message payload as a string.
     * @returns {boolean} `true` if at least one handler was invoked.
     */
    processMessage(topic, payload) {
        let found = false;
        for (const topicHandler of this.topicHandlers) {
            if (MqttHandler.topicMatchesFilter(topicHandler.topic, topic)) {
                topicHandler.callback(topic, payload);
                found = true;
            }
        }
        return found;
    }

    /**
     * Publishes a message through the broker with `retain: true` and
     * `qos: 0`. Requires `exec()` to have been called so that the client
     * exists.
     *
     * @param {string} topic - Topic to publish on.
     * @param {string} payload - Payload to publish.
     * @param {boolean} [internalFirst=false] - Currently unused; kept so
     *     existing callers that pass it continue to work.
     */
    send(topic, payload, internalFirst = false) {
        logger.info(`External sending required: ${topic} ${payload}`);
        const options = { retain: true, qos: 0 };
        this.mqttClient.publish(topic, payload, options);
    }
}
|
|
// Single MqttHandler instance, constructed when this module is loaded and exported for use by other modules.
exports.mqttHandler = new MqttHandler();
|
|
//# sourceMappingURL=MqttDispatcher.js.map
|