callchains implemented

This commit is contained in:
Wolfgang Hottgenroth
2017-08-09 00:03:57 +02:00
parent 1aa7765079
commit ca1dea597c
7 changed files with 278 additions and 67 deletions

63
src/callchain.ts Normal file
View File

@ -0,0 +1,63 @@
import * as log from './log'
import * as events from 'events'
export type ChainItemFunc = (message: any) => any
export abstract class AChainItem extends events.EventEmitter {
protected _label : string
protected _next : AChainItem | null
constructor(label : string) {
super()
this._label = label
this._next = null
}
public toString() : string {
return `<${this._label}`
}
registerNext(next : AChainItem) :void {
this._next = next
}
send(message : any) : void {
this.emit('yourturn', message)
}
abstract begin() : void
}
export class ChainItem extends AChainItem {
private _chainItemFunc : ChainItemFunc
constructor(label : string) {
super(label)
}
public toString() : string {
let funcName : string = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name
return `<${funcName}, ${this._label}>`
}
registerFunc(func : ChainItemFunc) : void {
this._chainItemFunc = func
}
begin() :void {
if (this._next != null) {
this._next.begin()
}
this.addListener('yourturn', (message : any) : void => {
log.info(`Calling ${this.toString()}`)
let result : any = this._chainItemFunc(message)
if (this._next == null) {
log.info(`Last chain item, final result ${result}`)
} else {
this._next.send(result)
}
})
}
}

View File

@ -1,5 +1,7 @@
import * as log from './log'
import * as mqtt from './mqttclient'
import * as callchain from './callchain'
class Dispatcher {
private _mqttClient: mqtt.MqttClient
@ -7,13 +9,18 @@ class Dispatcher {
constructor() {
this._mqttClient = new mqtt.MqttClient()
this._mqttClient.register('IoT/test', 'print', (message: any) : any => {
this._mqttClient.registerCallbackFunc('IoT/test', 'print1', (message: any) : any => {
log.info("Callback for IoT/test")
log.info(`message is ${message}`)
return `<<${message}>>`
})
this._mqttClient.register('IoT/Device/#', 'null1', mqtt.passThrough)
this._mqttClient.register('IoT/Device/#', 'null2', mqtt.passThrough)
this._mqttClient.registerCallbackFunc('IoT/test', 'print2', (message: any) : any => {
log.info("Callback for IoT/test")
log.info(`message is ${message}`)
return `<<${message}>>`
})
this._mqttClient.registerCallbackFunc('IoT/test', 'null1', mqtt.passThrough)
this._mqttClient.registerCallbackFunc('IoT/test', 'null2', mqtt.passThrough)
}
exec() : void {
@ -23,12 +30,43 @@ class Dispatcher {
log.info("Dispatcher running")
}
test() : void {
log.info("Sending test data")
this._mqttClient.test()
}
}
// callchain.registerChainItemFunc('first', (message : any) : any => {
// log.info(`first callback ${message}`)
// return `<${message}>`
// })
// callchain.registerChainItemFunc('second', (message : any) : any => {
// log.info(`second callback ${message}`)
// return `<${message}>`
// })
// callchain.registerChainItemFunc('third', (message : any) : any => {
// log.info(`third callback ${message}`)
// return `<${message}>`
// })
// callchain.begin()
// callchain.send('test1')
// callchain.send('test2')
// callchain.send('test3')
// callchain.send('test4')
const dispatcher = new Dispatcher()
dispatcher.exec()
dispatcher.test()

View File

@ -1,33 +1,20 @@
import * as Mqtt from 'mqtt'
import * as log from './log'
import * as callchain from './callchain'
const MQTT_BROKER_DEFAULT_URL : string = "mqtt://localhost"
type TopicHandlerCallbackFunc = (message: any) => any
class TopicHandlerCallback {
private _label: string
private _func: TopicHandlerCallbackFunc
constructor(label: string, func: TopicHandlerCallbackFunc) {
this._label = label
this._func = func
}
get func() : TopicHandlerCallbackFunc {
return this._func
}
public toString() : string {
let funcName : string = (this._func.name === "") ? "lambda" : this._func.name
return `<${funcName}, ${this._label}>`
}
export function passThrough(message: any) {
return message
}
interface TopicHandler {
topic: string,
callbacks: TopicHandlerCallback[]
}
export function passThrough(message: any) {
return message
root: callchain.AChainItem | undefined
last: callchain.AChainItem | undefined
}
export class MqttClient {
@ -40,23 +27,37 @@ export class MqttClient {
this._topicHandlers = []
}
register(topic: string, label: string, callbackFunc: TopicHandlerCallbackFunc) : void {
registerCallbackFunc(topic: string, label: string, callbackFunc: callchain.ChainItemFunc) : void {
let newChainItem = new callchain.ChainItem(label)
newChainItem.registerFunc(callbackFunc)
this.registerCallbackClass(topic, label, newChainItem)
}
registerCallbackClass(topic: string, label: string, newChainItem: callchain.AChainItem) : void {
let done: boolean = false
let callback : TopicHandlerCallback = new TopicHandlerCallback(label, callbackFunc)
for (let topicHandler of this._topicHandlers) {
if (topicHandler.topic === topic) {
topicHandler.callbacks.push(callback)
(topicHandler.last as callchain.AChainItem).registerNext(newChainItem)
topicHandler.last = newChainItem
done = true
log.info(`additional callback ${callback.toString()} added for topic ${topic}`)
log.info(`additional callback ${newChainItem.toString()} added for topic ${topic}`)
}
}
if (! done) {
this._topicHandlers.push({topic: topic, callbacks:[ callback ]})
log.info(`first callback ${callback.toString()} added for topic ${topic}`)
this._topicHandlers.push({topic: topic, root: newChainItem, last: newChainItem})
log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`)
}
}
test() : void {
this._mqttClient.emit("message", 'IoT/test', 'payload')
}
exec() : void {
for (let topicHandler of this._topicHandlers) {
(topicHandler.root as callchain.ChainItem).begin()
}
this._mqttClient = Mqtt.connect(this._mqttBrokerUrl)
this._mqttClient.on('error', log.error)
this._mqttClient.on('connect', (): void => {
@ -69,14 +70,8 @@ export class MqttClient {
log.info(`message received, topic ${topic}, payload ${payload}`)
for (let topicHandler of this._topicHandlers) {
if (this.topicMatch(topicHandler.topic, topic)) {
log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`)
// topicHandler.callback(payload)
let returnValue: any = payload
for (let topicHandlerCallback of topicHandler.callbacks) {
log.info(`Calling ${topicHandlerCallback}`)
returnValue = topicHandlerCallback.func(returnValue)
}
log.info(`Final return value is ${returnValue}`)
log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`);
(topicHandler.root as callchain.ChainItem).send(payload)
}
}
})