remove leading underscore, fix next call in mongosave

This commit is contained in:
Wolfgang Hottgenroth 2017-08-24 15:51:34 +02:00
parent a0a922f851
commit 25116aae89
14 changed files with 230 additions and 170 deletions

39
dist/callchain.js vendored
View File

@ -6,53 +6,68 @@ class LastChainItem {
begin() { begin() {
} }
send(message) { send(message) {
log.info(`Last chain item, final result ${message}`); log.info(`Last chain item, final result ${JSON.stringify(message)}`);
} }
} }
let lastChainItem = new LastChainItem(); let lastChainItem = new LastChainItem();
class AChainItem extends events.EventEmitter { class AChainItem extends events.EventEmitter {
constructor(label) { constructor(label) {
super(); super();
this._label = label; this.label = label;
this._next = lastChainItem; this.next = lastChainItem;
} }
toString() { toString() {
return `<${this._label}>`; return `<${this.label}>`;
} }
registerNext(next) { registerNext(next) {
this._next = next; this.next = next;
} }
send(message) { send(message) {
this.emit('yourturn', message); this.emit('yourturn', message);
} }
} }
exports.AChainItem = AChainItem; exports.AChainItem = AChainItem;
class AAsyncBaseChainItem extends AChainItem {
constructor(label) {
super(label);
}
begin() {
if (this.next != null) {
this.next.begin();
}
this.addListener('yourturn', (message) => {
log.info(`Calling ${this.toString()}`);
this.func(message, this.next.send);
});
}
}
exports.AAsyncBaseChainItem = AAsyncBaseChainItem;
class ABaseChainItem extends AChainItem { class ABaseChainItem extends AChainItem {
constructor(label) { constructor(label) {
super(label); super(label);
} }
begin() { begin() {
if (this._next != null) { if (this.next != null) {
this._next.begin(); this.next.begin();
} }
this.addListener('yourturn', (message) => { this.addListener('yourturn', (message) => {
log.info(`Calling ${this.toString()}`); log.info(`Calling ${this.toString()}`);
let result = this.func(message); let result = this.func(message);
this._next.send(result); this.next.send(result);
}); });
} }
} }
exports.ABaseChainItem = ABaseChainItem; exports.ABaseChainItem = ABaseChainItem;
class ChainItem extends ABaseChainItem { class ChainItem extends ABaseChainItem {
toString() { toString() {
let funcName = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name; let funcName = (this.chainItemFunc.name === "") ? "lambda" : this.chainItemFunc.name;
return `<${funcName}, ${this._label}>`; return `<${funcName}, ${this.label}>`;
} }
registerFunc(func) { registerFunc(func) {
this._chainItemFunc = func; this.chainItemFunc = func;
} }
func(message) { func(message) {
return this._chainItemFunc(message); return this.chainItemFunc(message);
} }
} }
exports.ChainItem = ChainItem; exports.ChainItem = ChainItem;

View File

