From dad740e620cee5fef3bc1dde08e53e289db31a3f Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Sat, 22 Jul 2017 23:39:14 +0200 Subject: [PATCH] initial --- .gitignore | 2 ++ dist/log.js | 57 +++++++++++++++++++++++++++++++++++++ dist/main.js | 18 ++++++++++++ dist/mqttclient.js | 60 +++++++++++++++++++++++++++++++++++++++ package.json | 25 ++++++++++++++++ src/log.ts | 50 ++++++++++++++++++++++++++++++++ src/main.ts | 27 ++++++++++++++++++ src/mqttclient.ts | 71 ++++++++++++++++++++++++++++++++++++++++++++++ tsconfig.json | 25 ++++++++++++++++ 9 files changed, 335 insertions(+) create mode 100644 .gitignore create mode 100644 dist/log.js create mode 100644 dist/main.js create mode 100644 dist/mqttclient.js create mode 100644 package.json create mode 100644 src/log.ts create mode 100644 src/main.ts create mode 100644 src/mqttclient.ts create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fc467d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +*.map diff --git a/dist/log.js b/dist/log.js new file mode 100644 index 0000000..0ff9c77 --- /dev/null +++ b/dist/log.js @@ -0,0 +1,57 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const chalk = require("chalk"); +const moment = require("moment"); +var Level; +(function (Level) { + Level[Level["All"] = 0] = "All"; + Level[Level["NoDebug"] = 1] = "NoDebug"; + Level[Level["NoDebugNoInfo"] = 2] = "NoDebugNoInfo"; + Level[Level["NoDebugNoInfoNoWarning"] = 3] = "NoDebugNoInfoNoWarning"; +})(Level || (Level = {})); +var level = Level.NoDebug; +function timestamp() { + return moment().format('HH:mm:ss.SSS'); +} +function setLevel(value) { + switch (value) { + case 'info': + level = Level.NoDebug; + break; + case 'warn': + level = Level.NoDebugNoInfo; + break; + case 'error': + level = Level.NoDebugNoInfoNoWarning; + break; + default: level = Level.All; + } +} +exports.setLevel = setLevel; +function info(message) { + if (level < Level.NoDebugNoInfo) { + console.log(`${timestamp()} ${chalk.bold.cyan('[ II ]')} ${message}`); + } +} +exports.info = info; +function warn(message) { + if (level < Level.NoDebugNoInfoNoWarning) { + console.log(`${timestamp()} ${chalk.bold.yellow('[ WW ]')} ${message}`); + } +} +exports.warn = warn; +function error(message) { + console.log(`${timestamp()} ${chalk.bold.red('[ EE ]')} ${message}`); +} +exports.error = error; +function success(message) { + console.log(`${timestamp()} ${chalk.bold.green('[ OK ]')} ${message}`); +} +exports.success = success; +function debug(message) { + if (level < Level.NoDebug) { + console.log(`${timestamp()} ${chalk.bold.magenta('[ DB ]')} ${message}`); + } +} +exports.debug = debug; +//# sourceMappingURL=log.js.map \ No newline at end of file diff --git a/dist/main.js b/dist/main.js new file mode 100644 index 0000000..be4af7d --- /dev/null +++ b/dist/main.js @@ -0,0 +1,18 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const log = require("./log"); +const MqttClient = require("./mqttclient"); +class Dispatcher { + constructor() { + this._mqttClient = new MqttClient.MqttClient(); + this._mqttClient.register('IoT/test', null); + this._mqttClient.register('IoT/Device/#', null); + } + exec() { + log.info("Dispatcher starting"); + this._mqttClient.exec(); + } +} +const dispatcher = new Dispatcher(); +dispatcher.exec(); +//# sourceMappingURL=main.js.map \ No newline at end of file diff --git a/dist/mqttclient.js b/dist/mqttclient.js new file mode 100644 index 0000000..5cdd46c --- /dev/null +++ b/dist/mqttclient.js @@ -0,0 +1,60 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const Mqtt = require("mqtt"); +const log = require("./log"); +const MQTT_BROKER_DEFAULT_URL = "mqtt://localhost"; +class MqttClient { + constructor(mqttBrokerUrl) { + this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL; + this._topicHandlers = []; + } + register(topic, callback) { + this._topicHandlers.push({ topic, callback }); + log.info(`handler registered for topic ${topic}`); + } + exec() { + this._mqttClient = Mqtt.connect(this._mqttBrokerUrl); + this._mqttClient.on('error', log.error); + this._mqttClient.on('connect', () => { + log.info("connected to mqtt broker"); + for (let topicHandler of this._topicHandlers) { + this._mqttClient.subscribe(topicHandler.topic); + } + }); + this._mqttClient.on('message', (topic, payload) => { + log.info(`message received, topic ${topic}, payload ${payload}`); + for (let topicHandler of this._topicHandlers) { + if (this.topicMatch(topicHandler.topic, topic)) { + log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`); + } + } + }); + } + topicMatch(registeredTopic, receivedTopic) { + let registeredTopicFields = registeredTopic.split('/'); + let receivedTopicFields = receivedTopic.split('/'); + for (let field in registeredTopicFields) { + let regField = registeredTopicFields[field]; + let recvField = receivedTopicFields[field]; + log.info(`recv: ${recvField}, reg: ${regField}`); + if (regField === "#") { + log.info('true'); + return true; + } + if (regField != recvField && regField != "+") { + log.info('false'); + return false; + } + } + if (registeredTopicFields.length == receivedTopicFields.length) { + log.info('true'); + return true; + } + else { + log.info('false'); + return false; + } + } +} +exports.MqttClient = MqttClient; +//# sourceMappingURL=mqttclient.js.map \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..a3d564d --- /dev/null +++ b/package.json @@ -0,0 +1,25 @@ +{ + "name": "dispatcher", + "version": "1.0.0", + "description": "", + "main": "dist/main.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1", + "build": "tsc -p ./", + "debug": "npm run build && node dist/main.js --loglevel debug", + "start": "node dist/main.js" + }, + "author": "Wolfgang Hottgenroth ", + "license": "ISC", + "devDependencies": { + "@types/chalk": "^0.4.31", + "@types/mqtt": "0.0.34", + "@types/node": "^8.0.14", + "typescript": "^2.4.2" + }, + "dependencies": { + "chalk": "^2.0.1", + "moment": "^2.18.1", + "mqtt": "^2.9.2" + } +} diff --git a/src/log.ts b/src/log.ts new file mode 100644 index 0000000..b0820d8 --- /dev/null +++ b/src/log.ts @@ -0,0 +1,50 @@ +import * as chalk from 'chalk' +import * as moment from 'moment' + +enum Level { + All, + NoDebug, + NoDebugNoInfo, + NoDebugNoInfoNoWarning +} + +var level = Level.NoDebug + +function timestamp(): string { + return moment().format('HH:mm:ss.SSS') +} + +export function setLevel(value: string): void { + switch (value) { + case 'info': level = Level.NoDebug; break + case 'warn': level = Level.NoDebugNoInfo; break + case 'error': level = Level.NoDebugNoInfoNoWarning; break + default: level = Level.All + } +} + +export function info(message: string): void { + if (level < Level.NoDebugNoInfo) { + console.log(`${timestamp()} ${chalk.bold.cyan('[ II ]')} ${message}`) + } +} + +export function warn(message: string): void { + if (level < Level.NoDebugNoInfoNoWarning) { + console.log(`${timestamp()} ${chalk.bold.yellow('[ WW ]')} ${message}`) + } +} + +export function error(message: string): void { + console.log(`${timestamp()} ${chalk.bold.red('[ EE ]')} ${message}`) +} + +export function success(message: string): void { + console.log(`${timestamp()} ${chalk.bold.green('[ OK ]')} ${message}`) +} + +export function debug(message: string): void { + if (level < Level.NoDebug) { + console.log(`${timestamp()} ${chalk.bold.magenta('[ DB ]')} ${message}`) + } +} diff --git a/src/main.ts b/src/main.ts new file mode 100644 index 0000000..9e6c882 --- /dev/null +++ b/src/main.ts @@ -0,0 +1,27 @@ +import * as log from './log' +import * as MqttClient from './mqttclient' + +class Dispatcher { + private _mqttClient: MqttClient.MqttClient + + constructor() { + this._mqttClient = new MqttClient.MqttClient() + + this._mqttClient.register('IoT/test', null) + this._mqttClient.register('IoT/Device/#', null) + } + + exec() : void { + log.info("Dispatcher starting") + + this._mqttClient.exec() + } +} + + +const dispatcher = new Dispatcher() +dispatcher.exec() + + + + diff --git a/src/mqttclient.ts b/src/mqttclient.ts new file mode 100644 index 0000000..4a2b006 --- /dev/null +++ b/src/mqttclient.ts @@ -0,0 +1,71 @@ +import * as Mqtt from 'mqtt' +import * as log from './log' + +const MQTT_BROKER_DEFAULT_URL : string = "mqtt://localhost" + +type TopicHandlerCallback = (message: Buffer) => void + +type TopicHandler = { + topic: string, + callback: TopicHandlerCallback|null +} +export class MqttClient { + private _mqttClient: Mqtt.Client + private _mqttBrokerUrl: string + private _topicHandlers: TopicHandler[] + + constructor(mqttBrokerUrl? : string) { + this._mqttBrokerUrl = (mqttBrokerUrl) ? mqttBrokerUrl : MQTT_BROKER_DEFAULT_URL + this._topicHandlers = [] + } + + register(topic: string, callback: TopicHandlerCallback|null) : void { + this._topicHandlers.push({topic, callback}) + log.info(`handler registered for topic ${topic}`) + } + + exec() : void { + this._mqttClient = Mqtt.connect(this._mqttBrokerUrl) + this._mqttClient.on('error', log.error) + this._mqttClient.on('connect', (): void => { + log.info("connected to mqtt broker") + for (let topicHandler of this._topicHandlers) { + this._mqttClient.subscribe(topicHandler.topic) + } + }) + this._mqttClient.on('message', (topic: string, payload: Buffer): void => { + log.info(`message received, topic ${topic}, payload ${payload}`) + for (let topicHandler of this._topicHandlers) { + if (this.topicMatch(topicHandler.topic, topic)) { + log.info(`received topic ${topic} matches registered topic ${topicHandler.topic}`) + } + } + }) + } + + private topicMatch(registeredTopic: string, receivedTopic: string) : boolean { + let registeredTopicFields = registeredTopic.split('/') + let receivedTopicFields = receivedTopic.split('/') + for (let field in registeredTopicFields) { + let regField = registeredTopicFields[field] + let recvField = receivedTopicFields[field] + log.info(`recv: ${recvField}, reg: ${regField}`) + if (regField === "#") { + log.info('true') + return true + } + if (regField != recvField && regField != "+") { + log.info('false') + return false + } + } + if (registeredTopicFields.length == receivedTopicFields.length) { + log.info('true') + return true + } else { + log.info('false') + return false + } + + } +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..ebd3e33 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,25 @@ +{ + "compilerOptions": { + "target": "es2015", + "module": "commonjs", + "moduleResolution": "node", + "sourceMap": true, + "lib": ["es2015"], + "strictNullChecks": true, + "noImplicitAny": true, + "noEmitOnError": true, + "outDir": "dist", + "typeRoots": [ + "node_modules/@types" + ] + }, + "include": [ + "src/**/*.*" + ], + "exclude": [ + "node_modules", + "dist", + "proto", + "kernel" + ] +}