typescriptifying

This commit is contained in:
Wolfgang Hottgenroth
2018-01-08 21:54:59 +01:00
parent c1fa639ae4
commit 9e39d74084
12 changed files with 607 additions and 105 deletions

107
src/MqttDispatcher.ts Normal file
View File

@ -0,0 +1,107 @@
import * as logger from './log'
import * as Mqtt from 'mqtt'
import * as fs from 'fs'
import * as config from './config'
export type TopicCallbackFunc = (topic: string, payload: string) => void
export interface TopicHandler {
topic: string,
callback: TopicCallbackFunc
}
class MqttHandler {
private mqttClient: Mqtt.Client
private mqttOptions: Mqtt.IClientOptions = {}
private mqttBrokerUrl: string
private topicHandlers: TopicHandler[]
constructor() {
this.mqttBrokerUrl = config.dict.brokerUrl
if (config.dict.brokerUser && config.dict.brokerPass) {
this.mqttOptions.username = config.dict.brokerUser
this.mqttOptions.password = config.dict.brokerPass
}
if (config.dict.brokerCa) {
this.mqttOptions.ca = fs.readFileSync(config.dict.brokerCa, 'ascii')
this.mqttOptions.rejectUnauthorized = true
}
this.topicHandlers = []
}
register(topics: string[], cb: TopicCallbackFunc) : void {
topics.forEach((topic) => {
this.topicHandlers.push({topic: topic, callback: cb})
logger.info(`additional callback registered for ${topic}`)
})
}
exec() : void {
logger.info(`connecting to ${this.mqttBrokerUrl}`)
this.mqttClient = Mqtt.connect(this.mqttBrokerUrl, this.mqttOptions)
this.mqttClient.on('error', (err) => {
logger.error(`Error in mqttHandler: ${err}`)
})
this.mqttClient.on('connect', () : void => {
this.mqttClient.publish('dispatcher_ng/status', 'dispatcher_ng running')
this.mqttClient.subscribe('dispatcher_ng/cmd')
this.topicHandlers.forEach((topicHandler) => {
this.mqttClient.subscribe(topicHandler.topic)
logger.info(`${topicHandler.topic} subscribed`)
})
logger.info('mqtt connection established')
})
this.mqttClient.on('message', (topic: string, payload: Buffer, packet : Mqtt.IPublishPacket): void => {
if (! packet.retain) {
let payloadStr : string = payload.toString('UTF-8')
logger.info(`message received on topic ${topic}: ${payload}`)
this.processMessage(topic, payloadStr)
}
})
}
processMessage(topic: string, payload: string) : boolean {
let found = false;
this.topicHandlers.forEach((topicHandler) => {
// logger.warn(`Test: ${subscribedTopic}, ${topic}`);
// console.log(`Test: ${subscribedTopic}, ${topic}`);
if (topicHandler.topic == topic) {
// logger.warn('1');
topicHandler.callback(topic, payload)
found = true
} else if (topicHandler.topic.endsWith('#') &&
(topicHandler.topic.substring(0, topicHandler.topic.length-1) ==
topic.substring(0, topicHandler.topic.length-1))) {
// logger.warn('2');
// console.log('2');
topicHandler.callback(topic, payload)
found = true
}
})
return found
}
send(topic: string, payload: string, internalFirst: boolean = false) : void {
let sent = false
if (internalFirst) {
logger.info(`Try internal sending: ${topic}`)
sent = this.processMessage(topic, payload)
}
if (! sent) {
logger.info(`External sending required: ${topic}`)
this.mqttClient.publish(topic, payload)
} else {
logger.info(`Internally delivered: ${topic}`)
}
}
}
export let mqttHandler = new MqttHandler()

19
src/config.ts Normal file
View File

@ -0,0 +1,19 @@
import * as fs from 'fs'
import * as cmdargs from 'command-line-args'
import * as logger from './log'
const OPTION_DEFINITIONS = [
{ name: 'verbose', alias: 'v', type: Boolean },
{ name: 'config', alias: 'c', type: String, defaultValue: '~/dispatcher_ng.conf' }
];
export let dict : any
export function readConfig() {
let options = cmdargs(OPTION_DEFINITIONS)
dict = JSON.parse(fs.readFileSync(options.config, "utf8"))
logger.info(JSON.stringify(dict))
}

79
src/log.ts Normal file
View File