@ -1,12 +1,37 @@
"use strict"; "use strict";
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const utils = require("./utils"); const utils = require("./utils");
class EspThermData {
constructor(temperature, voltage, timeConsumed) {
this.temperature = temperature;
this.voltage = voltage;
this.timeConsumed = timeConsumed;
}
toString() {
return JSON.stringify(this);
}
toJSON() {
return utils.jsonPrepaper(this, []);
}
}
exports.EspThermData = EspThermData;
class EspThermMetadata {
constructor(client, timestamp) {
this.client = client;
this.timestamp = (timestamp) ? timestamp : new Date();
}
toString() {
return JSON.stringify(this);
}
toJSON() {
return utils.jsonPrepaper(this, []);
}
}
exports.EspThermMetadata = EspThermMetadata;
class EspThermMessage { class EspThermMessage {
constructor(client, temperature, voltage, timeConsumed) { constructor(client, temperature, voltage, timeConsumed) {
this._client = client; this.data = new EspThermData(temperature, voltage, timeConsumed);
this._temperature = temperature; this.metadata = new EspThermMetadata(client);
this._voltage = voltage;
this._timeConsumed = timeConsumed;
} }
toString() { toString() {
return JSON.stringify(this); return JSON.stringify(this);

32
dist/mongosave.js vendored
View File

@ -3,47 +3,51 @@ Object.defineProperty(exports, "__esModule", { value: true });
const CallChain = require("./callchain"); const CallChain = require("./callchain");
const log = require("./log"); const log = require("./log");
const MongoDB = require("mongodb"); const MongoDB = require("mongodb");
class MongoSave extends CallChain.ABaseChainItem { class MongoSave extends CallChain.AAsyncBaseChainItem {
constructor(url) { constructor(url) {
super('MongoSave'); super('MongoSave');
this._url = url; this.url = url;
this._mongoClient = new MongoDB.MongoClient(); this.mongoClient = new MongoDB.MongoClient();
this._connectPending = false; this.connectPending = false;
} }
func(message) { func(message, finished) {
if (!this._dbh) { if (!this.dbh) {
log.info("Not database connection yet"); log.info("Not database connection yet");
if (!this._connectPending) { if (!this.connectPending) {
this._connectPending = true; this.connectPending = true;
this._mongoClient.connect(this._url) this.mongoClient.connect(this.url)
.then((db) => { .then((db) => {
log.info("Successfully opened MongoDB connect"); log.info("Successfully opened MongoDB connect");
this._dbh = db; this.dbh = db;
}) })
.catch((err) => { .catch((err) => {
log.error(`Failure when opening MongoDB connect: ${err}`); log.error(`Failure when opening MongoDB connect: ${err}`);
this._dbh = undefined; this.dbh = undefined;
}); });
} }
else { else {
log.info("Connecting to database is pending"); log.info("Connecting to database is pending");
} }
} }
if (this._dbh) { if (this.dbh) {
log.info("Database handle is available"); log.info("Database handle is available");
let coll = this._dbh.collection("iot"); let coll = this.dbh.collection("iot");
coll.insertOne(message) coll.insertOne(message)
.then((res) => { .then((res) => {
log.info(`Successfully wrote one item in database: ${res.insertedId}`); log.info(`Successfully wrote one item in database: ${res.insertedId}`);
let nextValue = { id: res.insertedId, payload: message };
finished(nextValue);
}) })
.catch((err) => { .catch((err) => {
log.error(`Failure when trying to write one item in database: ${err}`); log.error(`Failure when trying to write one item in database: ${err}`);
log.error("Chain interrupted");
}); });
} }
else { else {
log.error(`No database connection yet, drop message ${message}`); log.error(`No database connection yet, drop message ${message}`);
log.error("Chain interrupted");
} }
return "<<" + message + ">>"; log.info(`Returning from ${this.label}`);
} }
} }
exports.MongoSave = MongoSave; exports.MongoSave = MongoSave;

View File

@ -11,17 +11,17 @@ function passThrough(message) {
exports.passThrough = passThrough; exports.passThrough = passThrough;
class MqttDispatcher { class MqttDispatcher {
constructor(mqttBrokerUrl, mqttUser, mqttPass, mqttCAFile) { constructor(mqttBrokerUrl, mqttUser, mqttPass, mqttCAFile) {
this._mqttOptions = {}; this.mqttOptions = {};
this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL; this.mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL;
if (mqttUser && mqttPass) { if (mqttUser && mqttPass) {
this._mqttOptions.username = mqttUser; this.mqttOptions.username = mqttUser;
this._mqttOptions.password = mqttPass; this.mqttOptions.password = mqttPass;
} }
if (mqttCAFile) { if (mqttCAFile) {
this._mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii'); this.mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii');
this._mqttOptions.rejectUnauthorized = true; this.mqttOptions.rejectUnauthorized = true;
} }
this._topicHandlers = []; this.topicHandlers = [];
} }
register(topic, label, newChainItemOrCallbackFunc) { register(topic, label, newChainItemOrCallbackFunc) {
let newChainItem; let newChainItem;
@ -34,7 +34,7 @@ class MqttDispatcher {
newChainItem = myNewChainItem; newChainItem = myNewChainItem;
} }
let done = false; let done = false;
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
if (topicHandler.topic === topic) { if (topicHandler.topic === topic) {
topicHandler.last.registerNext(newChainItem); topicHandler.last.registerNext(newChainItem);
topicHandler.last = newChainItem; topicHandler.last = newChainItem;
@ -43,26 +43,26 @@ class MqttDispatcher {
} }
} }
if (!done) { if (!done) {
this._topicHandlers.push({ topic: topic, root: newChainItem, last: newChainItem }); this.topicHandlers.push({ topic: topic, root: newChainItem, last: newChainItem });
log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`); log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`);
} }
} }
exec() { exec() {
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
topicHandler.root.begin(); topicHandler.root.begin();
} }
log.info(`connecting to ${this._mqttBrokerUrl}`); log.info(`connecting to ${this.mqttBrokerUrl}`);
this._mqttClient = Mqtt.connect(this._mqttBrokerUrl, this._mqttOptions); this.mqttClient = Mqtt.connect(this.mqttBrokerUrl, this.mqttOptions);
this._mqttClient.on('error', log.error); this.mqttClient.on('error', log.error);
this._mqttClient.on('connect', () => { this.mqttClient.on('connect', () => {
log.info("connected to mqtt broker"); log.info("connected to mqtt broker");
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
this._mqttClient.subscribe(topicHandler.topic); this.mqttClient.subscribe(topicHandler.topic);
} }
}); });
this._mqttClient.on('message', (topic, payload) => { this.mqttClient.on('message', (topic, payload) => {
log.info(`message received, topic ${topic}, payload ${payload}`); log.info(`message received, topic ${topic}, payload ${payload}`);
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
if (this.topicMatch(topicHandler.topic, topic)) { if (this.topicMatch(topicHandler.topic, topic)) {
log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`);
topicHandler.root.send(payload); topicHandler.root.send(payload);

8
dist/processor.js vendored
View File

@ -5,19 +5,19 @@ const events = require("events");
class AProcessor extends events.EventEmitter { class AProcessor extends events.EventEmitter {
constructor(label) { constructor(label) {
super(); super();
this._label = label; this.label = label;
this.addListener('input', this.process); this.addListener('input', this.process);
log.info(`Processor object instanciated: ${this.constructor.name}, ${this._label}`); log.info(`Processor object instanciated: ${this.constructor.name}, ${this.label}`);
} }
in(message) { in(message) {
log.info(`Routing ${message} to Processor class ${this.constructor.name}, ${this._label}`); log.info(`Routing ${message} to Processor class ${this.constructor.name}, ${this.label}`);
this.emit('input', message); this.emit('input', message);
} }
} }
exports.AProcessor = AProcessor; exports.AProcessor = AProcessor;
class ExProc1 extends AProcessor { class ExProc1 extends AProcessor {
process(message) { process(message) {
log.info(`ExRoute1.process: ${this._label}, ${message}`); log.info(`ExRoute1.process: ${this.label}, ${message}`);
} }
} }
exports.ExProc1 = ExProc1; exports.ExProc1 = ExProc1;

5
dist/utils.js vendored
View File

@ -3,9 +3,8 @@ Object.defineProperty(exports, "__esModule", { value: true });
function jsonPrepaper(obj, hideKeys) { function jsonPrepaper(obj, hideKeys) {
let dup = {}; let dup = {};
for (let key in obj) { for (let key in obj) {
if ((hideKeys.indexOf(key) == -1) && !((key[0] == "_") && (key[1] == "_"))) { if ((hideKeys.indexOf(key) == -1) && !(key[0] == "_")) {
let dkey = (key[0] == "_") ? key.slice(1) : key; dup[key] = obj[key];
dup[dkey] = obj[key];
} }
} }
return dup; return dup;

View File

@ -1,45 +0,0 @@
0 info it worked if it ends with ok
1 verbose cli [ '/usr/bin/nodejs', '/usr/bin/npm', 'start' ]
2 info using npm@3.10.10
3 info using node@v6.11.2
4 verbose run-script [ 'prestart', 'start', 'poststart' ]
5 info lifecycle dispatcher@1.0.0~prestart: dispatcher@1.0.0
6 silly lifecycle dispatcher@1.0.0~prestart: no script for prestart, continuing
7 info lifecycle dispatcher@1.0.0~start: dispatcher@1.0.0
8 verbose lifecycle dispatcher@1.0.0~start: unsafe-perm in lifecycle true
9 verbose lifecycle dispatcher@1.0.0~start: PATH: /usr/lib/node_modules/npm/bin/node-gyp-bin:/home/wn/workspace-node/Dispatcher/node_modules/.bin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/wn/.local/bin:/home/wn/bin:/opt/jdk/bin:/opt/apache-maven/bin:/home/wn/mos/bin
10 verbose lifecycle dispatcher@1.0.0~start: CWD: /home/wn/workspace-node/Dispatcher
11 silly lifecycle dispatcher@1.0.0~start: Args: [ '-c', 'node dist/main.js' ]
12 silly lifecycle dispatcher@1.0.0~start: Returned: code: 1 signal: null
13 info lifecycle dispatcher@1.0.0~start: Failed to exec start script
14 verbose stack Error: dispatcher@1.0.0 start: `node dist/main.js`
14 verbose stack Exit status 1
14 verbose stack at EventEmitter.<anonymous> (/usr/lib/node_modules/npm/lib/utils/lifecycle.js:255:16)
14 verbose stack at emitTwo (events.js:106:13)
14 verbose stack at EventEmitter.emit (events.js:191:7)
14 verbose stack at ChildProcess.<anonymous> (/usr/lib/node_modules/npm/lib/utils/spawn.js:40:14)
14 verbose stack at emitTwo (events.js:106:13)
14 verbose stack at ChildProcess.emit (events.js:191:7)
14 verbose stack at maybeClose (internal/child_process.js:891:16)
14 verbose stack at Process.ChildProcess._handle.onexit (internal/child_process.js:226:5)
15 verbose pkgid dispatcher@1.0.0
16 verbose cwd /home/wn/workspace-node/Dispatcher
17 error Linux 4.9.0-3-amd64
18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start"
19 error node v6.11.2
20 error npm v3.10.10
21 error code ELIFECYCLE
22 error dispatcher@1.0.0 start: `node dist/main.js`
22 error Exit status 1
23 error Failed at the dispatcher@1.0.0 start script 'node dist/main.js'.
23 error Make sure you have the latest version of node.js and npm installed.
23 error If you do, this is most likely a problem with the dispatcher package,
23 error not with npm itself.
23 error Tell the author that this fails on your system:
23 error node dist/main.js
23 error You can get information on how to open an issue for this project with:
23 error npm bugs dispatcher
23 error Or if that isn't available, you can get their info via:
23 error npm owner ls dispatcher
23 error There is likely additional logging output above.
24 verbose exit [ 1, true ]

1
queries/time-temp.js Normal file
View File

@ -0,0 +1 @@
db.iot.aggregate([{'$match':{'metadata.client':'espClient3'}}, {'$sort':{'metadata.timestamp':1}}, {'$project': {'_id':0, 'time':'$metadata.timestamp', 'temp':'$data.temperature'}}]);

View File

@ -13,29 +13,29 @@ class LastChainItem implements Receivable {
} }
public send(message: any) { public send(message: any) {
log.info(`Last chain item, final result ${message}`) log.info(`Last chain item, final result ${JSON.stringify(message)}`)
} }
} }
let lastChainItem : LastChainItem = new LastChainItem(); let lastChainItem : LastChainItem = new LastChainItem();
export abstract class AChainItem extends events.EventEmitter implements Receivable { export abstract class AChainItem extends events.EventEmitter implements Receivable {
protected _label : string protected label : string
protected _next : Receivable protected next : Receivable
constructor(label : string) { constructor(label : string) {
super() super()
this._label = label this.label = label
this._next = lastChainItem this.next = lastChainItem
} }
public toString() : string { public toString() : string {
return `<${this._label}>` return `<${this.label}>`
} }
public registerNext(next : AChainItem) :void { public registerNext(next : AChainItem) :void {
this._next = next this.next = next
} }
public send(message : any) : void { public send(message : any) : void {
@ -45,6 +45,24 @@ export abstract class AChainItem extends events.EventEmitter implements Receivab
public abstract begin() : void public abstract begin() : void
} }
export abstract class AAsyncBaseChainItem extends AChainItem {
constructor(label : string) {
super(label)
}
protected abstract func(message : any, finished : (message : any) => void) : void;
public begin() :void {
if (this.next != null) {
this.next.begin()
}
this.addListener('yourturn', (message : any) : void => {
log.info(`Calling ${this.toString()}`)
this.func(message, this.next.send)
})
}
}
export abstract class ABaseChainItem extends AChainItem { export abstract class ABaseChainItem extends AChainItem {
constructor(label : string) { constructor(label : string) {
super(label) super(label)
@ -53,31 +71,31 @@ export abstract class ABaseChainItem extends AChainItem {
protected abstract func(message : any) : any; protected abstract func(message : any) : any;
public begin() :void { public begin() :void {
if (this._next != null) { if (this.next != null) {
this._next.begin() this.next.begin()
} }
this.addListener('yourturn', (message : any) : void => { this.addListener('yourturn', (message : any) : void => {
log.info(`Calling ${this.toString()}`) log.info(`Calling ${this.toString()}`)
let result : any = this.func(message) let result : any = this.func(message)
this._next.send(result) this.next.send(result)
}) })
} }
} }
export class ChainItem extends ABaseChainItem { export class ChainItem extends ABaseChainItem {
private _chainItemFunc : ChainItemFunc private chainItemFunc : ChainItemFunc
public toString() : string { public toString() : string {
let funcName : string = (this._chainItemFunc.name === "") ? "lambda" : this._chainItemFunc.name let funcName : string = (this.chainItemFunc.name === "") ? "lambda" : this.chainItemFunc.name
return `<${funcName}, ${this._label}>` return `<${funcName}, ${this.label}>`
} }
public registerFunc(func : ChainItemFunc) : void { public registerFunc(func : ChainItemFunc) : void {
this._chainItemFunc = func this.chainItemFunc = func
} }
protected func(message : any) : any { protected func(message : any) : any {
return this._chainItemFunc(message) return this.chainItemFunc(message)
} }
} }

View File

@ -1,17 +1,52 @@
import * as log from './log' import * as log from './log'
import * as utils from './utils' import * as utils from './utils'
export class EspThermData {
private temperature : number
private voltage : number
private timeConsumed : number
constructor(temperature:number, voltage:number, timeConsumed:number) {
this.temperature = temperature
this.voltage = voltage
this.timeConsumed = timeConsumed
}
toString() :string {
return JSON.stringify(this)
}
toJSON() : any {
return utils.jsonPrepaper(this, [])
}
}
export class EspThermMetadata {
private client : string
private timestamp : Date
constructor(client : string, timestamp? : Date) {
this.client = client
this.timestamp = (timestamp) ? timestamp : new Date()
}
toString() :string {
return JSON.stringify(this)
}
toJSON() : any {
return utils.jsonPrepaper(this, [])
}
}
export class EspThermMessage { export class EspThermMessage {
private _client : string private data : EspThermData
private _temperature : number private metadata : EspThermMetadata
private _voltage : number
private _timeConsumed : number
constructor(client:string, temperature:number, voltage:number, timeConsumed:number) { constructor(client:string, temperature:number, voltage:number, timeConsumed:number) {
this._client = client this.data = new EspThermData(temperature, voltage, timeConsumed)
this._temperature = temperature this.metadata = new EspThermMetadata(client)
this._voltage = voltage
this._timeConsumed = timeConsumed
} }
toString() :string { toString() :string {

View File

@ -3,51 +3,60 @@ import * as log from './log'
import * as MongoDB from 'mongodb' import * as MongoDB from 'mongodb'
export class MongoSave extends CallChain.ABaseChainItem { export interface MongoItem {
private _url : string id : MongoDB.ObjectID
private _mongoClient : MongoDB.MongoClient payload : any
private _dbh : MongoDB.Db | undefined }
private _connectPending : boolean
export class MongoSave extends CallChain.AAsyncBaseChainItem {
private url : string
private mongoClient : MongoDB.MongoClient
private dbh : MongoDB.Db | undefined
private connectPending : boolean
constructor(url:string) { constructor(url:string) {
super('MongoSave') super('MongoSave')
this._url = url this.url = url
this._mongoClient = new MongoDB.MongoClient() this.mongoClient = new MongoDB.MongoClient()
this._connectPending = false this.connectPending = false
} }
protected func(message : any) : any { protected func(message : any, finished : (message : any) => void) : void {
if (! this._dbh) { if (! this.dbh) {
log.info("Not database connection yet") log.info("Not database connection yet")
if (! this._connectPending) { if (! this.connectPending) {
this._connectPending = true this.connectPending = true
this._mongoClient.connect(this._url) this.mongoClient.connect(this.url)
.then((db:MongoDB.Db) => { .then((db:MongoDB.Db) => {
log.info("Successfully opened MongoDB connect") log.info("Successfully opened MongoDB connect")
this._dbh = db this.dbh = db
}) })
.catch((err) => { .catch((err) => {
log.error(`Failure when opening MongoDB connect: ${err}`) log.error(`Failure when opening MongoDB connect: ${err}`)
this._dbh = undefined this.dbh = undefined
}) })
} else { } else {
log.info("Connecting to database is pending") log.info("Connecting to database is pending")
} }
} }
if (this._dbh) { if (this.dbh) {
log.info("Database handle is available") log.info("Database handle is available")
let coll : MongoDB.Collection = this._dbh.collection("iot") let coll : MongoDB.Collection = this.dbh.collection("iot")
coll.insertOne(message) coll.insertOne(message)
.then((res : MongoDB.InsertOneWriteOpResult) => { .then((res : MongoDB.InsertOneWriteOpResult) => {
log.info(`Successfully wrote one item in database: ${res.insertedId}`) log.info(`Successfully wrote one item in database: ${res.insertedId}`)
let nextValue = <MongoItem>{id: res.insertedId, payload: message}
finished(nextValue)
}) })
.catch((err : any) => { .catch((err : any) => {
log.error(`Failure when trying to write one item in database: ${err}`) log.error(`Failure when trying to write one item in database: ${err}`)
log.error("Chain interrupted")
}) })
} else { } else {
log.error(`No database connection yet, drop message ${message}`) log.error(`No database connection yet, drop message ${message}`)
log.error("Chain interrupted")
} }
return "<<" + message + ">>" log.info(`Returning from ${this.label}`)
} }
/* /*

View File

@ -25,24 +25,24 @@ export interface IDispatcher {
} }
export class MqttDispatcher implements IDispatcher { export class MqttDispatcher implements IDispatcher {
private _mqttClient: Mqtt.Client private mqttClient: Mqtt.Client
private _mqttOptions: Mqtt.IClientOptions = {} private mqttOptions: Mqtt.IClientOptions = {}
private _mqttBrokerUrl: string private mqttBrokerUrl: string
private _topicHandlers: TopicHandler[] private topicHandlers: TopicHandler[]
constructor(mqttBrokerUrl? : string, mqttUser? : string, mqttPass? : string, mqttCAFile? : string) { constructor(mqttBrokerUrl? : string, mqttUser? : string, mqttPass? : string, mqttCAFile? : string) {
this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL this.mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL
if (mqttUser && mqttPass) { if (mqttUser && mqttPass) {
this._mqttOptions.username = mqttUser this.mqttOptions.username = mqttUser
this._mqttOptions.password = mqttPass this.mqttOptions.password = mqttPass
} }
if (mqttCAFile) { if (mqttCAFile) {
this._mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii') this.mqttOptions.ca = fs.readFileSync(mqttCAFile, 'ascii')
this._mqttOptions.rejectUnauthorized = true this.mqttOptions.rejectUnauthorized = true
} }
this._topicHandlers = [] this.topicHandlers = []
} }
register(topic: string, label: string, register(topic: string, label: string,
@ -56,7 +56,7 @@ export class MqttDispatcher implements IDispatcher {
newChainItem = myNewChainItem newChainItem = myNewChainItem
} }
let done: boolean = false let done: boolean = false
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
if (topicHandler.topic === topic) { if (topicHandler.topic === topic) {
(topicHandler.last as callchain.AChainItem).registerNext(newChainItem) (topicHandler.last as callchain.AChainItem).registerNext(newChainItem)
topicHandler.last = newChainItem topicHandler.last = newChainItem
@ -65,28 +65,28 @@ export class MqttDispatcher implements IDispatcher {
} }
} }
if (! done) { if (! done) {
this._topicHandlers.push({topic: topic, root: newChainItem, last: newChainItem}) this.topicHandlers.push({topic: topic, root: newChainItem, last: newChainItem})
log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`) log.info(`first callback ${newChainItem.toString()} added for topic ${topic}`)
} }
} }
exec() : void { exec() : void {
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
(topicHandler.root as callchain.ChainItem).begin() (topicHandler.root as callchain.ChainItem).begin()
} }
log.info(`connecting to ${this._mqttBrokerUrl}`) log.info(`connecting to ${this.mqttBrokerUrl}`)
this._mqttClient = Mqtt.connect(this._mqttBrokerUrl, this._mqttOptions) this.mqttClient = Mqtt.connect(this.mqttBrokerUrl, this.mqttOptions)
this._mqttClient.on('error', log.error) this.mqttClient.on('error', log.error)
this._mqttClient.on('connect', (): void => { this.mqttClient.on('connect', (): void => {
log.info("connected to mqtt broker") log.info("connected to mqtt broker")
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
this._mqttClient.subscribe(topicHandler.topic) this.mqttClient.subscribe(topicHandler.topic)
} }
}) })
this._mqttClient.on('message', (topic: string, payload: Buffer): void => { this.mqttClient.on('message', (topic: string, payload: Buffer): void => {
log.info(`message received, topic ${topic}, payload ${payload}`) log.info(`message received, topic ${topic}, payload ${payload}`)
for (let topicHandler of this._topicHandlers) { for (let topicHandler of this.topicHandlers) {
if (this.topicMatch(topicHandler.topic, topic)) { if (this.topicMatch(topicHandler.topic, topic)) {
log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`);
(topicHandler.root as callchain.ChainItem).send(payload) (topicHandler.root as callchain.ChainItem).send(payload)

View File

@ -3,17 +3,17 @@ import * as events from 'events'
export abstract class AProcessor extends events.EventEmitter { export abstract class AProcessor extends events.EventEmitter {
protected _label : string protected label : string
constructor(label : string) { constructor(label : string) {
super() super()
this._label = label this.label = label
this.addListener('input', this.process) this.addListener('input', this.process)
log.info(`Processor object instanciated: ${this.constructor.name}, ${this._label}`) log.info(`Processor object instanciated: ${this.constructor.name}, ${this.label}`)
} }
public in(message : any) : void { public in(message : any) : void {
log.info(`Routing ${message} to Processor class ${this.constructor.name}, ${this._label}`) log.info(`Routing ${message} to Processor class ${this.constructor.name}, ${this.label}`)
this.emit('input', message) this.emit('input', message)
} }
@ -23,6 +23,6 @@ export abstract class AProcessor extends events.EventEmitter {
export class ExProc1 extends AProcessor { export class ExProc1 extends AProcessor {
protected process(message : any) : void { protected process(message : any) : void {
log.info(`ExRoute1.process: ${this._label}, ${message}`) log.info(`ExRoute1.process: ${this.label}, ${message}`)
} }
} }

View File

@ -1,9 +1,8 @@
export function jsonPrepaper(obj:any, hideKeys:string[]) : any { export function jsonPrepaper(obj:any, hideKeys:string[]) : any {
let dup = {} let dup = {}
for (let key in obj) { for (let key in obj) {
if ((hideKeys.indexOf(key) == -1) && ! ((key[0] == "_") && (key[1] == "_"))) { if ((hideKeys.indexOf(key) == -1) && ! (key[0] == "_")) {
let dkey = (key[0] == "_") ? key.slice(1) : key dup[key] = obj[key]
dup[dkey] = obj[key]
} }
} }
return dup return dup