diff --git a/dist/main.js b/dist/main.js index 77dd7e8..edaad59 100644 --- a/dist/main.js +++ b/dist/main.js @@ -5,13 +5,13 @@ const mqtt = require("./mqttclient"); class Dispatcher { constructor() { this._mqttClient = new mqtt.MqttClient(); - this._mqttClient.register('IoT/test', (message) => { + this._mqttClient.register('IoT/test', 'print', (message) => { log.info("Callback for IoT/test"); log.info(`message is ${message}`); return `<<${message}>>`; }); - this._mqttClient.register('IoT/Device/#', mqtt.passThrough); - this._mqttClient.register('IoT/Device/#', mqtt.passThrough); + this._mqttClient.register('IoT/Device/#', 'null1', mqtt.passThrough); + this._mqttClient.register('IoT/Device/#', 'null2', mqtt.passThrough); } exec() { log.info("Dispatcher starting"); diff --git a/dist/mqttclient.js b/dist/mqttclient.js index 1028fd1..6d3159b 100644 --- a/dist/mqttclient.js +++ b/dist/mqttclient.js @@ -3,6 +3,19 @@ Object.defineProperty(exports, "__esModule", { value: true }); const Mqtt = require("mqtt"); const log = require("./log"); const MQTT_BROKER_DEFAULT_URL = "mqtt://localhost"; +class TopicHandlerCallback { + constructor(label, func) { + this._label = label; + this._func = func; + } + get func() { + return this._func; + } + toString() { + let funcName = (this._func.name === "") ? "lambda" : this._func.name; + return `<${funcName}, ${this._label}>`; + } +} function passThrough(message) { return message; } @@ -12,20 +25,19 @@ class MqttClient { this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL; this._topicHandlers = []; } - register(topic, callback) { + register(topic, label, callbackFunc) { let done = false; - let callbackName = (callback.name === "") ? "lambda" : callback.name; + let callback = new TopicHandlerCallback(label, callbackFunc); for (let topicHandler of this._topicHandlers) { if (topicHandler.topic === topic) { topicHandler.callbacks.push(callback); done = true; - log.info(`additional callback <${callbackName}> added for topic ${topic}`); + log.info(`additional callback ${callback.toString()} added for topic ${topic}`); } } if (!done) { - let cbs = [callback]; - this._topicHandlers.push({ topic: topic, callbacks: cbs }); - log.info(`first callback <${callbackName}> added for topic ${topic}`); + this._topicHandlers.push({ topic: topic, callbacks: [callback] }); + log.info(`first callback ${callback.toString()} added for topic ${topic}`); } } exec() { @@ -43,6 +55,12 @@ class MqttClient { if (this.topicMatch(topicHandler.topic, topic)) { log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); // topicHandler.callback(payload) + let returnValue = payload; + for (let topicHandlerCallback of topicHandler.callbacks) { + log.info(`Calling ${topicHandlerCallback}`); + returnValue = topicHandlerCallback.func(returnValue); + } + log.info(`Final return value is ${returnValue}`); } } }); diff --git a/src/main.ts b/src/main.ts index 6eb5a64..f8fdfab 100644 --- a/src/main.ts +++ b/src/main.ts @@ -7,13 +7,13 @@ class Dispatcher { constructor() { this._mqttClient = new mqtt.MqttClient() - this._mqttClient.register('IoT/test', (message: any) : any => { + this._mqttClient.register('IoT/test', 'print', (message: any) : any => { log.info("Callback for IoT/test") log.info(`message is ${message}`) return `<<${message}>>` }) - this._mqttClient.register('IoT/Device/#', mqtt.passThrough) - this._mqttClient.register('IoT/Device/#', mqtt.passThrough) + this._mqttClient.register('IoT/Device/#', 'null1', mqtt.passThrough) + this._mqttClient.register('IoT/Device/#', 'null2', mqtt.passThrough) } exec() : void { diff --git a/src/mqttclient.ts b/src/mqttclient.ts index 148c78f..ca7602a 100644 --- a/src/mqttclient.ts +++ b/src/mqttclient.ts @@ -3,7 +3,23 @@ import * as log from './log' const MQTT_BROKER_DEFAULT_URL : string = "mqtt://localhost" -type TopicHandlerCallback = (message: any) => any +type TopicHandlerCallbackFunc = (message: any) => any + +class TopicHandlerCallback { + private _label: string + private _func: TopicHandlerCallbackFunc + constructor(label: string, func: TopicHandlerCallbackFunc) { + this._label = label + this._func = func + } + get func() : TopicHandlerCallbackFunc { + return this._func + } + public toString() : string { + let funcName : string = (this._func.name === "") ? "lambda" : this._func.name + return `<${funcName}, ${this._label}>` + } +} interface TopicHandler { topic: string, @@ -24,20 +40,19 @@ export class MqttClient { this._topicHandlers = [] } - register(topic: string, callback: TopicHandlerCallback) : void { + register(topic: string, label: string, callbackFunc: TopicHandlerCallbackFunc) : void { let done: boolean = false - let callbackName : string = (callback.name === "") ? "lambda" : callback.name + let callback : TopicHandlerCallback = new TopicHandlerCallback(label, callbackFunc) for (let topicHandler of this._topicHandlers) { if (topicHandler.topic === topic) { topicHandler.callbacks.push(callback) done = true - log.info(`additional callback <${callbackName}> added for topic ${topic}`) + log.info(`additional callback ${callback.toString()} added for topic ${topic}`) } } if (! done) { - let cbs : TopicHandlerCallback[] = [ callback ] - this._topicHandlers.push({topic:topic, callbacks:cbs}) - log.info(`first callback <${callbackName}> added for topic ${topic}`) + this._topicHandlers.push({topic: topic, callbacks:[ callback ]}) + log.info(`first callback ${callback.toString()} added for topic ${topic}`) } } @@ -56,6 +71,12 @@ export class MqttClient { if (this.topicMatch(topicHandler.topic, topic)) { log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`) // topicHandler.callback(payload) + let returnValue: any = payload + for (let topicHandlerCallback of topicHandler.callbacks) { + log.info(`Calling ${topicHandlerCallback}`) + returnValue = topicHandlerCallback.func(returnValue) + } + log.info(`Final return value is ${returnValue}`) } } })