From 3797d84b4d5ef77734f8a85aedca4ad1e793a398 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Wed, 23 Aug 2017 15:52:37 +0200 Subject: [PATCH] lot of changes for first actual use --- dist/callchain.js | 43 +++++++++++++++----------- dist/espthermtojson.js | 26 ++++++++++++++++ dist/main.js | 24 +++++---------- dist/mongosave.js | 13 ++++++++ dist/mqttdispatcher.js | 15 ++++++++-- dist/utils.js | 14 +++++++++ npm-debug.log | 4 +-- src/callchain.ts | 68 +++++++++++++++++++++++++++--------------- src/espthermtojson.ts | 33 ++++++++++++++++++++ src/main.ts | 25 +++++++--------- src/mongosave.ts | 30 +++++++++++++++++++ src/mqttdispatcher.ts | 20 +++++++++++-- src/utils.ts | 10 +++++++ tsconfig.json | 6 ++-- 14 files changed, 250 insertions(+), 81 deletions(-) create mode 100644 dist/espthermtojson.js create mode 100644 dist/mongosave.js create mode 100644 dist/utils.js create mode 100644 src/espthermtojson.ts create mode 100644 src/mongosave.ts create mode 100644 src/utils.ts diff --git a/dist/callchain.js b/dist/callchain.js index 9fa5bf7..3d8cc50 100644 --- a/dist/callchain.js +++ b/dist/callchain.js @@ -2,14 +2,22 @@ Object.defineProperty(exports, "__esModule", { value: true }); const log = require("./log"); const events = require("events"); +class LastChainItem { + begin() { + } + send(message) { + log.info(`Last chain item, final result ${message}`); + } +} +let lastChainItem = new LastChainItem(); class AChainItem extends events.EventEmitter { constructor(label) { super(); this._label = label; - this._next = null; + this._next = lastChainItem; } toString() { - return `<${this._label}`; + return `<${this._label}>`; } registerNext(next) { this._next = next; @@ -19,32 +27,33 @@ class AChainItem extends events.EventEmitter { } } exports.AChainItem = AChainItem; -class ChainItem extends AChainItem { +class ABaseChainItem extends AChainItem { constructor(label) { super(label); } - toString() { - let funcName = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name; - return `<${funcName}, ${this._label}>`; - } - registerFunc(func) { - this._chainItemFunc = func; - } begin() { if (this._next != null) { this._next.begin(); } this.addListener('yourturn', (message) => { log.info(`Calling ${this.toString()}`); - let result = this._chainItemFunc(message); - if (this._next == null) { - log.info(`Last chain item, final result ${result}`); - } - else { - this._next.send(result); - } + let result = this.func(message); + this._next.send(result); }); } } +exports.ABaseChainItem = ABaseChainItem; +class ChainItem extends ABaseChainItem { + toString() { + let funcName = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name; + return `<${funcName}, ${this._label}>`; + } + registerFunc(func) { + this._chainItemFunc = func; + } + func(message) { + return this._chainItemFunc(message); + } +} exports.ChainItem = ChainItem; //# sourceMappingURL=callchain.js.map \ No newline at end of file diff --git a/dist/espthermtojson.js b/dist/espthermtojson.js new file mode 100644 index 0000000..12c373c --- /dev/null +++ b/dist/espthermtojson.js @@ -0,0 +1,26 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const utils = require("./utils"); +class EspThermMessage { + constructor(client, temperature, voltage, timeConsumed) { + this._client = client; + this._temperature = temperature; + this._voltage = voltage; + this._timeConsumed = timeConsumed; + } + toString() { + return JSON.stringify(this); + } + toJSON() { + return utils.jsonPrepaper(this, []); + } +} +exports.EspThermMessage = EspThermMessage; +function espThermToJson(message) { + let messageStr = "" + message; + let parts = messageStr.split(' '); + let espThermMessage = new EspThermMessage(parts[0], parseFloat(parts[1]), parseFloat(parts[2]), parseInt(parts[3])); + return espThermMessage; +} +exports.espThermToJson = espThermToJson; +//# sourceMappingURL=espthermtojson.js.map \ No newline at end of file diff --git a/dist/main.js b/dist/main.js index b64e72a..73345ce 100644 --- a/dist/main.js +++ b/dist/main.js @@ -2,22 +2,14 @@ Object.defineProperty(exports, "__esModule", { value: true }); const log = require("./log"); const mqtt = require("./mqttdispatcher"); -const plugintest1 = require("./plugintest1"); +const EspThermToJson = require("./espthermtojson"); +const MongoSave = require("./mongosave"); log.info("Dispatcher starting"); -exports.dispatcher = new mqtt.MqttDispatcher(); -exports.dispatcher.register('IoT/test', 'print1', (message) => { - log.info("Callback for IoT/test"); - log.info(`message is ${message}`); - return `<<${message}>>`; -}); -exports.dispatcher.register('IoT/test', 'print2', (message) => { - log.info("Callback for IoT/test"); - log.info(`message is ${message}`); - return `<<${message}>>`; -}); -exports.dispatcher.register('IoT/test', 'null1', mqtt.passThrough); -exports.dispatcher.register('IoT/test', 'null2', mqtt.passThrough); -plugintest1.pluginTest1Start(exports.dispatcher); -exports.dispatcher.exec(); +let dispatcher = new mqtt.MqttDispatcher("mqtts://broker.hottis.de:8883", "wn", "locutus", "/home/wn/server-ca.crt"); +dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson); +let mongo = new MongoSave.MongoSave(); +dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo); +// plugintest1.pluginTest1Start(dispatcher) +dispatcher.exec(); log.info("Dispatcher running"); //# sourceMappingURL=main.js.map \ No newline at end of file diff --git a/dist/mongosave.js b/dist/mongosave.js new file mode 100644 index 0000000..7edc882 --- /dev/null +++ b/dist/mongosave.js @@ -0,0 +1,13 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const CallChain = require("./callchain"); +class MongoSave extends CallChain.ABaseChainItem { + constructor() { + super('MongoSave'); + } + func(message) { + return "<<" + message + ">>"; + } +} +exports.MongoSave = MongoSave; +//# sourceMappingURL=mongosave.js.map \ No newline at end of file diff --git a/dist/mqttdispatcher.js b/dist/mqttdispatcher.js index 8970d07..3beeecd 100644 --- a/dist/mqttdispatcher.js +++ b/dist/mqttdispatcher.js @@ -3,14 +3,24 @@ Object.defineProperty(exports, "__esModule", { value: true }); const Mqtt = require("mqtt"); const log = require("./log"); const callchain = require("./callchain"); +const fs = require("fs"); const MQTT_BROKER_DEFAULT_URL = "mqtt://localhost"; function passThrough(message) { return message; } exports.passThrough = passThrough; class MqttDispatcher { - constructor(mqttBrokerUrl) { + constructor(mqttBrokerUrl, mqttUser, mqttPass, mqttCAFile) { + this._mqttOptions = {}; this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL; + if (mqttUser && mqttPass) { + this._mqttOptions.username = mqttUser; + this._mqttOptions.password = mqttPass; + } + if (mqttCAFile) { + this._mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii'); + this._mqttOptions.rejectUnauthorized = true; + } this._topicHandlers = []; } register(topic, label, newChainItemOrCallbackFunc) { @@ -41,7 +51,8 @@ class MqttDispatcher { for (let topicHandler of this._topicHandlers) { topicHandler.root.begin(); } - this._mqttClient = Mqtt.connect(this._mqttBrokerUrl); + log.info(`connecting to ${this._mqttBrokerUrl}`); + this._mqttClient = Mqtt.connect(this._mqttBrokerUrl, this._mqttOptions); this._mqttClient.on('error', log.error); this._mqttClient.on('connect', () => { log.info("connected to mqtt broker"); diff --git a/dist/utils.js b/dist/utils.js new file mode 100644 index 0000000..5752800 --- /dev/null +++ b/dist/utils.js @@ -0,0 +1,14 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +function jsonPrepaper(obj, hideKeys) { + let dup = {}; + for (let key in obj) { + if ((hideKeys.indexOf(key) == -1) && !((key[0] == "_") && (key[1] == "_"))) { + let dkey = (key[0] == "_") ? key.slice(1) : key; + dup[dkey] = obj[key]; + } + } + return dup; +} +exports.jsonPrepaper = jsonPrepaper; +//# sourceMappingURL=utils.js.map \ No newline at end of file diff --git a/npm-debug.log b/npm-debug.log index 65ca798..3c5d893 100644 --- a/npm-debug.log +++ b/npm-debug.log @@ -1,7 +1,7 @@ 0 info it worked if it ends with ok 1 verbose cli [ '/usr/bin/nodejs', '/usr/bin/npm', 'start' ] 2 info using npm@3.10.10 -3 info using node@v6.11.1 +3 info using node@v6.11.2 4 verbose run-script [ 'prestart', 'start', 'poststart' ] 5 info lifecycle dispatcher@1.0.0~prestart: dispatcher@1.0.0 6 silly lifecycle dispatcher@1.0.0~prestart: no script for prestart, continuing @@ -26,7 +26,7 @@ 16 verbose cwd /home/wn/workspace-node/Dispatcher 17 error Linux 4.9.0-3-amd64 18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start" -19 error node v6.11.1 +19 error node v6.11.2 20 error npm v3.10.10 21 error code ELIFECYCLE 22 error dispatcher@1.0.0 start: `node dist/main.js` diff --git a/src/callchain.ts b/src/callchain.ts index a723f6b..e7c96b5 100644 --- a/src/callchain.ts +++ b/src/callchain.ts @@ -3,61 +3,81 @@ import * as events from 'events' export type ChainItemFunc = (message: any) => any -export abstract class AChainItem extends events.EventEmitter { +export interface Receivable { + begin() : void; + send(message : any) : void; +} + +class LastChainItem implements Receivable { + public begin() : void { + } + + public send(message: any) { + log.info(`Last chain item, final result ${message}`) + } +} + +let lastChainItem : LastChainItem = new LastChainItem(); + +export abstract class AChainItem extends events.EventEmitter implements Receivable { protected _label : string - protected _next : AChainItem | null + protected _next : Receivable constructor(label : string) { super() this._label = label - this._next = null + this._next = lastChainItem } public toString() : string { - return `<${this._label}` + return `<${this._label}>` } - registerNext(next : AChainItem) :void { + public registerNext(next : AChainItem) :void { this._next = next } - send(message : any) : void { + public send(message : any) : void { this.emit('yourturn', message) } - abstract begin() : void + public abstract begin() : void } -export class ChainItem extends AChainItem { - private _chainItemFunc : ChainItemFunc - +export abstract class ABaseChainItem extends AChainItem { constructor(label : string) { super(label) } + protected abstract func(message : any) : any; + + public begin() :void { + if (this._next != null) { + this._next.begin() + } + this.addListener('yourturn', (message : any) : void => { + log.info(`Calling ${this.toString()}`) + let result : any = this.func(message) + this._next.send(result) + }) + } +} + +export class ChainItem extends ABaseChainItem { + private _chainItemFunc : ChainItemFunc + public toString() : string { let funcName : string = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name return `<${funcName}, ${this._label}>` } - registerFunc(func : ChainItemFunc) : void { + public registerFunc(func : ChainItemFunc) : void { this._chainItemFunc = func } - begin() :void { - if (this._next != null) { - this._next.begin() - } - this.addListener('yourturn', (message : any) : void => { - log.info(`Calling ${this.toString()}`) - let result : any = this._chainItemFunc(message) - if (this._next == null) { - log.info(`Last chain item, final result ${result}`) - } else { - this._next.send(result) - } - }) + protected func(message : any) : any { + return this._chainItemFunc(message) } } diff --git a/src/espthermtojson.ts b/src/espthermtojson.ts new file mode 100644 index 0000000..8746ef0 --- /dev/null +++ b/src/espthermtojson.ts @@ -0,0 +1,33 @@ +import * as log from './log' +import * as utils from './utils' + +export class EspThermMessage { + private _client : string + private _temperature : number + private _voltage : number + private _timeConsumed : number + + constructor(client:string, temperature:number, voltage:number, timeConsumed:number) { + this._client = client + this._temperature = temperature + this._voltage = voltage + this._timeConsumed = timeConsumed + } + + toString() :string { + return JSON.stringify(this) + } + + toJSON() : any { + return utils.jsonPrepaper(this, []) + } +} + +export function espThermToJson(message : any) : any { + let messageStr : string = "" + message + let parts : string[] = messageStr.split(' ') + let espThermMessage : EspThermMessage = new EspThermMessage(parts[0], + parseFloat(parts[1]), parseFloat(parts[2]), + parseInt(parts[3])) + return espThermMessage +} \ No newline at end of file diff --git a/src/main.ts b/src/main.ts index 03082a5..86eb77b 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,23 +3,20 @@ import * as mqtt from './mqttdispatcher' import * as callchain from './callchain' import * as plugintest1 from './plugintest1' +import * as EspThermToJson from './espthermtojson' +import * as MongoSave from './mongosave' log.info("Dispatcher starting") -export const dispatcher = new mqtt.MqttDispatcher() -dispatcher.register('IoT/test', 'print1', (message: any) : any => { - log.info("Callback for IoT/test") - log.info(`message is ${message}`) - return `<<${message}>>` -}) -dispatcher.register('IoT/test', 'print2', (message: any) : any => { - log.info("Callback for IoT/test") - log.info(`message is ${message}`) - return `<<${message}>>` -}) -dispatcher.register('IoT/test', 'null1', mqtt.passThrough) -dispatcher.register('IoT/test', 'null2', mqtt.passThrough) +let dispatcher = new mqtt.MqttDispatcher("mqtts://broker.hottis.de:8883", + "wn", "locutus", "/home/wn/server-ca.crt") -plugintest1.pluginTest1Start(dispatcher) +dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson) + +let mongo : MongoSave.MongoSave = new MongoSave.MongoSave() +dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo); + + +// plugintest1.pluginTest1Start(dispatcher) dispatcher.exec() log.info("Dispatcher running") diff --git a/src/mongosave.ts b/src/mongosave.ts new file mode 100644 index 0000000..ff3ebca --- /dev/null +++ b/src/mongosave.ts @@ -0,0 +1,30 @@ +import * as CallChain from './callchain' +import * as log from './log' + +export class MongoSave extends CallChain.ABaseChainItem { + constructor() { + super('MongoSave') + } + + protected func(message : any) : any { + return "<<" + message + ">>" + } + + /* + public begin() :void { + if (this._next != null) { + this._next.begin() + } + this.addListener('yourturn', (message : any) : void => { + log.info(`Calling ${this.toString()}`) + let result : any = this.func(message) + if (this._next == null) { + log.info(`Last chain item, final result ${result}`) + } else { + this._next.send(result) + } + }) + } + */ + +} diff --git a/src/mqttdispatcher.ts b/src/mqttdispatcher.ts index c393167..481bab4 100644 --- a/src/mqttdispatcher.ts +++ b/src/mqttdispatcher.ts @@ -1,10 +1,12 @@ import * as Mqtt from 'mqtt' import * as log from './log' import * as callchain from './callchain' - +import * as fs from 'fs' const MQTT_BROKER_DEFAULT_URL : string = "mqtt://localhost" + + export function passThrough(message: any) { return message } @@ -24,11 +26,22 @@ export interface IDispatcher { export class MqttDispatcher implements IDispatcher { private _mqttClient: Mqtt.Client + private _mqttOptions: Mqtt.IClientOptions = {} private _mqttBrokerUrl: string private _topicHandlers: TopicHandler[] - constructor(mqttBrokerUrl? : string) { + constructor(mqttBrokerUrl? : string, mqttUser? : string, mqttPass? : string, mqttCAFile? : string) { this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL + if (mqttUser && mqttPass) { + this._mqttOptions.username = mqttUser + this._mqttOptions.password = mqttPass + } + + if (mqttCAFile) { + this._mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii') + this._mqttOptions.rejectUnauthorized = true + } + this._topicHandlers = [] } @@ -62,7 +75,8 @@ export class MqttDispatcher implements IDispatcher { (topicHandler.root as callchain.ChainItem).begin() } - this._mqttClient = Mqtt.connect(this._mqttBrokerUrl) + log.info(`connecting to ${this._mqttBrokerUrl}`) + this._mqttClient = Mqtt.connect(this._mqttBrokerUrl, this._mqttOptions) this._mqttClient.on('error', log.error) this._mqttClient.on('connect', (): void => { log.info("connected to mqtt broker") diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..165933a --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,10 @@ +export function jsonPrepaper(obj:any, hideKeys:string[]) : any { + let dup = {} + for (let key in obj) { + if ((hideKeys.indexOf(key) == -1) && ! ((key[0] == "_") && (key[1] == "_"))) { + let dkey = (key[0] == "_") ? key.slice(1) : key + dup[dkey] = obj[key] + } + } + return dup +} diff --git a/tsconfig.json b/tsconfig.json index ebd3e33..db4d418 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,12 +1,12 @@ { "compilerOptions": { - "target": "es2015", + "target": "es6", "module": "commonjs", "moduleResolution": "node", "sourceMap": true, - "lib": ["es2015"], + "lib": ["es6"], "strictNullChecks": true, - "noImplicitAny": true, + //"noImplicitAny": true, "noEmitOnError": true, "outDir": "dist", "typeRoots": [