handle retained messages, introduc missing event detector

This commit is contained in:
Wolfgang Hottgenroth
2017-08-25 17:03:25 +02:00
parent 3967a6fb3a
commit 3309ba29e5
7 changed files with 173 additions and 26 deletions

20
dist/main.js vendored
View File

@ -5,21 +5,21 @@ const mqtt = require("./mqttdispatcher");
const fs = require("fs"); const fs = require("fs");
const cmdargs = require("command-line-args"); const cmdargs = require("command-line-args");
const EspThermToJson = require("./espthermtojson"); const EspThermToJson = require("./espthermtojson");
const MongoSave = require("./mongosave"); const MissingEventDetector = require("./missingeventdetector");
log.info("Dispatcher starting"); const OPTION_DEFINITIONS = [
const optionDefinitions = [
{ name: 'verbose', alias: 'v', type: Boolean }, { name: 'verbose', alias: 'v', type: Boolean },
{ name: 'config', alias: 'c', type: String, defaultValue: '~/mqttDispatcher.conf' } { name: 'config', alias: 'c', type: String, defaultValue: '~/mqttDispatcher.conf' }
]; ];
const OPTIONS = cmdargs(optionDefinitions); const OPTIONS = cmdargs(OPTION_DEFINITIONS);
log.info(`OPTIONS.config`);
const CONFIG = JSON.parse(fs.readFileSync(OPTIONS.config, "utf8")); const CONFIG = JSON.parse(fs.readFileSync(OPTIONS.config, "utf8"));
log.info("Dispatcher starting");
let dispatcher = new mqtt.MqttDispatcher(CONFIG.brokerUrl, CONFIG.brokerUser, CONFIG.brokerPass, CONFIG.brokerCa); let dispatcher = new mqtt.MqttDispatcher(CONFIG.brokerUrl, CONFIG.brokerUser, CONFIG.brokerPass, CONFIG.brokerCa);
dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson); const ESP_THERM_TOPIC = 'IoT/espThermometer2/#';
let mongoUrl = "mongodb://localhost/hottis"; dispatcher.register(ESP_THERM_TOPIC, 'toJson', EspThermToJson.espThermToJson);
// let mongoUrl = "mongodb://receiver:esp8266.@cluster0-shard-00-00-7qduq.mongodb.net:27017,cluster0-shard-00-01-7qduq.mongodb.net:27017,cluster0-shard-00-02-7qduq.mongodb.net:27017/hottis?ssl=true&replicaSet=Cluster0-shard-0&authSource=admin" let missingeventdetector = new MissingEventDetector.MissingEventDetector();
let mongo = new MongoSave.MongoSave(CONFIG.mongoDbUrl); dispatcher.register(ESP_THERM_TOPIC, 'MissingEventDetector', missingeventdetector);
dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo); // let mongo : MongoSave.MongoSave = new MongoSave.MongoSave(CONFIG.mongoDbUrl)
// dispatcher.register(ESP_THERM_TOPIC, 'MongoSave', mongo);
dispatcher.exec(); dispatcher.exec();
log.info("Dispatcher running"); log.info("Dispatcher running");
//# sourceMappingURL=main.js.map //# sourceMappingURL=main.js.map

60
dist/missingeventdetector.js vendored Normal file
View File

