diff --git a/src/MongoBackend.ts b/src/MongoBackend.ts new file mode 100644 index 0000000..35f7487 --- /dev/null +++ b/src/MongoBackend.ts @@ -0,0 +1,71 @@ +import * as Mongo from 'mongodb' +import { MqttMessage } from './mqtt_message' +import { Instance } from './instance' + + +export default class MongoBackend extends Instance { + constructor(options : any) { + super(options) + this.on('reconnectDatabase', this.connectToDatabase) + } + + private dbHandle : Mongo.Db; + private dbReady : boolean = false; + + connectToDatabase() { + this.loginfo("About to connect to database") + Mongo.MongoClient.connect(this.options.mongodbUrl) + .then( + (tmpDbHandle: Mongo.Db) => { + this.dbHandle = tmpDbHandle + this.dbReady = true; + this.loginfo(`Database ${this.options.mongodbUrl} connected`) + this.dbHandle.on('reconnectFailed', (err : any) => { this.logwarn(`Error on database ${err}`) }) + this.dbHandle.on('reconnect', () => { + this.loginfo("Reconnect on database") + this.dbReady = true + }) + this.dbHandle.on('timeout', () => { this.logwarn("Timeout on database") }) + this.dbHandle.on('close', () => { + this.loginfo("Close on database") + this.dbReady = false + }) + }, + (err: String) => { + this.logerror(`Unable to connect to database: ${err}`) + this.dbReady = false + } + ) + } + + connectToQueue() { + this.on('data', (msg : MqttMessage) => { + if (this.dbReady) { + let coll = this.dbHandle.collection(this.options.collection) + coll.insertOne(msg.getMessage()) + .then( + (r) => { + if (this.options.verbose) { + this.loginfo(`Successfully inserted into database ${this.options.mongodbUrl}, ${this.options.collection}: ${JSON.stringify(msg.getMessage())}`) + } + }, + (err) => { + this.dbReady = false + this.logerror(`Error when trying to insert into database ${err}`) + this.emit('reconnectDatabase') + } + ) + } + }) + } + +/* + } + } else { + // this.loginfo("Database currently not available, not reading from stream") + } + }) + + } + */ +} diff --git a/src/MqttMongo.ts b/src/MqttMongo.ts deleted file mode 100644 index b8cdfbc..0000000 --- a/src/MqttMongo.ts +++ /dev/null @@ -1,147 +0,0 @@ -import * as Mqtt from 'mqtt' -import * as Mongo from 'mongodb' -import * as Events from 'events' -import * as Queue from './queue' -import * as MqttMessage from './mqtt_message' -import * as logger from './log' - - -export default class MqttMongo extends Events.EventEmitter { - - private options : any - private msgCnt : number = 0 - private startTime: Date - - constructor(options : any) { - super() - this.options = options - this.startTime = new Date() - this.on('reconnectDatabase', this.connectToDatabase) - } - - private dbHandle : Mongo.Db; - private dbReady : boolean = false; - private queue : Queue.Queue = new Queue.Queue() - private mqttClient : Mqtt.Client - private heartbeatTimer : NodeJS.Timer - - logerror(msg: string) { - logger.error(`${this.options.instanceId}: ${msg}`) - } - - logwarn(msg: string) { - logger.warn(`${this.options.instanceId}: ${msg}`) - } - - loginfo(msg: string) { - logger.info(`${this.options.instanceId}: ${msg}`) - } - -connectToDatabase() { - this.loginfo("About to connect to database") - Mongo.MongoClient.connect(this.options.mongodbUrl) - .then( - (tmpDbHandle: Mongo.Db) => { - this.dbHandle = tmpDbHandle - this.dbReady = true; - this.loginfo(`Database ${this.options.mongodbUrl} connected`) - this.dbHandle.on('reconnectFailed', (err : any) => { this.logwarn(`Error on database ${err}`) }) - this.dbHandle.on('reconnect', () => { - this.loginfo("Reconnect on database") - this.dbReady = true - this.queue.knock() - }) - this.dbHandle.on('timeout', () => { this.logwarn("Timeout on database") }) - this.dbHandle.on('close', () => { - this.loginfo("Close on database") - this.dbReady = false - }) - }, - (err: String) => { - this.logerror(`Unable to connect to database: ${err}`) - this.dbReady = false - } - ) - } - - connectToBroker() { - this.queue.on('data', () => { - if (this.dbReady || true) { - while (! this.queue.isEmpty()) { - let msg : MqttMessage.MqttMessage = this.queue.deq() - if (this.options.verbose) { - this.loginfo(`Something in the queue`) - } - let coll = this.dbHandle.collection(this.options.collection) - coll.insertOne(msg.getMessage()) - .then( - (r) => { - if (this.options.verbose) { - this.loginfo(`Successfully inserted into database ${this.options.mongodbUrl}, ${this.options.collection}: ${JSON.stringify(msg.getMessage())}`) - } - }, - (err) => { - this.logerror(`Error when trying to insert into database ${err}`) - if (! this.dbReady) { - this.loginfo("Error occured while database connection is lost, re-enqueue msg.") - this.queue.reenq(msg) - this.emit('reconnectDatabase') - } else { - this.logerror(`Message ${JSON.stringify(msg.getMessage())} is lost`) - } - } - ) - } - } else { - // this.loginfo("Database currently not available, not reading from stream") - } - }) - - this.mqttClient = Mqtt.connect(this.options.broker) - this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") }) - this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") }) - this.mqttClient.on('close', () => { console.warn("MQTT connection closed") }) - - this.mqttClient.on('connect', () => { - this.loginfo("MQTT broker connected") - this.options.topics.forEach((topic: string) => { - this.mqttClient.subscribe(topic) - this.loginfo(`Subscribed to ${topic}`) - }) - this.mqttClient.publish(`MqttMongo/Status/${this.options.instanceId}`, 'hello, started up') - }) - - this.mqttClient.on('message', (topic : string, messageBuf : Buffer) => { - this.msgCnt++; - let message = messageBuf.toString('UTF-8') - if (this.options.verbose) { - this.loginfo(`Message received ${this.msgCnt}, topic ${topic}, payload ${message}`) - } - - try { - this.queue.enq(new MqttMessage.MqttMessage(topic, message, this.options.encapsulate, this.options.parsePayload)) - } catch (e) { - this.logerror(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`) - } - }) - } - - setupHeartbeat() { - this.heartbeatTimer = setInterval(() => { - let uptime : number = (new Date().getTime() - this.startTime.getTime()) / 1000 - let statusMsg = `{'Uptime': ${uptime}, 'MessageCount': ${this.msgCnt}}` - this.mqttClient.publish(`MqttMongo/Status/${this.options.instanceId}`, statusMsg) - this.loginfo(`Status: ${statusMsg}`) - if (! this.dbReady) { - this.emit("reconnectDatabase") - } - }, 60000) - this.loginfo("Heartbeat timer started") - } - - exec() : void { - this.connectToDatabase() - this.connectToBroker() - this.setupHeartbeat() - } -} diff --git a/src/instance.ts b/src/instance.ts new file mode 100644 index 0000000..01c69e8 --- /dev/null +++ b/src/instance.ts @@ -0,0 +1,35 @@ +import * as Events from 'events' +import * as logger from './log' + + +export abstract class Instance extends Events.EventEmitter { + + protected options : any + + constructor(options : any) { + super() + this.options = options + } + + + logerror(msg: string) { + logger.error(`${this.options.instanceId}: ${msg}`) + } + + logwarn(msg: string) { + logger.warn(`${this.options.instanceId}: ${msg}`) + } + + loginfo(msg: string) { + logger.info(`${this.options.instanceId}: ${msg}`) + } + + abstract connectToDatabase() : void + + abstract connectToQueue() : void + + exec() : void { + this.connectToDatabase() + this.connectToQueue() + } +} diff --git a/src/main.ts b/src/main.ts index a2cb574..34b2bfe 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,6 +1,6 @@ import * as log from './log' import * as config from './config' -import MqttMongo from './MqttMongo' +import { MongoBackend } from './MongoBackend' diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 286bb02..0000000 --- a/src/queue.ts +++ /dev/null @@ -1,45 +0,0 @@ -import * as Events from 'events' - - -export class Queue extends Events.EventEmitter { - private q : T[] = [] - - constructor() { - super() - } - - isEmpty() : boolean { - return this.q.length == 0 - } - - knock() { - this.emit('data') - } - - enq(x : T) { - this.q.push(x) - this.emit('data') - } - - reenq(x : T) { - this.q.unshift(x) - // this.emit('data') - } - - deq() : T { - let x : T = this.peek() - this.q.shift() - - return x - } - - peek() : T { - if (this.isEmpty()) { - throw new Error("queue is empty") - } - - let x : T = this.q[0] - - return x - } -}