automatic reconnect to database

This commit is contained in:
Wolfgang Hottgenroth
2017-05-05 17:16:11 +02:00
parent 724f60354c
commit 19ea878a59

View File

@ -1,5 +1,6 @@
import * as Mqtt from 'mqtt' import * as Mqtt from 'mqtt'
import * as Mongo from 'mongodb' import * as Mongo from 'mongodb'
import * as Events from 'events'
import * as Queue from './queue' import * as Queue from './queue'
import * as MqttMessage from './mqtt_message' import * as MqttMessage from './mqtt_message'
@ -22,112 +23,139 @@ options
.parse(process.argv) .parse(process.argv)
class MqttMongo extends Events.EventEmitter {
var dbHandle : Mongo.Db; private options : any
var dbReady : boolean = false;
Mongo.MongoClient.connect(options['database'], constructor(options : any) {
{ super()
'server': { this.options = options
'reconnectTries': 5, this.on('reconnectDatabase', this.connectToDatabase)
'reconnectInterval': 1000,
'socketOptions': {
'autoReconnect': false
}
}
})
.then(
(tmpDbHandle: Mongo.Db) => {
dbHandle = tmpDbHandle
dbReady = true;
console.info("Connected to database")
dbHandle.on('reconnectFailed', (err : any) => { console.warn(`Error on database ${err}`) })
dbHandle.on('reconnect', () => {
console.info("Reconnect on database")
dbReady = true
queue.knock()
})
dbHandle.on('timeout', () => { console.warn("Timeout on database") })
dbHandle.on('close', () => {
console.info("Close on database")
dbReady = false
})
},
(err: String) => {
console.error("Unable to connect to database: %s", err)
process.exit(1)
}
)
var queue = new Queue.Queue<MqttMessage.MqttMessage>()
queue.on('data', () => {
if (dbReady) {
while (! queue.isEmpty()) {
var msg : MqttMessage.MqttMessage = queue.deq()
console.info(`Something in the queue: ${JSON.stringify(msg)}`)
var coll = dbHandle.collection(options['collection'])
coll.insertOne(msg.getMessage())
.then(
(r) => {
console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`)
},
(err) => {
console.error(`Error when trying to insert into database ${err}`)
if (! dbReady) {
console.info("Error occured while database connection is lost, re-enqueue msg.")
queue.reenq(msg)
} else {
console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`)
}
}
)
}
} else {
// console.info("Database currently not available, not reading from stream")
} }
})
private dbHandle : Mongo.Db;
private dbReady : boolean = false;
private queue : Queue.Queue<MqttMessage.MqttMessage> = new Queue.Queue<MqttMessage.MqttMessage>()
private mqttClient : Mqtt.Client
private heartbeatTimer : NodeJS.Timer
private uptime : number = 0
var mqttClient = Mqtt.connect(options['broker']) connectToDatabase() {
mqttClient.on('offline', () => { console.warn("MQTT client is offline") }) console.info("About to connect to database")
mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") }) Mongo.MongoClient.connect(this.options['database'],
mqttClient.on('close', () => { console.warn("MQTT connection closed") }) {
// 'server': {
// 'reconnectTries': 5,
// 'reconnectInterval': 1000,
// 'socketOptions': {
// 'autoReconnect': false
// }
// }
})
.then(
(tmpDbHandle: Mongo.Db) => {
this.dbHandle = tmpDbHandle
this.dbReady = true;
console.info("Connected to database")
this.dbHandle.on('reconnectFailed', (err : any) => { console.warn(`Error on database ${err}`) })
this.dbHandle.on('reconnect', () => {
console.info("Reconnect on database")
this.dbReady = true
this.queue.knock()
})
this.dbHandle.on('timeout', () => { console.warn("Timeout on database") })
this.dbHandle.on('close', () => {
console.info("Close on database")
this.dbReady = false
})
},
(err: String) => {
console.error(`Unable to connect to database: ${err}`)
this.dbReady = false
}
)
}
mqttClient.on('connect', () => { connectToBroker() {
console.info("Connected to MQTT broker") this.queue.on('data', () => {
mqttClient.subscribe(options['topic']) if (this.dbReady || true) {
mqttClient.subscribe('MqttMongo/Command') while (! this.queue.isEmpty()) {
mqttClient.publish('MqttMongo/Status', 'hello, started up') var msg : MqttMessage.MqttMessage = this.queue.deq()
}) console.info(`Something in the queue: ${JSON.stringify(msg)}`)
var coll = this.dbHandle.collection(options['collection'])
coll.insertOne(msg.getMessage())
.then(
(r) => {
console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`)
},
(err) => {
console.error(`Error when trying to insert into database ${err}`)
if (! this.dbReady) {
console.info("Error occured while database connection is lost, re-enqueue msg.")
this.queue.reenq(msg)
this.emit('reconnectDatabase')
} else {
console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`)
}
}
)
}
} else {
// console.info("Database currently not available, not reading from stream")
}
})
this.mqttClient = Mqtt.connect(options['broker'])
this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") })
this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") })
this.mqttClient.on('close', () => { console.warn("MQTT connection closed") })
var msgCnt : number = 0 this.mqttClient.on('connect', () => {
mqttClient.on('message', (topic : string, message : string) => { console.info("Connected to MQTT broker")
msgCnt++; this.mqttClient.subscribe(options['topic'])
console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`) this.mqttClient.subscribe('MqttMongo/Command')
this.mqttClient.publish('MqttMongo/Status', 'hello, started up')
})
if (topic == "MqttMongo/Command" && message == "shutdown") { var msgCnt : number = 0
this.mqttClient.on('message', (topic : string, message : string) => {
msgCnt++;
console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`)
if (topic == "MqttMongo/Command" && message == "shutdown") {
this.shutdown()
} else {
try {
var mqttMessage = new MqttMessage.MqttMessage(topic, message)
this.queue.enq(mqttMessage)
} catch (e) {
console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
}
}
})
}
setupHeartbeat() {
this.heartbeatTimer = setInterval(() => {
this.uptime++
this.mqttClient.publish('MqttMongo/Status', `{'Uptime': ${this.uptime}}`)
if (! this.dbReady) {
this.emit("reconnectDatabase")
}
}, 1000)
}
shutdown() {
console.info("Shutting down MqttMongo") console.info("Shutting down MqttMongo")
clearInterval(uptimeInterval) clearInterval(this.heartbeatTimer)
mqttClient.end() this.mqttClient.end()
dbHandle.close() this.dbHandle.close()
} else {
try {
var mqttMessage = new MqttMessage.MqttMessage(topic, message)
queue.enq(mqttMessage)
} catch (e) {
console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
}
} }
}) }
var uptime : number = 0
var uptimeInterval = setInterval(() => {
uptime++
mqttClient.publish('MqttMongo/Status', `{'Uptime': ${uptime}}`)
}, 1000)
var mqttMongo : MqttMongo = new MqttMongo(options)
mqttMongo.connectToDatabase()
mqttMongo.connectToBroker()
mqttMongo.setupHeartbeat()
console.info("MqttMongo started") console.info("MqttMongo started")