@ -0,0 +1,60 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const log = require("./log");
const CallChain = require("./callchain");
const Processor = require("./processor");
const CHECK_PERIOD = 10; // seconds
class ClientEntry {
}
class MissingEventProcessor extends Processor.AProcessor {
constructor() {
super("MissingEventProcessor");
this.clientMap = new Map();
this.timer = setInterval(() => {
this.clientMap.forEach((value, key) => {
let currentTime = new Date().getTime();
let elapsedTime = currentTime - value.lastEvent;
log.info(`Checking ${key}, elapsed: ${elapsedTime / 1000}, avg. delay: ${value.avgDelay / 1000}`);
});
}, CHECK_PERIOD * 1000);
}
process(message) {
let client = message.metadata.client;
log.info(`Event for client ${client}`);
let currentTime = new Date().getTime();
if (this.clientMap.has(client)) {
let clientEntry = this.clientMap.get(client);
clientEntry.client = client;
clientEntry.delay = currentTime - clientEntry.lastEvent;
clientEntry.lastEvent = currentTime;
clientEntry.count += 1;
clientEntry.delaySum += clientEntry.delay;
clientEntry.avgDelay = clientEntry.delaySum / clientEntry.count;
this.clientMap.set(client, clientEntry);
log.info(`Entry for ${client} updated`);
}
else {
let clientEntry = new ClientEntry();
clientEntry.client = client;
clientEntry.lastEvent = currentTime;
clientEntry.delay = 0;
clientEntry.count = 0;
clientEntry.delaySum = 0;
clientEntry.avgDelay = 0;
this.clientMap.set(client, clientEntry);
log.info(`Entry for ${client} inserted`);
}
}
}
class MissingEventDetector extends CallChain.ABaseChainItem {
constructor() {
super("MissingEventDetector");
this.missingEventProcessor = new MissingEventProcessor();
}
func(message) {
this.missingEventProcessor.in(message);
return message;
}
}
exports.MissingEventDetector = MissingEventDetector;
//# sourceMappingURL=missingeventdetector.js.map

View File

