lot of changes for first actual use
This commit is contained in:
@ -3,61 +3,81 @@ import * as events from 'events'
|
||||
|
||||
export type ChainItemFunc = (message: any) => any
|
||||
|
||||
export abstract class AChainItem extends events.EventEmitter {
|
||||
export interface Receivable {
|
||||
begin() : void;
|
||||
send(message : any) : void;
|
||||
}
|
||||
|
||||
class LastChainItem implements Receivable {
|
||||
public begin() : void {
|
||||
}
|
||||
|
||||
public send(message: any) {
|
||||
log.info(`Last chain item, final result ${message}`)
|
||||
}
|
||||
}
|
||||
|
||||
let lastChainItem : LastChainItem = new LastChainItem();
|
||||
|
||||
export abstract class AChainItem extends events.EventEmitter implements Receivable {
|
||||
protected _label : string
|
||||
protected _next : AChainItem | null
|
||||
protected _next : Receivable
|
||||
|
||||
|
||||
constructor(label : string) {
|
||||
super()
|
||||
this._label = label
|
||||
this._next = null
|
||||
this._next = lastChainItem
|
||||
}
|
||||
|
||||
public toString() : string {
|
||||
return `<${this._label}`
|
||||
return `<${this._label}>`
|
||||
}
|
||||
|
||||
registerNext(next : AChainItem) :void {
|
||||
public registerNext(next : AChainItem) :void {
|
||||
this._next = next
|
||||
}
|
||||
|
||||
send(message : any) : void {
|
||||
public send(message : any) : void {
|
||||
this.emit('yourturn', message)
|
||||
}
|
||||
|
||||
abstract begin() : void
|
||||
public abstract begin() : void
|
||||
}
|
||||
|
||||
export class ChainItem extends AChainItem {
|
||||
private _chainItemFunc : ChainItemFunc
|
||||
|
||||
export abstract class ABaseChainItem extends AChainItem {
|
||||
constructor(label : string) {
|
||||
super(label)
|
||||
}
|
||||
|
||||
protected abstract func(message : any) : any;
|
||||
|
||||
public begin() :void {
|
||||
if (this._next != null) {
|
||||
this._next.begin()
|
||||
}
|
||||
this.addListener('yourturn', (message : any) : void => {
|
||||
log.info(`Calling ${this.toString()}`)
|
||||
let result : any = this.func(message)
|
||||
this._next.send(result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export class ChainItem extends ABaseChainItem {
|
||||
private _chainItemFunc : ChainItemFunc
|
||||
|
||||
public toString() : string {
|
||||
let funcName : string = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name
|
||||
return `<${funcName}, ${this._label}>`
|
||||
}
|
||||
|
||||
registerFunc(func : ChainItemFunc) : void {
|
||||
public registerFunc(func : ChainItemFunc) : void {
|
||||
this._chainItemFunc = func
|
||||
}
|
||||
|
||||
begin() :void {
|
||||
if (this._next != null) {
|
||||
this._next.begin()
|
||||
}
|
||||
this.addListener('yourturn', (message : any) : void => {
|
||||
log.info(`Calling ${this.toString()}`)
|
||||
let result : any = this._chainItemFunc(message)
|
||||
if (this._next == null) {
|
||||
log.info(`Last chain item, final result ${result}`)
|
||||
} else {
|
||||
this._next.send(result)
|
||||
}
|
||||
})
|
||||
protected func(message : any) : any {
|
||||
return this._chainItemFunc(message)
|
||||
}
|
||||
}
|
||||
|
||||
|
33
src/espthermtojson.ts
Normal file
33
src/espthermtojson.ts
Normal file
@ -0,0 +1,33 @@
|
||||
import * as log from './log'
|
||||
import * as utils from './utils'
|
||||
|
||||
export class EspThermMessage {
|
||||
private _client : string
|
||||
private _temperature : number
|
||||
private _voltage : number
|
||||
private _timeConsumed : number
|
||||
|
||||
constructor(client:string, temperature:number, voltage:number, timeConsumed:number) {
|
||||
this._client = client
|
||||
this._temperature = temperature
|
||||
this._voltage = voltage
|
||||
this._timeConsumed = timeConsumed
|
||||
}
|
||||
|
||||
toString() :string {
|
||||
return JSON.stringify(this)
|
||||
}
|
||||
|
||||
toJSON() : any {
|
||||
return utils.jsonPrepaper(this, [])
|
||||
}
|
||||
}
|
||||
|
||||
export function espThermToJson(message : any) : any {
|
||||
let messageStr : string = "" + <string>message
|
||||
let parts : string[] = messageStr.split(' ')
|
||||
let espThermMessage : EspThermMessage = new EspThermMessage(parts[0],
|
||||
parseFloat(parts[1]), parseFloat(parts[2]),
|
||||
parseInt(parts[3]))
|
||||
return espThermMessage
|
||||
}
|
25
src/main.ts
25
src/main.ts
@ -3,23 +3,20 @@ import * as mqtt from './mqttdispatcher'
|
||||
import * as callchain from './callchain'
|
||||
import * as plugintest1 from './plugintest1'
|
||||
|
||||
import * as EspThermToJson from './espthermtojson'
|
||||
import * as MongoSave from './mongosave'
|
||||
|
||||
log.info("Dispatcher starting")
|
||||
export const dispatcher = new mqtt.MqttDispatcher()
|
||||
dispatcher.register('IoT/test', 'print1', (message: any) : any => {
|
||||
log.info("Callback for IoT/test")
|
||||
log.info(`message is ${message}`)
|
||||
return `<<${message}>>`
|
||||
})
|
||||
dispatcher.register('IoT/test', 'print2', (message: any) : any => {
|
||||
log.info("Callback for IoT/test")
|
||||
log.info(`message is ${message}`)
|
||||
return `<<${message}>>`
|
||||
})
|
||||
dispatcher.register('IoT/test', 'null1', mqtt.passThrough)
|
||||
dispatcher.register('IoT/test', 'null2', mqtt.passThrough)
|
||||
let dispatcher = new mqtt.MqttDispatcher("mqtts://broker.hottis.de:8883",
|
||||
"wn", "locutus", "/home/wn/server-ca.crt")
|
||||
|
||||
plugintest1.pluginTest1Start(dispatcher)
|
||||
dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson)
|
||||
|
||||
let mongo : MongoSave.MongoSave = new MongoSave.MongoSave()
|
||||
dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo);
|
||||
|
||||
|
||||
// plugintest1.pluginTest1Start(dispatcher)
|
||||
|
||||
dispatcher.exec()
|
||||
log.info("Dispatcher running")
|
||||
|
30
src/mongosave.ts
Normal file
30
src/mongosave.ts
Normal file
@ -0,0 +1,30 @@
|
||||
import * as CallChain from './callchain'
|
||||
import * as log from './log'
|
||||
|
||||
export class MongoSave extends CallChain.ABaseChainItem {
|
||||
constructor() {
|
||||
super('MongoSave')
|
||||
}
|
||||
|
||||
protected func(message : any) : any {
|
||||
return "<<" + message + ">>"
|
||||
}
|
||||
|
||||
/*
|
||||
public begin() :void {
|
||||
if (this._next != null) {
|
||||
this._next.begin()
|
||||
}
|
||||
this.addListener('yourturn', (message : any) : void => {
|
||||
log.info(`Calling ${this.toString()}`)
|
||||
let result : any = this.func(message)
|
||||
if (this._next == null) {
|
||||
log.info(`Last chain item, final result ${result}`)
|
||||
} else {
|
||||
this._next.send(result)
|
||||
}
|
||||
})
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
@ -1,10 +1,12 @@
|
||||
import * as Mqtt from 'mqtt'
|
||||
import * as log from './log'
|
||||
import * as callchain from './callchain'
|
||||
|
||||
import * as fs from 'fs'
|
||||
|
||||
const MQTT_BROKER_DEFAULT_URL : string = "mqtt://localhost"
|
||||
|
||||
|
||||
|
||||
export function passThrough(message: any) {
|
||||
return message
|
||||
}
|
||||
@ -24,11 +26,22 @@ export interface IDispatcher {
|
||||
|
||||
export class MqttDispatcher implements IDispatcher {
|
||||
private _mqttClient: Mqtt.Client
|
||||
private _mqttOptions: Mqtt.IClientOptions = {}
|
||||
private _mqttBrokerUrl: string
|
||||
private _topicHandlers: TopicHandler[]
|
||||
|
||||
constructor(mqttBrokerUrl? : string) {
|
||||
constructor(mqttBrokerUrl? : string, mqttUser? : string, mqttPass? : string, mqttCAFile? : string) {
|
||||
this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL
|
||||
if (mqttUser && mqttPass) {
|
||||
this._mqttOptions.username = mqttUser
|
||||
this._mqttOptions.password = mqttPass
|
||||
}
|
||||
|
||||
if (mqttCAFile) {
|
||||
this._mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii')
|
||||
this._mqttOptions.rejectUnauthorized = true
|
||||
}
|
||||
|
||||
this._topicHandlers = []
|
||||
}
|
||||
|
||||
@ -62,7 +75,8 @@ export class MqttDispatcher implements IDispatcher {
|
||||
(topicHandler.root as callchain.ChainItem).begin()
|
||||
}
|
||||
|
||||
this._mqttClient = Mqtt.connect(this._mqttBrokerUrl)
|
||||
log.info(`connecting to ${this._mqttBrokerUrl}`)
|
||||
this._mqttClient = Mqtt.connect(this._mqttBrokerUrl, this._mqttOptions)
|
||||
this._mqttClient.on('error', log.error)
|
||||
this._mqttClient.on('connect', (): void => {
|
||||
log.info("connected to mqtt broker")
|
||||
|
10
src/utils.ts
Normal file
10
src/utils.ts
Normal file
@ -0,0 +1,10 @@
|
||||
export function jsonPrepaper(obj:any, hideKeys:string[]) : any {
|
||||
let dup = {}
|
||||
for (let key in obj) {
|
||||
if ((hideKeys.indexOf(key) == -1) && ! ((key[0] == "_") && (key[1] == "_"))) {
|
||||
let dkey = (key[0] == "_") ? key.slice(1) : key
|
||||
dup[dkey] = obj[key]
|
||||
}
|
||||
}
|
||||
return dup
|
||||
}
|
Reference in New Issue
Block a user