From ca1dea597c750528640fa4c90ef2568cca1e1ab1 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 9 Aug 2017 00:03:57 +0200 Subject: [PATCH] callchains implemented --- dist/callchain.js | 50 +++++++++++++++++++++++++++++++++++ dist/main.js | 33 ++++++++++++++++++++--- dist/mqttclient.js | 45 ++++++++++++++------------------ npm-debug.log | 45 ++++++++++++++++++++++++++++++++ src/callchain.ts | 63 ++++++++++++++++++++++++++++++++++++++++++++ src/main.ts | 44 ++++++++++++++++++++++++++++--- src/mqttclient.ts | 65 +++++++++++++++++++++------------------------- 7 files changed, 278 insertions(+), 67 deletions(-) create mode 100644 dist/callchain.js create mode 100644 npm-debug.log create mode 100644 src/callchain.ts diff --git a/dist/callchain.js b/dist/callchain.js new file mode 100644 index 0000000..9fa5bf7 --- /dev/null +++ b/dist/callchain.js @@ -0,0 +1,50 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const log = require("./log"); +const events = require("events"); +class AChainItem extends events.EventEmitter { + constructor(label) { + super(); + this._label = label; + this._next = null; + } + toString() { + return `<${this._label}`; + } + registerNext(next) { + this._next = next; + } + send(message) { + this.emit('yourturn', message); + } +} +exports.AChainItem = AChainItem; +class ChainItem extends AChainItem { + constructor(label) { + super(label); + } + toString() { + let funcName = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name; + return `<${funcName}, ${this._label}>`; + } + registerFunc(func) { + this._chainItemFunc = func; + } + begin() { + if (this._next != null) { + this._next.begin(); + } + this.addListener('yourturn', (message) => { + log.info(`Calling ${this.toString()}`); + let result = this._chainItemFunc(message); + if (this._next == null) { + log.info(`Last chain item, final result ${result}`); + } + else { + this._next.send(result); + } + }); + } +} +exports.ChainItem = ChainItem; +//# sourceMappingURL=callchain.js.map \ No newline at end of file diff --git a/dist/main.js b/dist/main.js index edaad59..516798f 100644 --- a/dist/main.js +++ b/dist/main.js @@ -5,20 +5,47 @@ const mqtt = require("./mqttclient"); class Dispatcher { constructor() { this._mqttClient = new mqtt.MqttClient(); - this._mqttClient.register('IoT/test', 'print', (message) => { + this._mqttClient.registerCallbackFunc('IoT/test', 'print1', (message) => { log.info("Callback for IoT/test"); log.info(`message is ${message}`); return `<<${message}>>`; }); - this._mqttClient.register('IoT/Device/#', 'null1', mqtt.passThrough); - this._mqttClient.register('IoT/Device/#', 'null2', mqtt.passThrough); + this._mqttClient.registerCallbackFunc('IoT/test', 'print2', (message) => { + log.info("Callback for IoT/test"); + log.info(`message is ${message}`); + return `<<${message}>>`; + }); + this._mqttClient.registerCallbackFunc('IoT/test', 'null1', mqtt.passThrough); + this._mqttClient.registerCallbackFunc('IoT/test', 'null2', mqtt.passThrough); } exec() { log.info("Dispatcher starting"); this._mqttClient.exec(); log.info("Dispatcher running"); } + test() { + log.info("Sending test data"); + this._mqttClient.test(); + } } +// callchain.registerChainItemFunc('first', (message : any) : any => { +// log.info(`first callback ${message}`) +// return `<${message}>` +// }) +// callchain.registerChainItemFunc('second', (message : any) : any => { +// log.info(`second callback ${message}`) +// return `<${message}>` +// }) +// callchain.registerChainItemFunc('third', (message : any) : any => { +// log.info(`third callback ${message}`) +// return `<${message}>` +// }) +// callchain.begin() +// callchain.send('test1') +// callchain.send('test2') +// callchain.send('test3') +// callchain.send('test4') const dispatcher = new Dispatcher(); dispatcher.exec(); +dispatcher.test(); //# sourceMappingURL=main.js.map \ No newline at end of file diff --git a/dist/mqttclient.js b/dist/mqttclient.js index 6d3159b..911a30e 100644 --- a/dist/mqttclient.js +++ b/dist/mqttclient.js @@ -2,20 +2,8 @@ Object.defineProperty(exports, "__esModule", { value: true }); const Mqtt = require("mqtt"); const log = require("./log"); +const callchain = require("./callchain"); const MQTT_BROKER_DEFAULT_URL = "mqtt://localhost"; -class TopicHandlerCallback { - constructor(label, func) { - this._label = label; - this._func = func; - } - get func() { - return this._func; - } - toString() { - let funcName = (this._func.name === "") ? "lambda" : this._func.name; - return `<${funcName}, ${this._label}>`; - } -} function passThrough(message) { return message; } @@ -25,22 +13,33 @@ class MqttClient { this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL; this._topicHandlers = []; } - register(topic, label, callbackFunc) { + registerCallbackFunc(topic, label, callbackFunc) { + let newChainItem = new callchain.ChainItem(label); + newChainItem.registerFunc(callbackFunc); + this.registerCallbackClass(topic, label, newChainItem); + } + registerCallbackClass(topic, label, newChainItem) { let done = false; - let callback = new TopicHandlerCallback(label, callbackFunc); for (let topicHandler of this._topicHandlers) { if (topicHandler.topic === topic) { - topicHandler.callbacks.push(callback); + topicHandler.last.registerNext(newChainItem); + topicHandler.last = newChainItem; done = true; - log.info(`additional callback ${callback.toString()} added for topic ${topic}`); + log.info(`additional callback ${newChainItem.toString()} added for topic ${topic}`); } } if (!done) { - this._topicHandlers.push({ topic: topic, callbacks: [callback] }); - log.info(`first callback ${callback.toString()} added for topic ${topic}`); + this._topicHandlers.push({ topic: topic, root: newChainItem, last: newChainItem }); + log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`); } } + test() { + this._mqttClient.emit("message", 'IoT/test', 'payload'); + } exec() { + for (let topicHandler of this._topicHandlers) { + topicHandler.root.begin(); + } this._mqttClient = Mqtt.connect(this._mqttBrokerUrl); this._mqttClient.on('error', log.error); this._mqttClient.on('connect', () => { @@ -54,13 +53,7 @@ class MqttClient { for (let topicHandler of this._topicHandlers) { if (this.topicMatch(topicHandler.topic, topic)) { log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); - // topicHandler.callback(payload) - let returnValue = payload; - for (let topicHandlerCallback of topicHandler.callbacks) { - log.info(`Calling ${topicHandlerCallback}`); - returnValue = topicHandlerCallback.func(returnValue); - } - log.info(`Final return value is ${returnValue}`); + topicHandler.root.send(payload); } } }); diff --git a/npm-debug.log b/npm-debug.log new file mode 100644 index 0000000..65ca798 --- /dev/null +++ b/npm-debug.log @@ -0,0 +1,45 @@ +0 info it worked if it ends with ok +1 verbose cli [ '/usr/bin/nodejs', '/usr/bin/npm', 'start' ] +2 info using npm@3.10.10 +3 info using node@v6.11.1 +4 verbose run-script [ 'prestart', 'start', 'poststart' ] +5 info lifecycle dispatcher@1.0.0~prestart: dispatcher@1.0.0 +6 silly lifecycle dispatcher@1.0.0~prestart: no script for prestart, continuing +7 info lifecycle dispatcher@1.0.0~start: dispatcher@1.0.0 +8 verbose lifecycle dispatcher@1.0.0~start: unsafe-perm in lifecycle true +9 verbose lifecycle dispatcher@1.0.0~start: PATH: /usr/lib/node_modules/npm/bin/node-gyp-bin:/home/wn/workspace-node/Dispatcher/node_modules/.bin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/wn/.local/bin:/home/wn/bin:/opt/jdk/bin:/opt/apache-maven/bin:/home/wn/mos/bin +10 verbose lifecycle dispatcher@1.0.0~start: CWD: /home/wn/workspace-node/Dispatcher +11 silly lifecycle dispatcher@1.0.0~start: Args: [ '-c', 'node dist/main.js' ] +12 silly lifecycle dispatcher@1.0.0~start: Returned: code: 1 signal: null +13 info lifecycle dispatcher@1.0.0~start: Failed to exec start script +14 verbose stack Error: dispatcher@1.0.0 start: `node dist/main.js` +14 verbose stack Exit status 1 +14 verbose stack at EventEmitter. (/usr/lib/node_modules/npm/lib/utils/lifecycle.js:255:16) +14 verbose stack at emitTwo (events.js:106:13) +14 verbose stack at EventEmitter.emit (events.js:191:7) +14 verbose stack at ChildProcess. (/usr/lib/node_modules/npm/lib/utils/spawn.js:40:14) +14 verbose stack at emitTwo (events.js:106:13) +14 verbose stack at ChildProcess.emit (events.js:191:7) +14 verbose stack at maybeClose (internal/child_process.js:891:16) +14 verbose stack at Process.ChildProcess._handle.onexit (internal/child_process.js:226:5) +15 verbose pkgid dispatcher@1.0.0 +16 verbose cwd /home/wn/workspace-node/Dispatcher +17 error Linux 4.9.0-3-amd64 +18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start" +19 error node v6.11.1 +20 error npm v3.10.10 +21 error code ELIFECYCLE +22 error dispatcher@1.0.0 start: `node dist/main.js` +22 error Exit status 1 +23 error Failed at the dispatcher@1.0.0 start script 'node dist/main.js'. +23 error Make sure you have the latest version of node.js and npm installed. +23 error If you do, this is most likely a problem with the dispatcher package, +23 error not with npm itself. +23 error Tell the author that this fails on your system: +23 error node dist/main.js +23 error You can get information on how to open an issue for this project with: +23 error npm bugs dispatcher +23 error Or if that isn't available, you can get their info via: +23 error npm owner ls dispatcher +23 error There is likely additional logging output above. +24 verbose exit [ 1, true ] diff --git a/src/callchain.ts b/src/callchain.ts new file mode 100644 index 0000000..a723f6b --- /dev/null +++ b/src/callchain.ts @@ -0,0 +1,63 @@ +import * as log from './log' +import * as events from 'events' + +export type ChainItemFunc = (message: any) => any + +export abstract class AChainItem extends events.EventEmitter { + protected _label : string + protected _next : AChainItem | null + + + constructor(label : string) { + super() + this._label = label + this._next = null + } + + public toString() : string { + return `<${this._label}` + } + + registerNext(next : AChainItem) :void { + this._next = next + } + + send(message : any) : void { + this.emit('yourturn', message) + } + + abstract begin() : void +} + +export class ChainItem extends AChainItem { + private _chainItemFunc : ChainItemFunc + + constructor(label : string) { + super(label) + } + + public toString() : string { + let funcName : string = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name + return `<${funcName}, ${this._label}>` + } + + registerFunc(func : ChainItemFunc) : void { + this._chainItemFunc = func + } + + begin() :void { + if (this._next != null) { + this._next.begin() + } + this.addListener('yourturn', (message : any) : void => { + log.info(`Calling ${this.toString()}`) + let result : any = this._chainItemFunc(message) + if (this._next == null) { + log.info(`Last chain item, final result ${result}`) + } else { + this._next.send(result) + } + }) + } +} + diff --git a/src/main.ts b/src/main.ts index f8fdfab..2f8a1eb 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,5 +1,7 @@ import * as log from './log' import * as mqtt from './mqttclient' +import * as callchain from './callchain' + class Dispatcher { private _mqttClient: mqtt.MqttClient @@ -7,13 +9,18 @@ class Dispatcher { constructor() { this._mqttClient = new mqtt.MqttClient() - this._mqttClient.register('IoT/test', 'print', (message: any) : any => { + this._mqttClient.registerCallbackFunc('IoT/test', 'print1', (message: any) : any => { log.info("Callback for IoT/test") log.info(`message is ${message}`) return `<<${message}>>` }) - this._mqttClient.register('IoT/Device/#', 'null1', mqtt.passThrough) - this._mqttClient.register('IoT/Device/#', 'null2', mqtt.passThrough) + this._mqttClient.registerCallbackFunc('IoT/test', 'print2', (message: any) : any => { + log.info("Callback for IoT/test") + log.info(`message is ${message}`) + return `<<${message}>>` + }) + this._mqttClient.registerCallbackFunc('IoT/test', 'null1', mqtt.passThrough) + this._mqttClient.registerCallbackFunc('IoT/test', 'null2', mqtt.passThrough) } exec() : void { @@ -23,12 +30,43 @@ class Dispatcher { log.info("Dispatcher running") } + + test() : void { + log.info("Sending test data") + this._mqttClient.test() + } } +// callchain.registerChainItemFunc('first', (message : any) : any => { +// log.info(`first callback ${message}`) +// return `<${message}>` +// }) + +// callchain.registerChainItemFunc('second', (message : any) : any => { +// log.info(`second callback ${message}`) +// return `<${message}>` +// }) + +// callchain.registerChainItemFunc('third', (message : any) : any => { +// log.info(`third callback ${message}`) +// return `<${message}>` +// }) + +// callchain.begin() + +// callchain.send('test1') +// callchain.send('test2') +// callchain.send('test3') +// callchain.send('test4') + const dispatcher = new Dispatcher() dispatcher.exec() +dispatcher.test() + + + diff --git a/src/mqttclient.ts b/src/mqttclient.ts index ca7602a..3c7d5b7 100644 --- a/src/mqttclient.ts +++ b/src/mqttclient.ts @@ -1,33 +1,20 @@ import * as Mqtt from 'mqtt' import * as log from './log' +import * as callchain from './callchain' + const MQTT_BROKER_DEFAULT_URL : string = "mqtt://localhost" -type TopicHandlerCallbackFunc = (message: any) => any - -class TopicHandlerCallback { - private _label: string - private _func: TopicHandlerCallbackFunc - constructor(label: string, func: TopicHandlerCallbackFunc) { - this._label = label - this._func = func - } - get func() : TopicHandlerCallbackFunc { - return this._func - } - public toString() : string { - let funcName : string = (this._func.name === "") ? "lambda" : this._func.name - return `<${funcName}, ${this._label}>` - } +export function passThrough(message: any) { + return message } + + interface TopicHandler { topic: string, - callbacks: TopicHandlerCallback[] -} - -export function passThrough(message: any) { - return message + root: callchain.AChainItem | undefined + last: callchain.AChainItem | undefined } export class MqttClient { @@ -40,23 +27,37 @@ export class MqttClient { this._topicHandlers = [] } - register(topic: string, label: string, callbackFunc: TopicHandlerCallbackFunc) : void { + registerCallbackFunc(topic: string, label: string, callbackFunc: callchain.ChainItemFunc) : void { + let newChainItem = new callchain.ChainItem(label) + newChainItem.registerFunc(callbackFunc) + this.registerCallbackClass(topic, label, newChainItem) + } + + registerCallbackClass(topic: string, label: string, newChainItem: callchain.AChainItem) : void { let done: boolean = false - let callback : TopicHandlerCallback = new TopicHandlerCallback(label, callbackFunc) for (let topicHandler of this._topicHandlers) { if (topicHandler.topic === topic) { - topicHandler.callbacks.push(callback) + (topicHandler.last as callchain.AChainItem).registerNext(newChainItem) + topicHandler.last = newChainItem done = true - log.info(`additional callback ${callback.toString()} added for topic ${topic}`) + log.info(`additional callback ${newChainItem.toString()} added for topic ${topic}`) } } if (! done) { - this._topicHandlers.push({topic: topic, callbacks:[ callback ]}) - log.info(`first callback ${callback.toString()} added for topic ${topic}`) + this._topicHandlers.push({topic: topic, root: newChainItem, last: newChainItem}) + log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`) } } + test() : void { + this._mqttClient.emit("message", 'IoT/test', 'payload') + } + exec() : void { + for (let topicHandler of this._topicHandlers) { + (topicHandler.root as callchain.ChainItem).begin() + } + this._mqttClient = Mqtt.connect(this._mqttBrokerUrl) this._mqttClient.on('error', log.error) this._mqttClient.on('connect', (): void => { @@ -69,14 +70,8 @@ export class MqttClient { log.info(`message received, topic ${topic}, payload ${payload}`) for (let topicHandler of this._topicHandlers) { if (this.topicMatch(topicHandler.topic, topic)) { - log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`) - // topicHandler.callback(payload) - let returnValue: any = payload - for (let topicHandlerCallback of topicHandler.callbacks) { - log.info(`Calling ${topicHandlerCallback}`) - returnValue = topicHandlerCallback.func(returnValue) - } - log.info(`Final return value is ${returnValue}`) + log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); + (topicHandler.root as callchain.ChainItem).send(payload) } } })