changes, not yet working

This commit is contained in:
Wolfgang Hottgenroth 2017-05-03 22:11:41 +02:00
parent f079461997
commit 2c37cd3501
2 changed files with 62 additions and 8 deletions

View File

@ -24,6 +24,8 @@
"dependencies": { "dependencies": {
"commander": "^2.9.0", "commander": "^2.9.0",
"mongodb": "^2.2.26", "mongodb": "^2.2.26",
"mqtt": "^2.6.2" "mqtt": "^2.6.2",
"queuejs": "^0.1.0",
"readable-stream": "^2.2.9"
} }
} }

View File

@ -1,15 +1,24 @@
import * as Mqtt from 'mqtt' import * as Mqtt from 'mqtt'
import * as Mongo from 'mongodb' import * as Mongo from 'mongodb'
import * as Stream from 'stream'
var MQTT_BROKER_URL : String = 'mqtt://localhost' var MQTT_BROKER_URL : String = 'mqtt://localhost'
var MONGO_DATABASE_URL : String = 'mongodb://localhost/test' var MONGO_DATABASE_URL : String = 'mongodb://localhost/test'
class MqttStream extends Stream.Readable {
_read() {
}
}
class MqttMongo { class MqttMongo {
private mqttClient : Mqtt.Client private mqttClient : Mqtt.Client
private dbHandle : Mongo.Db; private dbHandle : Mongo.Db;
private dbReady : boolean = false;
private msgCnt : number = 0; private msgCnt : number = 0;
private mqttStream : MqttStream
constructor() { constructor() {
this.mqttClient = Mqtt.connect(options['broker']) this.mqttClient = Mqtt.connect(options['broker'])
@ -17,16 +26,59 @@ class MqttMongo {
this.mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") }) this.mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") })
this.mqttClient.on('close', () => { console.log("mqtt connection closed") }) this.mqttClient.on('close', () => { console.log("mqtt connection closed") })
/* Mongo.MongoClient.connect(options['database'],
Mongo.MongoClient.connect(options['database']) {
'server': {
'reconnectTries': 10000,
'reconnectInterval': 1000,
'socketOptions': {
'autoReconnect': false
}
}
})
.then( .then(
(dbHandle: Mongo.Db) => { this.dbHandle = dbHandle }, (dbHandle: Mongo.Db) => {
this.dbHandle = dbHandle
this.dbReady = true;
this.dbHandle.on('error', () => { console.log("error on database") })
this.dbHandle.on('reconnect', () => {
console.log("reconnect on database")
this.dbReady = true
})
this.dbHandle.on('timeout', () => { console.log("timeout on database") })
this.dbHandle.on('close', () => {
console.log("close on database")
this.dbReady = false
})
},
(err: String) => { (err: String) => {
console.log("Unable to connect to database: %s", err) console.log("Unable to connect to database: %s", err)
process.exit(1) process.exit(1)
} }
) )
*/
this.mqttStream = new MqttStream()
this.mqttStream.on('data', () => {
if (this.dbReady) {
var msg
while ((msg = this.mqttStream.read()) != null) {
console.log("Something on the stream: %s", msg)
var coll = this.dbHandle.collection('mqttMongo')
coll.insertOne({'a': msg})
.then(
(r) => {
console.log("successfully inserted into database")
},
(err) => {
console.log("error when trying to insert into database")
}
)
}
} else {
console.log("database currently not available, not reading from stream")
}
})
} }
exec(): void { exec(): void {
@ -43,8 +95,8 @@ class MqttMongo {
if (topic == "MqttMongo/Command" && message == "shutdown") { if (topic == "MqttMongo/Command" && message == "shutdown") {
this.mqttClient.end() this.mqttClient.end()
} else { } else {
// verify JSON format of message this.mqttStream.push(`${topic}:${message}`)
// insert into database this.mqttStream.emit('data')
} }
}) })
} }