diff --git a/src/main.ts b/src/main.ts index ef82f58..64bba43 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,5 +1,6 @@ import * as Mqtt from 'mqtt' import * as Mongo from 'mongodb' +import * as Events from 'events' import * as Queue from './queue' import * as MqttMessage from './mqtt_message' @@ -22,112 +23,139 @@ options .parse(process.argv) +class MqttMongo extends Events.EventEmitter { -var dbHandle : Mongo.Db; -var dbReady : boolean = false; + private options : any -Mongo.MongoClient.connect(options['database'], -{ - 'server': { - 'reconnectTries': 5, - 'reconnectInterval': 1000, - 'socketOptions': { - 'autoReconnect': false - } - } -}) - .then( - (tmpDbHandle: Mongo.Db) => { - dbHandle = tmpDbHandle - dbReady = true; - console.info("Connected to database") - dbHandle.on('reconnectFailed', (err : any) => { console.warn(`Error on database ${err}`) }) - dbHandle.on('reconnect', () => { - console.info("Reconnect on database") - dbReady = true - queue.knock() - }) - dbHandle.on('timeout', () => { console.warn("Timeout on database") }) - dbHandle.on('close', () => { - console.info("Close on database") - dbReady = false - }) - }, - (err: String) => { - console.error("Unable to connect to database: %s", err) - process.exit(1) - } -) - - -var queue = new Queue.Queue() -queue.on('data', () => { - if (dbReady) { - while (! queue.isEmpty()) { - var msg : MqttMessage.MqttMessage = queue.deq() - console.info(`Something in the queue: ${JSON.stringify(msg)}`) - var coll = dbHandle.collection(options['collection']) - coll.insertOne(msg.getMessage()) - .then( - (r) => { - console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`) - }, - (err) => { - console.error(`Error when trying to insert into database ${err}`) - if (! dbReady) { - console.info("Error occured while database connection is lost, re-enqueue msg.") - queue.reenq(msg) - } else { - console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`) - } - } - ) - } - } else { - // console.info("Database currently not available, not reading from stream") + constructor(options : any) { + super() + this.options = options + this.on('reconnectDatabase', this.connectToDatabase) } -}) + private dbHandle : Mongo.Db; + private dbReady : boolean = false; + private queue : Queue.Queue = new Queue.Queue() + private mqttClient : Mqtt.Client + private heartbeatTimer : NodeJS.Timer + private uptime : number = 0 -var mqttClient = Mqtt.connect(options['broker']) -mqttClient.on('offline', () => { console.warn("MQTT client is offline") }) -mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") }) -mqttClient.on('close', () => { console.warn("MQTT connection closed") }) + connectToDatabase() { + console.info("About to connect to database") + Mongo.MongoClient.connect(this.options['database'], + { +// 'server': { +// 'reconnectTries': 5, +// 'reconnectInterval': 1000, +// 'socketOptions': { +// 'autoReconnect': false +// } +// } + }) + .then( + (tmpDbHandle: Mongo.Db) => { + this.dbHandle = tmpDbHandle + this.dbReady = true; + console.info("Connected to database") + this.dbHandle.on('reconnectFailed', (err : any) => { console.warn(`Error on database ${err}`) }) + this.dbHandle.on('reconnect', () => { + console.info("Reconnect on database") + this.dbReady = true + this.queue.knock() + }) + this.dbHandle.on('timeout', () => { console.warn("Timeout on database") }) + this.dbHandle.on('close', () => { + console.info("Close on database") + this.dbReady = false + }) + }, + (err: String) => { + console.error(`Unable to connect to database: ${err}`) + this.dbReady = false + } + ) + } -mqttClient.on('connect', () => { - console.info("Connected to MQTT broker") - mqttClient.subscribe(options['topic']) - mqttClient.subscribe('MqttMongo/Command') - mqttClient.publish('MqttMongo/Status', 'hello, started up') -}) + connectToBroker() { + this.queue.on('data', () => { + if (this.dbReady || true) { + while (! this.queue.isEmpty()) { + var msg : MqttMessage.MqttMessage = this.queue.deq() + console.info(`Something in the queue: ${JSON.stringify(msg)}`) + var coll = this.dbHandle.collection(options['collection']) + coll.insertOne(msg.getMessage()) + .then( + (r) => { + console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`) + }, + (err) => { + console.error(`Error when trying to insert into database ${err}`) + if (! this.dbReady) { + console.info("Error occured while database connection is lost, re-enqueue msg.") + this.queue.reenq(msg) + this.emit('reconnectDatabase') + } else { + console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`) + } + } + ) + } + } else { + // console.info("Database currently not available, not reading from stream") + } + }) + + this.mqttClient = Mqtt.connect(options['broker']) + this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") }) + this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") }) + this.mqttClient.on('close', () => { console.warn("MQTT connection closed") }) -var msgCnt : number = 0 -mqttClient.on('message', (topic : string, message : string) => { - msgCnt++; - console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`) + this.mqttClient.on('connect', () => { + console.info("Connected to MQTT broker") + this.mqttClient.subscribe(options['topic']) + this.mqttClient.subscribe('MqttMongo/Command') + this.mqttClient.publish('MqttMongo/Status', 'hello, started up') + }) - if (topic == "MqttMongo/Command" && message == "shutdown") { + var msgCnt : number = 0 + this.mqttClient.on('message', (topic : string, message : string) => { + msgCnt++; + console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`) + + if (topic == "MqttMongo/Command" && message == "shutdown") { + this.shutdown() + } else { + try { + var mqttMessage = new MqttMessage.MqttMessage(topic, message) + this.queue.enq(mqttMessage) + } catch (e) { + console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`) + } + } + }) + } + + setupHeartbeat() { + this.heartbeatTimer = setInterval(() => { + this.uptime++ + this.mqttClient.publish('MqttMongo/Status', `{'Uptime': ${this.uptime}}`) + if (! this.dbReady) { + this.emit("reconnectDatabase") + } + }, 1000) + } + + shutdown() { console.info("Shutting down MqttMongo") - clearInterval(uptimeInterval) - mqttClient.end() - dbHandle.close() - } else { - try { - var mqttMessage = new MqttMessage.MqttMessage(topic, message) - queue.enq(mqttMessage) - } catch (e) { - console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`) - } + clearInterval(this.heartbeatTimer) + this.mqttClient.end() + this.dbHandle.close() } -}) - - -var uptime : number = 0 -var uptimeInterval = setInterval(() => { - uptime++ - mqttClient.publish('MqttMongo/Status', `{'Uptime': ${uptime}}`) -}, 1000) - +} +var mqttMongo : MqttMongo = new MqttMongo(options) +mqttMongo.connectToDatabase() +mqttMongo.connectToBroker() +mqttMongo.setupHeartbeat() console.info("MqttMongo started") \ No newline at end of file