From 0c9468f3a5e721cbd3ef8a58d5188f3bbc396608 Mon Sep 17 00:00:00 2001 From: Wolfgang Hottgenroth Date: Thu, 4 May 2017 21:44:39 +0200 Subject: [PATCH] changes --- package.json | 1 + src/main.ts | 70 ++++++++++++++++++++++++++------------------- src/mqtt_message.ts | 12 +++++++- src/queue.ts | 2 +- src/unide.ts | 59 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 31 deletions(-) create mode 100644 src/unide.ts diff --git a/package.json b/package.json index cb30a34..bc3bf41 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "typescript": "^2.3.1" }, "dependencies": { + "chalk-console": "^1.0.1", "commander": "^2.9.0", "mongodb": "^2.2.26", "mqtt": "^2.6.2", diff --git a/src/main.ts b/src/main.ts index c8bbe85..516027a 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,6 +3,8 @@ import * as Mongo from 'mongodb' import * as Queue from './queue' import * as MqttMessage from './mqtt_message' +var console = require('chalk-console') + var MQTT_BROKER_URL : String = 'mqtt://localhost' var MONGO_DATABASE_URL : String = 'mongodb://localhost/test' @@ -24,34 +26,34 @@ var dbHandle : Mongo.Db; var dbReady : boolean = false; Mongo.MongoClient.connect(options['database'], - { - 'server': { - 'reconnectTries': 10000, - 'reconnectInterval': 1000, - 'socketOptions': { - 'autoReconnect': false - } - } - }) +{ + 'server': { + 'reconnectTries': 5, + 'reconnectInterval': 1000, + 'socketOptions': { + 'autoReconnect': false + } + } +}) .then( (tmpDbHandle: Mongo.Db) => { dbHandle = tmpDbHandle dbReady = true; - console.log("Connected to database") - dbHandle.on('error', () => { console.log("error on database") }) + console.info("Connected to database") + dbHandle.on('error', () => { console.warn("Error on database") }) dbHandle.on('reconnect', () => { - console.log("reconnect on database") + console.info("Reconnect on database") dbReady = true queue.knock() }) - dbHandle.on('timeout', () => { console.log("timeout on database") }) + dbHandle.on('timeout', () => { console.warn("Timeout on database") }) dbHandle.on('close', () => { - console.log("close on database") + console.info("Close on database") dbReady = false }) }, (err: String) => { - console.log("Unable to connect to database: %s", err) + console.error("Unable to connect to database: %s", err) process.exit(1) } ) @@ -59,35 +61,40 @@ Mongo.MongoClient.connect(options['database'], var queue = new Queue.Queue() queue.on('data', () => { - if (dbReady) { + if (dbReady || true) { // FIXME while (! queue.isEmpty()) { var msg : MqttMessage.MqttMessage = queue.deq() - console.log("Something in the queue: %s", JSON.stringify(msg)) + console.info(`Something in the queue: ${JSON.stringify(msg)}`) var coll = dbHandle.collection(options['collection']) coll.insertOne(msg.getMessage()) .then( (r) => { - console.log("successfully inserted into database") + console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`) }, (err) => { - console.log("error when trying to insert into database") - queue.reenq(msg) + console.error(`Error when trying to insert into database ${err}`) + if (! dbReady) { + console.info("Error occured while database connection is lost, re-enqueue msg.") + queue.reenq(msg) + } else { + console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`) + } } ) } } else { - console.log("database currently not available, not reading from stream") + // console.info("Database currently not available, not reading from stream") } }) var mqttClient = Mqtt.connect(options['broker']) -mqttClient.on('offline', () => { console.log("mqtt client is offline") }) -mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") }) -mqttClient.on('close', () => { console.log("mqtt connection closed") }) +mqttClient.on('offline', () => { console.warn("MQTT client is offline") }) +mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") }) +mqttClient.on('close', () => { console.warn("MQTT connection closed") }) mqttClient.on('connect', () => { - console.log("mqtt client connected to broker") + console.info("Connected to MQTT broker") mqttClient.subscribe(options['topic']) mqttClient.subscribe('MqttMongo/Command') mqttClient.publish('MqttMongo/Status', 'hello, started up') @@ -96,15 +103,20 @@ mqttClient.on('connect', () => { var msgCnt : number = 0 mqttClient.on('message', (topic : string, message : string) => { msgCnt++; - console.log(`message received ${msgCnt}, topic ${topic}, payload ${message}`) + console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`) if (topic == "MqttMongo/Command" && message == "shutdown") { + console.info("Shutting down MqttMongo") clearInterval(uptimeInterval) mqttClient.end() dbHandle.close() } else { - var mqttMessage = new MqttMessage.MqttMessage(topic, message) - queue.enq(mqttMessage) + try { + var mqttMessage = new MqttMessage.MqttMessage(topic, message) + queue.enq(mqttMessage) + } catch (e) { + console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`) + } } }) @@ -117,4 +129,4 @@ var uptimeInterval = setInterval(() => { -console.log("MqttMongo started") \ No newline at end of file +console.info("MqttMongo started") \ No newline at end of file diff --git a/src/mqtt_message.ts b/src/mqtt_message.ts index 00b01e5..f32fa2d 100644 --- a/src/mqtt_message.ts +++ b/src/mqtt_message.ts @@ -1,10 +1,20 @@ +export class MqttMessageError extends Error { + constructor(msg : string) { + super(msg) + } +} + export class MqttMessage { private topic : string private message : object constructor(topic : string, inMessage : string) { this.topic = topic - this.message = JSON.parse(inMessage) + try { + this.message = JSON.parse(inMessage) + } catch (e) { + throw new MqttMessageError(`error while parsing message, ${e.toString()}`) + } } getTopic() : string { diff --git a/src/queue.ts b/src/queue.ts index 647f35e..1301133 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -23,7 +23,7 @@ export class Queue extends Events.EventEmitter { reenq(x : T) { this.q.unshift(x) - this.emit('data') + // this.emit('data') } deq() : T { diff --git a/src/unide.ts b/src/unide.ts new file mode 100644 index 0000000..5ec0c4b --- /dev/null +++ b/src/unide.ts @@ -0,0 +1,59 @@ +/* +* content-spec +* device + * deviceID + * operationalStatus + * metaData +* part + * partTypeID + * partID + * result + * code + * metaData +* measurements + * ts + * result + * code + * series + * $_time + * + * limits + * + * upperError + * lowerError + * upperWarn + * lowerWarn +*/ + +class Device { + private deviceID : string + private operationalStatus: string + private metaData: object +} + +enum Result { + NOK, + OK, + UNKNOWN +} + +class Part { + private partTypeID : string + private partID : string + private result : Result + private code : string + private metaData : object +} + +class Measurement { + private ts : string + private result : Result + private code : string + +} +class MeasurementPayload { + private contentSpec : string = 'urn:spec://eclipse.org/unide/measurement-message#v2' + private device : Device + private part : Part + private measurements : Measurement[] +} \ No newline at end of file