From 402ee68e9432ac1c5ddb4e200c941ceaf602702b Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 4 May 2017 11:35:37 +0200 Subject: [PATCH] add queue, message, json parsing --- src/main.ts | 204 ++++++++++++++++++++++---------------------- src/mqtt_message.ts | 17 ++++ src/queue.ts | 45 ++++++++++ 3 files changed, 165 insertions(+), 101 deletions(-) create mode 100644 src/mqtt_message.ts create mode 100644 src/queue.ts diff --git a/src/main.ts b/src/main.ts index 012a414..4e2c090 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,116 +1,118 @@ import * as Mqtt from 'mqtt' import * as Mongo from 'mongodb' -import * as Stream from 'stream' +import * as Queue from './queue' +import * as MqttMessage from './mqtt_message' + var MQTT_BROKER_URL : String = 'mqtt://localhost' var MONGO_DATABASE_URL : String = 'mongodb://localhost/test' - - -class MqttStream extends Stream.Readable { - _read() { - } -} - - - -class MqttMongo { - private mqttClient : Mqtt.Client - private dbHandle : Mongo.Db; - private dbReady : boolean = false; - private msgCnt : number = 0; - private mqttStream : MqttStream - - constructor() { - this.mqttClient = Mqtt.connect(options['broker']) - this.mqttClient.on('offline', () => { console.log("mqtt client is offline") }) - this.mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") }) - this.mqttClient.on('close', () => { console.log("mqtt connection closed") }) - - Mongo.MongoClient.connect(options['database'], - { - 'server': { - 'reconnectTries': 10000, - 'reconnectInterval': 1000, - 'socketOptions': { - 'autoReconnect': false - } - } - }) - .then( - (dbHandle: Mongo.Db) => { - this.dbHandle = dbHandle - this.dbReady = true; - this.dbHandle.on('error', () => { console.log("error on database") }) - this.dbHandle.on('reconnect', () => { - console.log("reconnect on database") - this.dbReady = true - }) - this.dbHandle.on('timeout', () => { console.log("timeout on database") }) - this.dbHandle.on('close', () => { - console.log("close on database") - this.dbReady = false - }) - }, - (err: String) => { - console.log("Unable to connect to database: %s", err) - process.exit(1) - } - ) - - this.mqttStream = new MqttStream() - this.mqttStream.on('data', () => { - if (this.dbReady) { - var msg - while ((msg = this.mqttStream.read()) != null) { - console.log("Something on the stream: %s", msg) - var coll = this.dbHandle.collection('mqttMongo') - coll.insertOne({'a': msg}) - .then( - (r) => { - console.log("successfully inserted into database") - }, - (err) => { - console.log("error when trying to insert into database") - } - ) - } - } else { - console.log("database currently not available, not reading from stream") - } - }) - - } - - exec(): void { - this.mqttClient.on('connect', () => { - console.log("mqtt client connected to broker") - this.mqttClient.subscribe('MqttMongo/Command') - this.mqttClient.publish('MqttMongo/Status', 'hello, started up') - }) - - this.mqttClient.on('message', (topic : string, message : string) => { - this.msgCnt++; - console.log(`message received ${this.msgCnt}, topic ${topic}, payload ${message}`) - - if (topic == "MqttMongo/Command" && message == "shutdown") { - this.mqttClient.end() - } else { - this.mqttStream.push(`${topic}:${message}`) - this.mqttStream.emit('data') - } - }) - } -} - +var COLLECTION : String = 'mqttMongo' +var TOPIC : String = 'mqttMongo' import options = require('commander') options .version('0.0.1') .option('-b, --broker [broker url]', 'Broker URL', MQTT_BROKER_URL) .option('-m, --database [database url]', 'MongoDB Database URL', MONGO_DATABASE_URL) + .option('-c, --collection [mongodb collection]', 'Collection in MongoDB Database', COLLECTION) + .option('-t, --topic [topic to subscribe]', 'Topic to subscribe', TOPIC) .parse(process.argv) -const mqttMongo = new MqttMongo() -mqttMongo.exec() + + +var dbHandle : Mongo.Db; +var dbReady : boolean = false; + +Mongo.MongoClient.connect(options['database'], + { + 'server': { + 'reconnectTries': 10000, + 'reconnectInterval': 1000, + 'socketOptions': { + 'autoReconnect': false + } + } + }) + .then( + (tmpDbHandle: Mongo.Db) => { + dbHandle = tmpDbHandle + dbReady = true; + console.log("Connected to database") + dbHandle.on('error', () => { console.log("error on database") }) + dbHandle.on('reconnect', () => { + console.log("reconnect on database") + dbReady = true + queue.knock() + }) + dbHandle.on('timeout', () => { console.log("timeout on database") }) + dbHandle.on('close', () => { + console.log("close on database") + dbReady = false + }) + }, + (err: String) => { + console.log("Unable to connect to database: %s", err) + process.exit(1) + } +) + +var queue = new Queue.Queue() +queue.on('data', () => { + if (dbReady) { + while (! queue.isEmpty()) { + var msg : MqttMessage.MqttMessage = queue.deq() + console.log("Something in the queue: %s", JSON.stringify(msg)) + var coll = dbHandle.collection(options['collection']) + coll.insertOne(msg.getMessage()) + .then( + (r) => { + console.log("successfully inserted into database") + }, + (err) => { + console.log("error when trying to insert into database") + queue.reenq(msg) + } + ) + } + } else { + console.log("database currently not available, not reading from stream") + } +}) + + +var mqttClient = Mqtt.connect(options['broker']) +mqttClient.on('offline', () => { console.log("mqtt client is offline") }) +mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") }) +mqttClient.on('close', () => { console.log("mqtt connection closed") }) + +mqttClient.on('connect', () => { + console.log("mqtt client connected to broker") + mqttClient.subscribe(options['topic']) + mqttClient.subscribe('MqttMongo/Command') + mqttClient.publish('MqttMongo/Status', 'hello, started up') +}) + +var uptime : number = 0 +var uptimeInterval = setInterval(() => { + uptime++ + mqttClient.publish('MqttMongo/Status', `{'Uptime': ${uptime}}`) +}, 1000) + +var msgCnt : number = 0 +mqttClient.on('message', (topic : string, message : string) => { + msgCnt++; + console.log(`message received ${msgCnt}, topic ${topic}, payload ${message}`) + var mqttMessage = new MqttMessage.MqttMessage(topic, message) + + if (topic == "MqttMongo/Command" && message == "shutdown") { + clearInterval(uptimeInterval) + mqttClient.end() + dbHandle.close() + } else { + queue.enq(mqttMessage) + } +}) + + console.log("MqttMongo started") \ No newline at end of file diff --git a/src/mqtt_message.ts b/src/mqtt_message.ts new file mode 100644 index 0000000..00b01e5 --- /dev/null +++ b/src/mqtt_message.ts @@ -0,0 +1,17 @@ +export class MqttMessage { + private topic : string + private message : object + + constructor(topic : string, inMessage : string) { + this.topic = topic + this.message = JSON.parse(inMessage) + } + + getTopic() : string { + return this.topic + } + + getMessage() : object { + return this.message + } +} \ No newline at end of file diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..647f35e --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,45 @@ +import * as Events from 'events' + + +export class Queue extends Events.EventEmitter { + private q : T[] = [] + + constructor() { + super() + } + + isEmpty() : boolean { + return this.q.length == 0 + } + + knock() { + this.emit('data') + } + + enq(x : T) { + this.q.push(x) + this.emit('data') + } + + reenq(x : T) { + this.q.unshift(x) + this.emit('data') + } + + deq() : T { + var x : T = this.peek() + this.q.shift() + + return x + } + + peek() : T { + if (this.isEmpty()) { + throw new Error("queue is empty") + } + + var x : T = this.q[0] + + return x + } +}