significantly improved
This commit is contained in:
42
src/main.ts
42
src/main.ts
@ -5,13 +5,18 @@ import * as Queue from './queue'
|
||||
import * as MqttMessage from './mqtt_message'
|
||||
|
||||
|
||||
var console = require('chalk-console')
|
||||
let console = require('chalk-console')
|
||||
|
||||
|
||||
var MQTT_BROKER_URL : String = 'mqtt://localhost'
|
||||
var MONGO_DATABASE_URL : String = 'mongodb://localhost/test'
|
||||
var COLLECTION : String = 'mqttMongo'
|
||||
var TOPIC : String = 'mqttMongo'
|
||||
const MQTT_BROKER_URL : String = 'mqtt://localhost'
|
||||
const MONGO_DATABASE_URL : String = 'mongodb://localhost/test'
|
||||
const COLLECTION : String = 'mqttMongo'
|
||||
|
||||
|
||||
function collect(val: string, memo: string[]) {
|
||||
memo.push(val);
|
||||
return memo;
|
||||
}
|
||||
|
||||
import options = require('commander')
|
||||
options
|
||||
@ -19,7 +24,9 @@ options
|
||||
.option('-b, --broker [broker url]', 'Broker URL', MQTT_BROKER_URL)
|
||||
.option('-m, --database [database url]', 'MongoDB Database URL', MONGO_DATABASE_URL)
|
||||
.option('-c, --collection [mongodb collection]', 'Collection in MongoDB Database', COLLECTION)
|
||||
.option('-t, --topic [topic to subscribe]', 'Topic to subscribe', TOPIC)
|
||||
.option('-t, --topic [topic to subscribe]', 'Topic to subscribe, can appear multiple times', collect, [])
|
||||
.option('-e, --encapsulate', 'store current timestamp, topic and payload in document', false)
|
||||
.option('-p, --parsePayload', 'parse payload when encapsulating (otherwise always)', false)
|
||||
.parse(process.argv)
|
||||
|
||||
|
||||
@ -42,7 +49,7 @@ class MqttMongo extends Events.EventEmitter {
|
||||
|
||||
connectToDatabase() {
|
||||
console.info("About to connect to database")
|
||||
Mongo.MongoClient.connect(this.options['database'])
|
||||
Mongo.MongoClient.connect(this.options.database)
|
||||
.then(
|
||||
(tmpDbHandle: Mongo.Db) => {
|
||||
this.dbHandle = tmpDbHandle
|
||||
@ -71,9 +78,9 @@ class MqttMongo extends Events.EventEmitter {
|
||||
this.queue.on('data', () => {
|
||||
if (this.dbReady || true) {
|
||||
while (! this.queue.isEmpty()) {
|
||||
var msg : MqttMessage.MqttMessage = this.queue.deq()
|
||||
let msg : MqttMessage.MqttMessage = this.queue.deq()
|
||||
console.info(`Something in the queue: ${JSON.stringify(msg)}`)
|
||||
var coll = this.dbHandle.collection(options['collection'])
|
||||
let coll = this.dbHandle.collection(this.options.collection)
|
||||
coll.insertOne(msg.getMessage())
|
||||
.then(
|
||||
(r) => {
|
||||
@ -96,29 +103,32 @@ class MqttMongo extends Events.EventEmitter {
|
||||
}
|
||||
})
|
||||
|
||||
this.mqttClient = Mqtt.connect(options['broker'])
|
||||
this.mqttClient = Mqtt.connect(this.options.broker)
|
||||
this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") })
|
||||
this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") })
|
||||
this.mqttClient.on('close', () => { console.warn("MQTT connection closed") })
|
||||
|
||||
this.mqttClient.on('connect', () => {
|
||||
console.info("MQTT broker connected")
|
||||
this.mqttClient.subscribe(options['topic'])
|
||||
this.options.topic.forEach((topic: string) => {
|
||||
this.mqttClient.subscribe(topic)
|
||||
console.info(`Subscribed to ${topic}`)
|
||||
})
|
||||
this.mqttClient.subscribe('MqttMongo/Command')
|
||||
this.mqttClient.publish('MqttMongo/Status', 'hello, started up')
|
||||
})
|
||||
|
||||
var msgCnt : number = 0
|
||||
this.mqttClient.on('message', (topic : string, message : string) => {
|
||||
let msgCnt : number = 0
|
||||
this.mqttClient.on('message', (topic : string, messageBuf : Buffer) => {
|
||||
msgCnt++;
|
||||
let message = messageBuf.toString('UTF-8')
|
||||
console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`)
|
||||
|
||||
if (topic == "MqttMongo/Command" && message == "shutdown") {
|
||||
this.shutdown()
|
||||
} else {
|
||||
try {
|
||||
var mqttMessage = new MqttMessage.MqttMessage(topic, message)
|
||||
this.queue.enq(mqttMessage)
|
||||
this.queue.enq(new MqttMessage.MqttMessage(topic, message, this.options.encapsulate, this.options.parsePayload))
|
||||
} catch (e) {
|
||||
console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
|
||||
}
|
||||
@ -145,7 +155,7 @@ class MqttMongo extends Events.EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
var mqttMongo : MqttMongo = new MqttMongo(options)
|
||||
let mqttMongo : MqttMongo = new MqttMongo(options)
|
||||
mqttMongo.connectToDatabase()
|
||||
mqttMongo.connectToBroker()
|
||||
mqttMongo.setupHeartbeat()
|
||||
|
@ -8,10 +8,20 @@ export class MqttMessage {
|
||||
private topic : string
|
||||
private message : object
|
||||
|
||||
constructor(topic : string, inMessage : string) {
|
||||
constructor(topic : string, inMessage : string, encapsulate: boolean = false, parsePayload: boolean = false) {
|
||||
this.topic = topic
|
||||
try {
|
||||
this.message = JSON.parse(inMessage)
|
||||
if (encapsulate) {
|
||||
let payload: string
|
||||
if (parsePayload) {
|
||||
payload = JSON.parse(inMessage)
|
||||
} else {
|
||||
payload = inMessage
|
||||
}
|
||||
this.message = {'topic': topic, 'payload': payload, 'ts': new Date()}
|
||||
} else {
|
||||
this.message = JSON.parse(inMessage)
|
||||
}
|
||||
} catch (e) {
|
||||
throw new MqttMessageError(`error while parsing message, ${e.toString()}`)
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ export class Queue<T> extends Events.EventEmitter {
|
||||
}
|
||||
|
||||
deq() : T {
|
||||
var x : T = this.peek()
|
||||
let x : T = this.peek()
|
||||
this.q.shift()
|
||||
|
||||
return x
|
||||
@ -38,7 +38,7 @@ export class Queue<T> extends Events.EventEmitter {
|
||||
throw new Error("queue is empty")
|
||||
}
|
||||
|
||||
var x : T = this.q[0]
|
||||
let x : T = this.q[0]
|
||||
|
||||
return x
|
||||
}
|
||||
|
Reference in New Issue
Block a user