@ -60,8 +60,11 @@ class MqttDispatcher {
this.mqttClient.subscribe(topicHandler.topic); this.mqttClient.subscribe(topicHandler.topic);
} }
}); });
this.mqttClient.on('message', (topic, payload) => { this.mqttClient.on('message', (topic, payload, packet) => {
log.info(`message received, topic ${topic}, payload ${payload}`); log.info(`message received, topic ${topic}, payload ${payload}`);
if (packet.retain) {
log.info("IS RETAINED");
}
for (let topicHandler of this.topicHandlers) { for (let topicHandler of this.topicHandlers) {
if (this.topicMatch(topicHandler.topic, topic)) { if (this.topicMatch(topicHandler.topic, topic)) {
log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`);

View File

@ -3,7 +3,8 @@
1 verbose cli '/usr/bin/npm', 1 verbose cli '/usr/bin/npm',
1 verbose cli 'start', 1 verbose cli 'start',
1 verbose cli '--', 1 verbose cli '--',
1 verbose cli '--config=/home/wn/workspace-node/Dispatcher/config.json' ] 1 verbose cli '-c',
1 verbose cli 'config.json' ]
2 info using npm@3.10.10 2 info using npm@3.10.10
3 info using node@v6.11.2 3 info using node@v6.11.2
4 verbose run-script [ 'prestart', 'start', 'poststart' ] 4 verbose run-script [ 'prestart', 'start', 'poststart' ]
@ -13,11 +14,10 @@
8 verbose lifecycle dispatcher@1.0.0~start: unsafe-perm in lifecycle true 8 verbose lifecycle dispatcher@1.0.0~start: unsafe-perm in lifecycle true
9 verbose lifecycle dispatcher@1.0.0~start: PATH: /usr/lib/node_modules/npm/bin/node-gyp-bin:/home/wn/workspace-node/Dispatcher/node_modules/.bin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/wn/.local/bin:/home/wn/bin:/opt/jdk/bin:/opt/apache-maven/bin:/home/wn/mos/bin 9 verbose lifecycle dispatcher@1.0.0~start: PATH: /usr/lib/node_modules/npm/bin/node-gyp-bin:/home/wn/workspace-node/Dispatcher/node_modules/.bin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/wn/.local/bin:/home/wn/bin:/opt/jdk/bin:/opt/apache-maven/bin:/home/wn/mos/bin
10 verbose lifecycle dispatcher@1.0.0~start: CWD: /home/wn/workspace-node/Dispatcher 10 verbose lifecycle dispatcher@1.0.0~start: CWD: /home/wn/workspace-node/Dispatcher
11 silly lifecycle dispatcher@1.0.0~start: Args: [ '-c', 11 silly lifecycle dispatcher@1.0.0~start: Args: [ '-c', 'node dist/main.js "-c" "config.json"' ]
11 silly lifecycle 'node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"' ]
12 silly lifecycle dispatcher@1.0.0~start: Returned: code: 1 signal: null 12 silly lifecycle dispatcher@1.0.0~start: Returned: code: 1 signal: null
13 info lifecycle dispatcher@1.0.0~start: Failed to exec start script 13 info lifecycle dispatcher@1.0.0~start: Failed to exec start script
14 verbose stack Error: dispatcher@1.0.0 start: `node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"` 14 verbose stack Error: dispatcher@1.0.0 start: `node dist/main.js "-c" "config.json"`
14 verbose stack Exit status 1 14 verbose stack Exit status 1
14 verbose stack at EventEmitter.<anonymous> (/usr/lib/node_modules/npm/lib/utils/lifecycle.js:255:16) 14 verbose stack at EventEmitter.<anonymous> (/usr/lib/node_modules/npm/lib/utils/lifecycle.js:255:16)
14 verbose stack at emitTwo (events.js:106:13) 14 verbose stack at emitTwo (events.js:106:13)
@ -30,18 +30,18 @@
15 verbose pkgid dispatcher@1.0.0 15 verbose pkgid dispatcher@1.0.0
16 verbose cwd /home/wn/workspace-node/Dispatcher 16 verbose cwd /home/wn/workspace-node/Dispatcher
17 error Linux 4.9.0-3-amd64 17 error Linux 4.9.0-3-amd64
18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start" "--" "--config=/home/wn/workspace-node/Dispatcher/config.json" 18 error argv "/usr/bin/nodejs" "/usr/bin/npm" "start" "--" "-c" "config.json"
19 error node v6.11.2 19 error node v6.11.2
20 error npm v3.10.10 20 error npm v3.10.10
21 error code ELIFECYCLE 21 error code ELIFECYCLE
22 error dispatcher@1.0.0 start: `node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"` 22 error dispatcher@1.0.0 start: `node dist/main.js "-c" "config.json"`
22 error Exit status 1 22 error Exit status 1
23 error Failed at the dispatcher@1.0.0 start script 'node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json"'. 23 error Failed at the dispatcher@1.0.0 start script 'node dist/main.js "-c" "config.json"'.
23 error Make sure you have the latest version of node.js and npm installed. 23 error Make sure you have the latest version of node.js and npm installed.
23 error If you do, this is most likely a problem with the dispatcher package, 23 error If you do, this is most likely a problem with the dispatcher package,
23 error not with npm itself. 23 error not with npm itself.
23 error Tell the author that this fails on your system: 23 error Tell the author that this fails on your system:
23 error node dist/main.js "--config=/home/wn/workspace-node/Dispatcher/config.json" 23 error node dist/main.js "-c" "config.json"
23 error You can get information on how to open an issue for this project with: 23 error You can get information on how to open an issue for this project with:
23 error npm bugs dispatcher 23 error npm bugs dispatcher
23 error Or if that isn't available, you can get their info via: 23 error Or if that isn't available, you can get their info via:

View File

@ -6,23 +6,33 @@ import * as cmdargs from 'command-line-args'
import * as EspThermToJson from './espthermtojson' import * as EspThermToJson from './espthermtojson'
import * as MongoSave from './mongosave' import * as MongoSave from './mongosave'
import * as MissingEventDetector from './missingeventdetector'
log.info("Dispatcher starting")
const optionDefinitions = [ const OPTION_DEFINITIONS = [
{ name: 'verbose', alias: 'v', type: Boolean }, { name: 'verbose', alias: 'v', type: Boolean },
{ name: 'config', alias: 'c', type: String, defaultValue: '~/mqttDispatcher.conf' } { name: 'config', alias: 'c', type: String, defaultValue: '~/mqttDispatcher.conf' }
]; ];
const OPTIONS = cmdargs(optionDefinitions) const OPTIONS = cmdargs(OPTION_DEFINITIONS)
const CONFIG = JSON.parse(fs.readFileSync(OPTIONS.config, "utf8")) const CONFIG = JSON.parse(fs.readFileSync(OPTIONS.config, "utf8"))
log.info("Dispatcher starting")
let dispatcher = new mqtt.MqttDispatcher(CONFIG.brokerUrl, let dispatcher = new mqtt.MqttDispatcher(CONFIG.brokerUrl,
CONFIG.brokerUser, CONFIG.brokerPass, CONFIG.brokerCa) CONFIG.brokerUser, CONFIG.brokerPass, CONFIG.brokerCa)
dispatcher.register('IoT/espThermometer2/#', 'toJson', EspThermToJson.espThermToJson) const ESP_THERM_TOPIC : string = 'IoT/espThermometer2/#'
dispatcher.register(ESP_THERM_TOPIC, 'toJson', EspThermToJson.espThermToJson)
let mongo : MongoSave.MongoSave = new MongoSave.MongoSave(CONFIG.mongoDbUrl) let missingeventdetector : MissingEventDetector.MissingEventDetector =
dispatcher.register('IoT/espThermometer2/#', 'MongoSave', mongo); new MissingEventDetector.MissingEventDetector()
dispatcher.register(ESP_THERM_TOPIC, 'MissingEventDetector', missingeventdetector)
// let mongo : MongoSave.MongoSave = new MongoSave.MongoSave(CONFIG.mongoDbUrl)
// dispatcher.register(ESP_THERM_TOPIC, 'MongoSave', mongo);
dispatcher.exec() dispatcher.exec()

View File

@ -0,0 +1,71 @@
import * as log from './log'
import * as CallChain from './callchain'
import * as Processor from './processor'
const CHECK_PERIOD : number = 10 // seconds
class ClientEntry {
client : string
lastEvent : number
delay : number
count : number
delaySum : number
avgDelay : number
}
class MissingEventProcessor extends Processor.AProcessor {
private timer : NodeJS.Timer
private clientMap : Map<string, ClientEntry> = new Map<string, ClientEntry>()
constructor() {
super("MissingEventProcessor")
this.timer = setInterval(() => {
this.clientMap.forEach((value : ClientEntry, key : string) : void => {
let currentTime : number = new Date().getTime()
let elapsedTime : number = currentTime - value.lastEvent
log.info(`Checking ${key}, elapsed: ${elapsedTime / 1000}, avg. delay: ${value.avgDelay / 1000}`)
})
}, CHECK_PERIOD * 1000)
}
protected process(message : any) : void {
let client = message.metadata.client
log.info(`Event for client ${client}`)
let currentTime : number = new Date().getTime()
if (this.clientMap.has(client)) {
let clientEntry = <ClientEntry>this.clientMap.get(client)
clientEntry.client = client
clientEntry.delay = currentTime - clientEntry.lastEvent
clientEntry.lastEvent = currentTime
clientEntry.count += 1
clientEntry.delaySum += clientEntry.delay
clientEntry.avgDelay = clientEntry.delaySum / clientEntry.count
this.clientMap.set(client, clientEntry)
log.info(`Entry for ${client} updated`)
} else {
let clientEntry : ClientEntry = new ClientEntry()
clientEntry.client = client
clientEntry.lastEvent = currentTime
clientEntry.delay = 0
clientEntry.count = 0
clientEntry.delaySum = 0
clientEntry.avgDelay = 0
this.clientMap.set(client, clientEntry)
log.info(`Entry for ${client} inserted`)
}
}
}
export class MissingEventDetector extends CallChain.ABaseChainItem {
private missingEventProcessor : MissingEventProcessor = new MissingEventProcessor()
constructor() {
super("MissingEventDetector")
}
protected func(message : any) : any {
this.missingEventProcessor.in(message)
return message
}
}

View File

@ -84,8 +84,11 @@ export class MqttDispatcher implements IDispatcher {
this.mqttClient.subscribe(topicHandler.topic) this.mqttClient.subscribe(topicHandler.topic)
} }
}) })
this.mqttClient.on('message', (topic: string, payload: Buffer): void => { this.mqttClient.on('message', (topic: string, payload: Buffer, packet : Mqtt.IPublishPacket): void => {
log.info(`message received, topic ${topic}, payload ${payload}`) log.info(`message received, topic ${topic}, payload ${payload}`)
if (packet.retain) {
log.info("IS RETAINED")
}
for (let topicHandler of this.topicHandlers) { for (let topicHandler of this.topicHandlers) {
if (this.topicMatch(topicHandler.topic, topic)) { if (this.topicMatch(topicHandler.topic, topic)) {
log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`);