add queue, message, json parsing

This commit is contained in:
Wolfgang Hottgenroth 2017-05-04 11:35:37 +02:00
parent 2c37cd3501
commit 402ee68e94
3 changed files with 165 additions and 101 deletions

View File

@ -1,32 +1,29 @@
import * as Mqtt from 'mqtt' import * as Mqtt from 'mqtt'
import * as Mongo from 'mongodb' import * as Mongo from 'mongodb'
import * as Stream from 'stream' import * as Queue from './queue'
import * as MqttMessage from './mqtt_message'
var MQTT_BROKER_URL : String = 'mqtt://localhost' var MQTT_BROKER_URL : String = 'mqtt://localhost'
var MONGO_DATABASE_URL : String = 'mongodb://localhost/test' var MONGO_DATABASE_URL : String = 'mongodb://localhost/test'
var COLLECTION : String = 'mqttMongo'
var TOPIC : String = 'mqttMongo'
import options = require('commander')
class MqttStream extends Stream.Readable { options
_read() { .version('0.0.1')
} .option('-b, --broker [broker url]', 'Broker URL', MQTT_BROKER_URL)
} .option('-m, --database [database url]', 'MongoDB Database URL', MONGO_DATABASE_URL)
.option('-c, --collection [mongodb collection]', 'Collection in MongoDB Database', COLLECTION)
.option('-t, --topic [topic to subscribe]', 'Topic to subscribe', TOPIC)
.parse(process.argv)
class MqttMongo { var dbHandle : Mongo.Db;
private mqttClient : Mqtt.Client var dbReady : boolean = false;
private dbHandle : Mongo.Db;
private dbReady : boolean = false;
private msgCnt : number = 0;
private mqttStream : MqttStream
constructor() { Mongo.MongoClient.connect(options['database'],
this.mqttClient = Mqtt.connect(options['broker'])
this.mqttClient.on('offline', () => { console.log("mqtt client is offline") })
this.mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") })
this.mqttClient.on('close', () => { console.log("mqtt connection closed") })
Mongo.MongoClient.connect(options['database'],
{ {
'server': { 'server': {
'reconnectTries': 10000, 'reconnectTries': 10000,
@ -37,80 +34,85 @@ class MqttMongo {
} }
}) })
.then( .then(
(dbHandle: Mongo.Db) => { (tmpDbHandle: Mongo.Db) => {
this.dbHandle = dbHandle dbHandle = tmpDbHandle
this.dbReady = true; dbReady = true;
this.dbHandle.on('error', () => { console.log("error on database") }) console.log("Connected to database")
this.dbHandle.on('reconnect', () => { dbHandle.on('error', () => { console.log("error on database") })
dbHandle.on('reconnect', () => {
console.log("reconnect on database") console.log("reconnect on database")
this.dbReady = true dbReady = true
queue.knock()
}) })
this.dbHandle.on('timeout', () => { console.log("timeout on database") }) dbHandle.on('timeout', () => { console.log("timeout on database") })
this.dbHandle.on('close', () => { dbHandle.on('close', () => {
console.log("close on database") console.log("close on database")
this.dbReady = false dbReady = false
}) })
}, },
(err: String) => { (err: String) => {
console.log("Unable to connect to database: %s", err) console.log("Unable to connect to database: %s", err)
process.exit(1) process.exit(1)
} }
) )
this.mqttStream = new MqttStream() var queue = new Queue.Queue<MqttMessage.MqttMessage>()
this.mqttStream.on('data', () => { queue.on('data', () => {
if (this.dbReady) { if (dbReady) {
var msg while (! queue.isEmpty()) {
while ((msg = this.mqttStream.read()) != null) { var msg : MqttMessage.MqttMessage = queue.deq()
console.log("Something on the stream: %s", msg) console.log("Something in the queue: %s", JSON.stringify(msg))
var coll = this.dbHandle.collection('mqttMongo') var coll = dbHandle.collection(options['collection'])
coll.insertOne({'a': msg}) coll.insertOne(msg.getMessage())
.then( .then(
(r) => { (r) => {
console.log("successfully inserted into database") console.log("successfully inserted into database")
}, },
(err) => { (err) => {
console.log("error when trying to insert into database") console.log("error when trying to insert into database")
queue.reenq(msg)
} }
) )
} }
} else { } else {
console.log("database currently not available, not reading from stream") console.log("database currently not available, not reading from stream")
} }
}) })
}
exec(): void { var mqttClient = Mqtt.connect(options['broker'])
this.mqttClient.on('connect', () => { mqttClient.on('offline', () => { console.log("mqtt client is offline") })
mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") })
mqttClient.on('close', () => { console.log("mqtt connection closed") })
mqttClient.on('connect', () => {
console.log("mqtt client connected to broker") console.log("mqtt client connected to broker")
this.mqttClient.subscribe('MqttMongo/Command') mqttClient.subscribe(options['topic'])
this.mqttClient.publish('MqttMongo/Status', 'hello, started up') mqttClient.subscribe('MqttMongo/Command')
}) mqttClient.publish('MqttMongo/Status', 'hello, started up')
})
this.mqttClient.on('message', (topic : string, message : string) => { var uptime : number = 0
this.msgCnt++; var uptimeInterval = setInterval(() => {
console.log(`message received ${this.msgCnt}, topic ${topic}, payload ${message}`) uptime++
mqttClient.publish('MqttMongo/Status', `{'Uptime': ${uptime}}`)
}, 1000)
var msgCnt : number = 0
mqttClient.on('message', (topic : string, message : string) => {
msgCnt++;
console.log(`message received ${msgCnt}, topic ${topic}, payload ${message}`)
var mqttMessage = new MqttMessage.MqttMessage(topic, message)
if (topic == "MqttMongo/Command" && message == "shutdown") { if (topic == "MqttMongo/Command" && message == "shutdown") {
this.mqttClient.end() clearInterval(uptimeInterval)
mqttClient.end()
dbHandle.close()
} else { } else {
this.mqttStream.push(`${topic}:${message}`) queue.enq(mqttMessage)
this.mqttStream.emit('data')
} }
}) })
}
}
import options = require('commander')
options
.version('0.0.1')
.option('-b, --broker [broker url]', 'Broker URL', MQTT_BROKER_URL)
.option('-m, --database [database url]', 'MongoDB Database URL', MONGO_DATABASE_URL)
.parse(process.argv)
const mqttMongo = new MqttMongo()
mqttMongo.exec()
console.log("MqttMongo started") console.log("MqttMongo started")

17
src/mqtt_message.ts Normal file
View File

@ -0,0 +1,17 @@
export class MqttMessage {
private topic : string
private message : object
constructor(topic : string, inMessage : string) {
this.topic = topic
this.message = JSON.parse(inMessage)
}
getTopic() : string {
return this.topic
}
getMessage() : object {
return this.message
}
}

45
src/queue.ts Normal file
View File

@ -0,0 +1,45 @@
import * as Events from 'events'
export class Queue<T> extends Events.EventEmitter {
private q : T[] = []
constructor() {
super()
}
isEmpty() : boolean {
return this.q.length == 0
}
knock() {
this.emit('data')
}
enq(x : T) {
this.q.push(x)
this.emit('data')
}
reenq(x : T) {
this.q.unshift(x)
this.emit('data')
}
deq() : T {
var x : T = this.peek()
this.q.shift()
return x
}
peek() : T {
if (this.isEmpty()) {
throw new Error("queue is empty")
}
var x : T = this.q[0]
return x
}
}