@ -0,0 +1,79 @@
import * as moment from 'moment'
import * as config from './config'
import * as nodemailer from 'nodemailer'
enum Level {
All,
NoDebug,
NoDebugNoInfo,
NoDebugNoInfoNoWarning
}
var level = Level.NoDebug
function timestamp(): string {
return moment().format('HH:mm:ss.SSS')
}
export function setLevel(value: string): void {
switch (value) {
case 'info': level = Level.NoDebug; break
case 'warn': level = Level.NoDebugNoInfo; break
case 'error': level = Level.NoDebugNoInfoNoWarning; break
default: level = Level.All
}
}
export function sendAlarmMail(subject : string, message : string): void {
let transport = nodemailer.createTransport({
host: config.dict.smtpHost,
port: config.dict.smtpPort,
secure: false,
tls: {
rejectUnauthorized: false
}
});
let mail : nodemailer.SendMailOptions = {
from: config.dict.smtpSender,
to: config.dict.smtpReceiver,
subject: subject,
text: message
};
transport.sendMail(mail)
.then((v : nodemailer.SentMessageInfo) => {
info(`Mail sent, ${subject}, ${message}, ${v.response}`)
})
.catch((reason : any) => {
error(`Failure when sending alarm mail: ${message}, ${reason}`)
})
}
export function info(message: string): void {
if (level < Level.NoDebugNoInfo) {
console.log(`${timestamp()} [ II ] ${message}`)
}
}
export function warn(message: string): void {
if (level < Level.NoDebugNoInfoNoWarning) {
console.log(`${timestamp()} [ WW ] ${message}`)
}
}
export function error(message: string): void {
console.log(`${timestamp()} [ EE ] ${message}`)
}
export function success(message: string): void {
console.log(`${timestamp()} [ OK ] ${message}`)
}
export function debug(message: string): void {
if (level < Level.NoDebug) {
console.log(`${timestamp()} [ DB ] ${message}`)
}
}

View File

@ -1,14 +1,28 @@
class Test {
import * as logger from './log'
import * as config from './config'
// import { mqttHandler } from './MqttDispatcher'
config.readConfig()
class Dispatcher {
constructor() {
console.log("Test constructed")
logger.info("Dispatcher starting")
}
exec() : void {
console.log("Hello world")
logger.info("Hello world")
// mqttHandler.exec()
}
}
const test = new Test()
test.exec()
const dispatcher = new Dispatcher()
dispatcher.exec()

View File

@ -1,91 +0,0 @@
let logger = require('./log')
logger.info('mqttHandler executed')
var mqtt = require('mqtt');
var client = undefined;
var topicCallbacks = {};
function start() {
client = mqtt.connect('mqtt://172.16.2.16');
client.on('error', (err) => {
logger.error(`Error in mqttHandler: ${err}`)
});
client.on('connect', () => {
client.publish('dispatcher_ng/status', 'dispatcher_ng running');
client.subscribe('dispatcher_ng/cmd');
Object.keys(topicCallbacks).forEach((topic) => {
client.subscribe(topic);
logger.info(`${topic} subscribed`);
});
logger.info('mqtt connection established');
});
client.on('message', (topic, payload, packet) => {
if (! packet.retain) {
payload = payload.toString('UTF-8');
logger.info(`message received on topic ${topic}: ${payload}`);
processMessage(topic, payload);
}
});
}
function processMessage(topic, payload) {
let found = false;
Object.keys(topicCallbacks).forEach((subscribedTopic) => {
// logger.warn(`Test: ${subscribedTopic}, ${topic}`);
// console.log(`Test: ${subscribedTopic}, ${topic}`);
if (subscribedTopic == topic) {
// logger.warn('1');
topicCallbacks[topic].forEach((cb) => { cb(topic, payload) });
found = true;
} else if (subscribedTopic.endsWith('#') &&
(subscribedTopic.substring(0, subscribedTopic.length-1) ==
topic.substring(0, subscribedTopic.length-1))) {
// logger.warn('2');
// console.log('2');
topicCallbacks[subscribedTopic].forEach((cb) => { cb(topic, payload) });
found = false;
}
});
return found;
}
function send(topic, payload, internalFirst = false) {
let sent = false;
if (internalFirst) {
logger.info(`Try internal sending: ${topic}`);
sent = processMessage(topic, payload);
}
if (! sent) {
logger.info(`External sending required: ${topic}`);
client.publish(topic, payload);
} else {
logger.info(`Internally delivered: ${topic}`);
}
}
function register(topics, cb) {
if (! (topics instanceof Array)) {
topics = [ topics ];
}
topics.forEach((topic) => {
if (topic in topicCallbacks) {
topicCallbacks[topic].push(cb);
logger.info(`additional callback registered for ${topic}`);
} else {
topicCallbacks[topic] = [ cb ];
logger.info(`first callback registered for ${topic}`);
}
})
}
module.exports = {
start,
send,
register
};