instancify

This commit is contained in:
2018-05-14 14:48:43 +02:00
parent a361a1c50e
commit d14b83f3ef
7 changed files with 467 additions and 290 deletions

147
src/MqttMongo.ts Normal file
View File

@ -0,0 +1,147 @@
import * as Mqtt from 'mqtt'
import * as Mongo from 'mongodb'
import * as Events from 'events'
import * as Queue from './queue'
import * as MqttMessage from './mqtt_message'
import * as logger from './log'
export default class MqttMongo extends Events.EventEmitter {
private options : any
private msgCnt : number = 0
private startTime: Date
constructor(options : any) {
super()
this.options = options
this.startTime = new Date()
this.on('reconnectDatabase', this.connectToDatabase)
}
private dbHandle : Mongo.Db;
private dbReady : boolean = false;
private queue : Queue.Queue<MqttMessage.MqttMessage> = new Queue.Queue<MqttMessage.MqttMessage>()
private mqttClient : Mqtt.Client
private heartbeatTimer : NodeJS.Timer
logerror(msg: string) {
logger.error(`${this.options.instanceId}: ${msg}`)
}
logwarn(msg: string) {
logger.warn(`${this.options.instanceId}: ${msg}`)
}
loginfo(msg: string) {
logger.info(`${this.options.instanceId}: ${msg}`)
}
connectToDatabase() {
this.loginfo("About to connect to database")
Mongo.MongoClient.connect(this.options.database)
.then(
(tmpDbHandle: Mongo.Db) => {
this.dbHandle = tmpDbHandle
this.dbReady = true;
this.loginfo("Database connected")
this.dbHandle.on('reconnectFailed', (err : any) => { this.logwarn(`Error on database ${err}`) })
this.dbHandle.on('reconnect', () => {
this.loginfo("Reconnect on database")
this.dbReady = true
this.queue.knock()
})
this.dbHandle.on('timeout', () => { this.logwarn("Timeout on database") })
this.dbHandle.on('close', () => {
this.loginfo("Close on database")
this.dbReady = false
})
},
(err: String) => {
this.logerror(`Unable to connect to database: ${err}`)
this.dbReady = false
}
)
}
connectToBroker() {
this.queue.on('data', () => {
if (this.dbReady || true) {
while (! this.queue.isEmpty()) {
let msg : MqttMessage.MqttMessage = this.queue.deq()
if (this.options.verbose) {
this.loginfo(`Something in the queue: ${JSON.stringify(msg)}`)
}
let coll = this.dbHandle.collection(this.options.collection)
coll.insertOne(msg.getMessage())
.then(
(r) => {
if (this.options.verbose) {
this.loginfo(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`)
}
},
(err) => {
this.logerror(`Error when trying to insert into database ${err}`)
if (! this.dbReady) {
this.loginfo("Error occured while database connection is lost, re-enqueue msg.")
this.queue.reenq(msg)
this.emit('reconnectDatabase')
} else {
this.logerror(`Message ${JSON.stringify(msg.getMessage())} is lost`)
}
}
)
}
} else {
// this.loginfo("Database currently not available, not reading from stream")
}
})
this.mqttClient = Mqtt.connect(this.options.broker)
this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") })
this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") })
this.mqttClient.on('close', () => { console.warn("MQTT connection closed") })
this.mqttClient.on('connect', () => {
this.loginfo("MQTT broker connected")
this.options.topics.forEach((topic: string) => {
this.mqttClient.subscribe(topic)
this.loginfo(`Subscribed to ${topic}`)
})
this.mqttClient.publish(`MqttMongo/Status/${this.options.instanceId}`, 'hello, started up')
})
this.mqttClient.on('message', (topic : string, messageBuf : Buffer) => {
this.msgCnt++;
let message = messageBuf.toString('UTF-8')
if (this.options.verbose) {
this.loginfo(`Message received ${this.msgCnt}, topic ${topic}, payload ${message}`)
}
try {
this.queue.enq(new MqttMessage.MqttMessage(topic, message, this.options.encapsulate, this.options.parsePayload))
} catch (e) {
this.logerror(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
}
})
}
setupHeartbeat() {
this.heartbeatTimer = setInterval(() => {
let uptime : number = (new Date().getTime() - this.startTime.getTime()) / 1000
let statusMsg = `{'Uptime': ${uptime}, 'MessageCount': ${this.msgCnt}}`
this.mqttClient.publish(`MqttMongo/Status/${this.options.instanceId}`, statusMsg)
this.loginfo(`Status: ${statusMsg}`)
if (! this.dbReady) {
this.emit("reconnectDatabase")
}
}, 60000)
this.loginfo("Heartbeat timer started")
}
exec() : void {
this.connectToDatabase()
this.connectToBroker()
this.setupHeartbeat()
}
}

19
src/config.ts Normal file
View File

@ -0,0 +1,19 @@
import * as fs from 'fs'
import * as cmdargs from 'command-line-args'
const OPTION_DEFINITIONS = [
{ name: 'verbose', alias: 'v', type: Boolean },
{ name: 'config', alias: 'c', type: String, defaultValue: '~/MqttMongoNodejs.conf' }
];
export let dict : any
export function readConfig() {
let options = cmdargs(OPTION_DEFINITIONS)
dict = JSON.parse(fs.readFileSync(options.config, "utf8"))
}
readConfig()

52
src/log.ts Normal file
View File

@ -0,0 +1,52 @@
import * as moment from 'moment'
import * as config from './config'
enum Level {
All,
NoDebug,
NoDebugNoInfo,
NoDebugNoInfoNoWarning
}
var level = Level.NoDebug
function timestamp(): string {
return moment().format('HH:mm:ss.SSS')
}
export function setLevel(value: string): void {
switch (value) {
case 'info': level = Level.NoDebug; break
case 'warn': level = Level.NoDebugNoInfo; break
case 'error': level = Level.NoDebugNoInfoNoWarning; break
default: level = Level.All
}
}
export function info(message: string): void {
if (level < Level.NoDebugNoInfo) {
console.log(`${timestamp()} [ II ] ${message}`)
}
}
export function warn(message: string): void {
if (level < Level.NoDebugNoInfoNoWarning) {
console.log(`${timestamp()} [ WW ] ${message}`)
}
}
export function error(message: string): void {
console.log(`${timestamp()} [ EE ] ${message}`)
}
export function success(message: string): void {
console.log(`${timestamp()} [ OK ] ${message}`)
}
export function debug(message: string): void {
if (level < Level.NoDebug) {
console.log(`${timestamp()} [ DB ] ${message}`)
}
}

View File

@ -1,173 +1,32 @@
import * as Mqtt from 'mqtt'
import * as Mongo from 'mongodb'
import * as Events from 'events'
import * as Queue from './queue'
import * as MqttMessage from './mqtt_message'
import * as log from './log'
import * as config from './config'
import MqttMongo from './MqttMongo'
let console = require('chalk-console')
let instances : MqttMongo[] = []
const MQTT_BROKER_URL : String = 'mqtt://localhost'
const MONGO_DATABASE_URL : String = 'mongodb://localhost/test'
const COLLECTION : String = 'mqttMongo'
function collect(val: string, memo: string[]) {
memo.push(val);
return memo;
}
import options = require('commander')
options
.version('0.0.1')
.option('-b, --broker [broker url]', 'Broker URL', MQTT_BROKER_URL)
.option('-m, --database [database url]', 'MongoDB Database URL', MONGO_DATABASE_URL)
.option('-c, --collection [mongodb collection]', 'Collection in MongoDB Database', COLLECTION)
.option('-t, --topic [topic to subscribe]', 'Topic to subscribe, can appear multiple times', collect, [])
.option('-e, --encapsulate', 'store current timestamp, topic and payload in document', false)
.option('-p, --parsePayload', 'parse payload when encapsulating (otherwise always)', false)
.option('-v, --verbose', 'log all inserted messages', false)
.parse(process.argv)
class MqttMongo extends Events.EventEmitter {
private options : any
private msgCnt : number = 0
private startTime: Date
constructor(options : any) {
super()
this.options = options
this.startTime = new Date()
this.on('reconnectDatabase', this.connectToDatabase)
config.dict.instances.forEach((v: any) => {
let options : any = {
brokerUrl: config.dict.brokerUrl,
brokerUser: config.dict.brokerUser,
brokerPass: config.dict.brokerPass,
brokerCa: config.dict.brokerCa,
mongodbUrl: config.dict.mongodbUrl,
verbose: config.dict.verbose,
instanceId: v.instanceId,
collection: v.collection,
topics: v.topics,
encapsulate : v.encapsulate,
parsePayload : v.parsePayload
}
private dbHandle : Mongo.Db;
private dbReady : boolean = false;
private queue : Queue.Queue<MqttMessage.MqttMessage> = new Queue.Queue<MqttMessage.MqttMessage>()
private mqttClient : Mqtt.Client
private heartbeatTimer : NodeJS.Timer
log.info(JSON.stringify(options))
connectToDatabase() {
console.info("About to connect to database")
Mongo.MongoClient.connect(this.options.database)
.then(
(tmpDbHandle: Mongo.Db) => {
this.dbHandle = tmpDbHandle
this.dbReady = true;
console.info("Database connected")
this.dbHandle.on('reconnectFailed', (err : any) => { console.warn(`Error on database ${err}`) })
this.dbHandle.on('reconnect', () => {
console.info("Reconnect on database")
this.dbReady = true
this.queue.knock()
})
this.dbHandle.on('timeout', () => { console.warn("Timeout on database") })
this.dbHandle.on('close', () => {
console.info("Close on database")
this.dbReady = false
})
},
(err: String) => {
console.error(`Unable to connect to database: ${err}`)
this.dbReady = false
}
)
}
let instance : MqttMongo = new MqttMongo(options)
instance.exec()
instances.push(instance)
})
connectToBroker() {
this.queue.on('data', () => {
if (this.dbReady || true) {
while (! this.queue.isEmpty()) {
let msg : MqttMessage.MqttMessage = this.queue.deq()
if (this.options.verbose) {
console.info(`Something in the queue: ${JSON.stringify(msg)}`)
}
let coll = this.dbHandle.collection(this.options.collection)
coll.insertOne(msg.getMessage())
.then(
(r) => {
if (this.options.verbose) {
console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`)
}
},
(err) => {
console.error(`Error when trying to insert into database ${err}`)
if (! this.dbReady) {
console.info("Error occured while database connection is lost, re-enqueue msg.")
this.queue.reenq(msg)
this.emit('reconnectDatabase')
} else {
console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`)
}
}
)
}
} else {
// console.info("Database currently not available, not reading from stream")
}
})
this.mqttClient = Mqtt.connect(this.options.broker)
this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") })
this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") })
this.mqttClient.on('close', () => { console.warn("MQTT connection closed") })
this.mqttClient.on('connect', () => {
console.info("MQTT broker connected")
this.options.topic.forEach((topic: string) => {
this.mqttClient.subscribe(topic)
console.info(`Subscribed to ${topic}`)
})
this.mqttClient.subscribe('MqttMongo/Command')
this.mqttClient.publish('MqttMongo/Status', 'hello, started up')
})
this.mqttClient.on('message', (topic : string, messageBuf : Buffer) => {
this.msgCnt++;
let message = messageBuf.toString('UTF-8')
if (this.options.verbose) {
console.info(`Message received ${this.msgCnt}, topic ${topic}, payload ${message}`)
}
if (topic == "MqttMongo/Command" && message == "shutdown") {
this.shutdown()
} else {
try {
this.queue.enq(new MqttMessage.MqttMessage(topic, message, this.options.encapsulate, this.options.parsePayload))
} catch (e) {
console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
}
}
})
}
setupHeartbeat() {
this.heartbeatTimer = setInterval(() => {
let uptime : number = (new Date().getTime() - this.startTime.getTime()) / 1000
let statusMsg = `{'Uptime': ${uptime}, 'MessageCount': ${this.msgCnt}}`
this.mqttClient.publish('MqttMongo/Status', statusMsg)
console.info(`Status: ${statusMsg}`)
if (! this.dbReady) {
this.emit("reconnectDatabase")
}
}, 60000)
console.info("Heartbeat timer started")
}
shutdown() {
console.info("Shutting down MqttMongo")
clearInterval(this.heartbeatTimer)
this.mqttClient.end()
this.dbHandle.close()
}
}
let mqttMongo : MqttMongo = new MqttMongo(options)
mqttMongo.connectToDatabase()
mqttMongo.connectToBroker()
mqttMongo.setupHeartbeat()
console.info("MqttMongo started")
log.info("MqttMongoNodejs started")