diff --git a/dist/main.js b/dist/main.js index f52742e..81ba9e7 100644 --- a/dist/main.js +++ b/dist/main.js @@ -5,21 +5,21 @@ const mqtt = require("./mqttdispatcher"); const fs = require("fs"); const cmdargs = require("command-line-args"); const EspThermToJson = require("./espthermtojson"); -const MongoSave = require("./mongosave"); -log.info("Dispatcher starting"); -const optionDefinitions = [ +const MissingEventDetector = require("./missingeventdetector"); +const OPTION_DEFINITIONS = [ { name: 'verbose', alias: 'v', type: Boolean }, { name: 'config', alias: 'c', type: String, defaultValue: '~/mqttDispatcher.conf' } ]; -const OPTIONS = cmdargs(optionDefinitions); -log.info(`OPTIONS.config`); +const OPTIONS = cmdargs(OPTION_DEFINITIONS); const CONFIG = JSON.parse(fs.readFileSync(OPTIONS.config, "utf8")); +log.info("Dispatcher starting"); let dispatcher = new mqtt.MqttDispatcher(CONFIG.brokerUrl, CONFIG.brokerUser, CONFIG.brokerPass, CONFIG.brokerCa); -dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson); -let mongoUrl = "mongodb://localhost/hottis"; -// let mongoUrl = "mongodb://receiver:esp8266.@cluster0-shard-00-00-7qduq.mongodb.net:27017,cluster0-shard-00-01-7qduq.mongodb.net:27017,cluster0-shard-00-02-7qduq.mongodb.net:27017/hottis?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin" -let mongo = new MongoSave.MongoSave(CONFIG.mongoDbUrl); -dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo); +const ESP_THERM_TOPIC = 'IoT/espThermometer2/#'; +dispatcher.register(ESP_THERM_TOPIC, 'toJson', EspThermToJson.espThermToJson); +let missingeventdetector = new MissingEventDetector.MissingEventDetector(); +dispatcher.register(ESP_THERM_TOPIC, 'MissingEventDetector', missingeventdetector); +// let mongo : MongoSave.MongoSave = new MongoSave.MongoSave(CONFIG.mongoDbUrl) +// dispatcher.register(ESP_THERM_TOPIC, 'MongoSave', mongo); dispatcher.exec(); log.info("Dispatcher running"); //# sourceMappingURL=main.js.map \ No newline at end of file diff --git a/dist/missingeventdetector.js b/dist/missingeventdetector.js new file mode 100644 index 0000000..1e7c94a --- /dev/null +++ b/dist/missingeventdetector.js @@ -0,0 +1,60 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const log = require("./log"); +const CallChain = require("./callchain"); +const Processor = require("./processor"); +const CHECK_PERIOD = 10; // seconds +class ClientEntry { +} +class MissingEventProcessor extends Processor.AProcessor { + constructor() { + super("MissingEventProcessor"); + this.clientMap = new Map(); + this.timer = setInterval(() => { + this.clientMap.forEach((value, key) => { + let currentTime = new Date().getTime(); + let elapsedTime = currentTime - value.lastEvent; + log.info(`Checking ${key}, elapsed: ${elapsedTime / 1000}, avg. delay: ${value.avgDelay / 1000}`); + }); + }, CHECK_PERIOD * 1000); + } + process(message) { + let client = message.metadata.client; + log.info(`Event for client ${client}`); + let currentTime = new Date().getTime(); + if (this.clientMap.has(client)) { + let clientEntry = this.clientMap.get(client); + clientEntry.client = client; + clientEntry.delay = currentTime - clientEntry.lastEvent; + clientEntry.lastEvent = currentTime; + clientEntry.count += 1; + clientEntry.delaySum += clientEntry.delay; + clientEntry.avgDelay = clientEntry.delaySum / clientEntry.count; + this.clientMap.set(client, clientEntry); + log.info(`Entry for ${client} updated`); + } + else { + let clientEntry = new ClientEntry(); + clientEntry.client = client; + clientEntry.lastEvent = currentTime; + clientEntry.delay = 0; + clientEntry.count = 0; + clientEntry.delaySum = 0; + clientEntry.avgDelay = 0; + this.clientMap.set(client, clientEntry); + log.info(`Entry for ${client} inserted`); + } + } +} +class MissingEventDetector extends CallChain.ABaseChainItem { + constructor() { + super("MissingEventDetector"); + this.missingEventProcessor = new MissingEventProcessor(); + } + func(message) { + this.missingEventProcessor.in(message); + return message; + } +} +exports.MissingEventDetector = MissingEventDetector; +//# sourceMappingURL=missingeventdetector.js.map \ No newline at end of file diff --git a/dist/mqttdispatcher.js b/dist/mqttdispatcher.js index 18d2cfe..8f20518 100644 --- a/dist/mqttdispatcher.js +++ b/dist/mqttdispatcher.js @@ -60,8 +60,11 @@ class MqttDispatcher { this.mqttClient.subscribe(topicHandler.topic); } }); - this.mqttClient.on('message', (topic, payload) => { + this.mqttClient.on('message', (topic, payload, packet) => { log.info(`message received, topic ${topic}, payload ${payload}`); + if (packet.retain) { + log.info("IS RETAINED"); + } for (let topicHandler of this.topicHandlers) { if (this.topicMatch(topicHandler.topic, topic)) { log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); diff --git a/npm-debug.log b/npm-debug.log index 9052c60..e305fac 100644 --- a/npm-debug.log +++ b/npm-debug.log @@ -3,7 +3,8 @@ 1 verbose cli '/usr/bin/npm', 1 verbose cli 'start', 1 verbose cli '--', -1 verbose cli '--config=/home/wn/workspace-node/Dispatcher/config.json' ] +1 verbose cli '-c', +1 verbose cli 'config.json' ] 2 info using npm@3.10.10 3 info using node@v6.11.2 4 verbose run-script [ 'prestart', 'start', 'poststart' ] @@ -13,11 +14,10 @@ 8 verbose lifecycle dispatcher@1.0.0~start: unsafe-perm in lifecycle true 9 verbose lifecycle dispatcher@1.0.0~start: PATH: /usr/lib/node_modules/npm/bin/node-gyp-bin:/home/wn/workspace-node/Dispatcher/node_modules/.bin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/wn/.local/bin:/home/wn/bin:/opt/jdk/bin:/opt/apache-maven/bin:/home/wn/mos/bin 10 verbose lifecycle dispatcher@1.0.0~start: CWD: /home/wn/workspace-node/Dispatcher -11 silly lifecycle dispatcher@1.0.0~start: Args: [ '-c', -11 silly lifecycle 'node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"' ] +11 silly lifecycle dispatcher@1.0.0~start: Args: [ '-c', 'node dist/main.js "-c" "config.json"' ] 12 silly lifecycle dispatcher@1.0.0~start: Returned: code: 1 signal: null 13 info lifecycle dispatcher@1.0.0~start: Failed to exec start script -14 verbose stack Error: dispatcher@1.0.0 start: `node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"` +14 verbose stack Error: dispatcher@1.0.0 start: `node dist/main.js "-c" "config.json"` 14 verbose stack Exit status 1 14 verbose stack at EventEmitter. (/usr/lib/node_modules/npm/lib/utils/lifecycle.js:255:16) 14 verbose stack at emitTwo (events.js:106:13) @@ -30,18 +30,18 @@ 15 verbose pkgid dispatcher@1.0.0 16 verbose cwd /home/wn/workspace-node/Dispatcher 17 error Linux 4.9.0-3-amd64 -18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start" "--" "--config=/home/wn/workspace-node/Dispatcher/config.json" +18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start" "--" "-c" "config.json" 19 error node v6.11.2 20 error npm v3.10.10 21 error code ELIFECYCLE -22 error dispatcher@1.0.0 start: `node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"` +22 error dispatcher@1.0.0 start: `node dist/main.js "-c" "config.json"` 22 error Exit status 1 -23 error Failed at the dispatcher@1.0.0 start script 'node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"'. +23 error Failed at the dispatcher@1.0.0 start script 'node dist/main.js "-c" "config.json"'. 23 error Make sure you have the latest version of node.js and npm installed. 23 error If you do, this is most likely a problem with the dispatcher package, 23 error not with npm itself. 23 error Tell the author that this fails on your system: -23 error node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json" +23 error node dist/main.js "-c" "config.json" 23 error You can get information on how to open an issue for this project with: 23 error npm bugs dispatcher 23 error Or if that isn't available, you can get their info via: diff --git a/src/main.ts b/src/main.ts index c4f3a51..e81babf 100644 --- a/src/main.ts +++ b/src/main.ts @@ -6,23 +6,33 @@ import * as cmdargs from 'command-line-args' import * as EspThermToJson from './espthermtojson' import * as MongoSave from './mongosave' +import * as MissingEventDetector from './missingeventdetector' -log.info("Dispatcher starting") -const optionDefinitions = [ +const OPTION_DEFINITIONS = [ { name: 'verbose', alias: 'v', type: Boolean }, { name: 'config', alias: 'c', type: String, defaultValue: '~/mqttDispatcher.conf' } ]; -const OPTIONS = cmdargs(optionDefinitions) +const OPTIONS = cmdargs(OPTION_DEFINITIONS) const CONFIG = JSON.parse(fs.readFileSync(OPTIONS.config, "utf8")) + + + +log.info("Dispatcher starting") + let dispatcher = new mqtt.MqttDispatcher(CONFIG.brokerUrl, CONFIG.brokerUser, CONFIG.brokerPass, CONFIG.brokerCa) -dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson) +const ESP_THERM_TOPIC : string = 'IoT/espThermometer2/#' +dispatcher.register(ESP_THERM_TOPIC, 'toJson', EspThermToJson.espThermToJson) -let mongo : MongoSave.MongoSave = new MongoSave.MongoSave(CONFIG.mongoDbUrl) -dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo); +let missingeventdetector : MissingEventDetector.MissingEventDetector = + new MissingEventDetector.MissingEventDetector() +dispatcher.register(ESP_THERM_TOPIC, 'MissingEventDetector', missingeventdetector) + +// let mongo : MongoSave.MongoSave = new MongoSave.MongoSave(CONFIG.mongoDbUrl) +// dispatcher.register(ESP_THERM_TOPIC, 'MongoSave', mongo); dispatcher.exec() diff --git a/src/missingeventdetector.ts b/src/missingeventdetector.ts new file mode 100644 index 0000000..023c001 --- /dev/null +++ b/src/missingeventdetector.ts @@ -0,0 +1,71 @@ +import * as log from './log' +import * as CallChain from './callchain' +import * as Processor from './processor' + + +const CHECK_PERIOD : number = 10 // seconds + +class ClientEntry { + client : string + lastEvent : number + delay : number + count : number + delaySum : number + avgDelay : number +} + +class MissingEventProcessor extends Processor.AProcessor { + private timer : NodeJS.Timer + private clientMap : Map = new Map() + + constructor() { + super("MissingEventProcessor") + this.timer = setInterval(() => { + this.clientMap.forEach((value : ClientEntry, key : string) : void => { + let currentTime : number = new Date().getTime() + let elapsedTime : number = currentTime - value.lastEvent + log.info(`Checking ${key}, elapsed: ${elapsedTime / 1000}, avg. delay: ${value.avgDelay / 1000}`) + }) + }, CHECK_PERIOD * 1000) + } + + protected process(message : any) : void { + let client = message.metadata.client + log.info(`Event for client ${client}`) + let currentTime : number = new Date().getTime() + if (this.clientMap.has(client)) { + let clientEntry = this.clientMap.get(client) + clientEntry.client = client + clientEntry.delay = currentTime - clientEntry.lastEvent + clientEntry.lastEvent = currentTime + clientEntry.count += 1 + clientEntry.delaySum += clientEntry.delay + clientEntry.avgDelay = clientEntry.delaySum / clientEntry.count + this.clientMap.set(client, clientEntry) + log.info(`Entry for ${client} updated`) + } else { + let clientEntry : ClientEntry = new ClientEntry() + clientEntry.client = client + clientEntry.lastEvent = currentTime + clientEntry.delay = 0 + clientEntry.count = 0 + clientEntry.delaySum = 0 + clientEntry.avgDelay = 0 + this.clientMap.set(client, clientEntry) + log.info(`Entry for ${client} inserted`) + } + } +} + +export class MissingEventDetector extends CallChain.ABaseChainItem { + private missingEventProcessor : MissingEventProcessor = new MissingEventProcessor() + + constructor() { + super("MissingEventDetector") + } + + protected func(message : any) : any { + this.missingEventProcessor.in(message) + return message + } +} diff --git a/src/mqttdispatcher.ts b/src/mqttdispatcher.ts index d24e23a..80ac410 100644 --- a/src/mqttdispatcher.ts +++ b/src/mqttdispatcher.ts @@ -84,8 +84,11 @@ export class MqttDispatcher implements IDispatcher { this.mqttClient.subscribe(topicHandler.topic) } }) - this.mqttClient.on('message', (topic: string, payload: Buffer): void => { + this.mqttClient.on('message', (topic: string, payload: Buffer, packet : Mqtt.IPublishPacket): void => { log.info(`message received, topic ${topic}, payload ${payload}`) + if (packet.retain) { + log.info("IS RETAINED") + } for (let topicHandler of this.topicHandlers) { if (this.topicMatch(topicHandler.topic, topic)) { log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`);