diff --git a/package.json b/package.json index b514414..cb30a34 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,8 @@ "dependencies": { "commander": "^2.9.0", "mongodb": "^2.2.26", - "mqtt": "^2.6.2" + "mqtt": "^2.6.2", + "queuejs": "^0.1.0", + "readable-stream": "^2.2.9" } } diff --git a/src/main.ts b/src/main.ts index aea9ab3..012a414 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,15 +1,24 @@ import * as Mqtt from 'mqtt' import * as Mongo from 'mongodb' - +import * as Stream from 'stream' var MQTT_BROKER_URL : String = 'mqtt://localhost' var MONGO_DATABASE_URL : String = 'mongodb://localhost/test' +class MqttStream extends Stream.Readable { + _read() { + } +} + + + class MqttMongo { private mqttClient : Mqtt.Client private dbHandle : Mongo.Db; + private dbReady : boolean = false; private msgCnt : number = 0; + private mqttStream : MqttStream constructor() { this.mqttClient = Mqtt.connect(options['broker']) @@ -17,16 +26,59 @@ class MqttMongo { this.mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") }) this.mqttClient.on('close', () => { console.log("mqtt connection closed") }) -/* - Mongo.MongoClient.connect(options['database']) + Mongo.MongoClient.connect(options['database'], + { + 'server': { + 'reconnectTries': 10000, + 'reconnectInterval': 1000, + 'socketOptions': { + 'autoReconnect': false + } + } + }) .then( - (dbHandle: Mongo.Db) => { this.dbHandle = dbHandle }, + (dbHandle: Mongo.Db) => { + this.dbHandle = dbHandle + this.dbReady = true; + this.dbHandle.on('error', () => { console.log("error on database") }) + this.dbHandle.on('reconnect', () => { + console.log("reconnect on database") + this.dbReady = true + }) + this.dbHandle.on('timeout', () => { console.log("timeout on database") }) + this.dbHandle.on('close', () => { + console.log("close on database") + this.dbReady = false + }) + }, (err: String) => { console.log("Unable to connect to database: %s", err) process.exit(1) } ) -*/ + + this.mqttStream = new MqttStream() + this.mqttStream.on('data', () => { + if (this.dbReady) { + var msg + while ((msg = this.mqttStream.read()) != null) { + console.log("Something on the stream: %s", msg) + var coll = this.dbHandle.collection('mqttMongo') + coll.insertOne({'a': msg}) + .then( + (r) => { + console.log("successfully inserted into database") + }, + (err) => { + console.log("error when trying to insert into database") + } + ) + } + } else { + console.log("database currently not available, not reading from stream") + } + }) + } exec(): void { @@ -43,8 +95,8 @@ class MqttMongo { if (topic == "MqttMongo/Command" && message == "shutdown") { this.mqttClient.end() } else { - // verify JSON format of message - // insert into database + this.mqttStream.push(`${topic}:${message}`) + this.mqttStream.emit('data') } }) }