This commit is contained in:
Wolfgang Hottgenroth
2017-05-04 21:44:39 +02:00
parent aca88bcbb8
commit 0c9468f3a5
5 changed files with 113 additions and 31 deletions

View File

@ -22,6 +22,7 @@
"typescript": "^2.3.1" "typescript": "^2.3.1"
}, },
"dependencies": { "dependencies": {
"chalk-console": "^1.0.1",
"commander": "^2.9.0", "commander": "^2.9.0",
"mongodb": "^2.2.26", "mongodb": "^2.2.26",
"mqtt": "^2.6.2", "mqtt": "^2.6.2",

View File

@ -3,6 +3,8 @@ import * as Mongo from 'mongodb'
import * as Queue from './queue' import * as Queue from './queue'
import * as MqttMessage from './mqtt_message' import * as MqttMessage from './mqtt_message'
var console = require('chalk-console')
var MQTT_BROKER_URL : String = 'mqtt://localhost' var MQTT_BROKER_URL : String = 'mqtt://localhost'
var MONGO_DATABASE_URL : String = 'mongodb://localhost/test' var MONGO_DATABASE_URL : String = 'mongodb://localhost/test'
@ -24,34 +26,34 @@ var dbHandle : Mongo.Db;
var dbReady : boolean = false; var dbReady : boolean = false;
Mongo.MongoClient.connect(options['database'], Mongo.MongoClient.connect(options['database'],
{ {
'server': { 'server': {
'reconnectTries': 10000, 'reconnectTries': 5,
'reconnectInterval': 1000, 'reconnectInterval': 1000,
'socketOptions': { 'socketOptions': {
'autoReconnect': false 'autoReconnect': false
} }
} }
}) })
.then( .then(
(tmpDbHandle: Mongo.Db) => { (tmpDbHandle: Mongo.Db) => {
dbHandle = tmpDbHandle dbHandle = tmpDbHandle
dbReady = true; dbReady = true;
console.log("Connected to database") console.info("Connected to database")
dbHandle.on('error', () => { console.log("error on database") }) dbHandle.on('error', () => { console.warn("Error on database") })
dbHandle.on('reconnect', () => { dbHandle.on('reconnect', () => {
console.log("reconnect on database") console.info("Reconnect on database")
dbReady = true dbReady = true
queue.knock() queue.knock()
}) })
dbHandle.on('timeout', () => { console.log("timeout on database") }) dbHandle.on('timeout', () => { console.warn("Timeout on database") })
dbHandle.on('close', () => { dbHandle.on('close', () => {
console.log("close on database") console.info("Close on database")
dbReady = false dbReady = false
}) })
}, },
(err: String) => { (err: String) => {
console.log("Unable to connect to database: %s", err) console.error("Unable to connect to database: %s", err)
process.exit(1) process.exit(1)
} }
) )
@ -59,35 +61,40 @@ Mongo.MongoClient.connect(options['database'],
var queue = new Queue.Queue<MqttMessage.MqttMessage>() var queue = new Queue.Queue<MqttMessage.MqttMessage>()
queue.on('data', () => { queue.on('data', () => {
if (dbReady) { if (dbReady || true) { // FIXME
while (! queue.isEmpty()) { while (! queue.isEmpty()) {
var msg : MqttMessage.MqttMessage = queue.deq() var msg : MqttMessage.MqttMessage = queue.deq()
console.log("Something in the queue: %s", JSON.stringify(msg)) console.info(`Something in the queue: ${JSON.stringify(msg)}`)
var coll = dbHandle.collection(options['collection']) var coll = dbHandle.collection(options['collection'])
coll.insertOne(msg.getMessage()) coll.insertOne(msg.getMessage())
.then( .then(
(r) => { (r) => {
console.log("successfully inserted into database") console.success(`Successfully inserted into database ${JSON.stringify(msg.getMessage())}`)
}, },
(err) => { (err) => {
console.log("error when trying to insert into database") console.error(`Error when trying to insert into database ${err}`)
queue.reenq(msg) if (! dbReady) {
console.info("Error occured while database connection is lost, re-enqueue msg.")
queue.reenq(msg)
} else {
console.error(`Message ${JSON.stringify(msg.getMessage())} is lost`)
}
} }
) )
} }
} else { } else {
console.log("database currently not available, not reading from stream") // console.info("Database currently not available, not reading from stream")
} }
}) })
var mqttClient = Mqtt.connect(options['broker']) var mqttClient = Mqtt.connect(options['broker'])
mqttClient.on('offline', () => { console.log("mqtt client is offline") }) mqttClient.on('offline', () => { console.warn("MQTT client is offline") })
mqttClient.on('reconnect', () => { console.log("mqtt client is reconnecting") }) mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") })
mqttClient.on('close', () => { console.log("mqtt connection closed") }) mqttClient.on('close', () => { console.warn("MQTT connection closed") })
mqttClient.on('connect', () => { mqttClient.on('connect', () => {
console.log("mqtt client connected to broker") console.info("Connected to MQTT broker")
mqttClient.subscribe(options['topic']) mqttClient.subscribe(options['topic'])
mqttClient.subscribe('MqttMongo/Command') mqttClient.subscribe('MqttMongo/Command')
mqttClient.publish('MqttMongo/Status', 'hello, started up') mqttClient.publish('MqttMongo/Status', 'hello, started up')
@ -96,15 +103,20 @@ mqttClient.on('connect', () => {
var msgCnt : number = 0 var msgCnt : number = 0
mqttClient.on('message', (topic : string, message : string) => { mqttClient.on('message', (topic : string, message : string) => {
msgCnt++; msgCnt++;
console.log(`message received ${msgCnt}, topic ${topic}, payload ${message}`) console.info(`Message received ${msgCnt}, topic ${topic}, payload ${message}`)
if (topic == "MqttMongo/Command" && message == "shutdown") { if (topic == "MqttMongo/Command" && message == "shutdown") {
console.info("Shutting down MqttMongo")
clearInterval(uptimeInterval) clearInterval(uptimeInterval)
mqttClient.end() mqttClient.end()
dbHandle.close() dbHandle.close()
} else { } else {
var mqttMessage = new MqttMessage.MqttMessage(topic, message) try {
queue.enq(mqttMessage) var mqttMessage = new MqttMessage.MqttMessage(topic, message)
queue.enq(mqttMessage)
} catch (e) {
console.error(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
}
} }
}) })
@ -117,4 +129,4 @@ var uptimeInterval = setInterval(() => {
console.log("MqttMongo started") console.info("MqttMongo started")

View File

@ -1,10 +1,20 @@
export class MqttMessageError extends Error {
constructor(msg : string) {
super(msg)
}
}
export class MqttMessage { export class MqttMessage {
private topic : string private topic : string
private message : object private message : object
constructor(topic : string, inMessage : string) { constructor(topic : string, inMessage : string) {
this.topic = topic this.topic = topic
this.message = JSON.parse(inMessage) try {
this.message = JSON.parse(inMessage)
} catch (e) {
throw new MqttMessageError(`error while parsing message, ${e.toString()}`)
}
} }
getTopic() : string { getTopic() : string {

View File

@ -23,7 +23,7 @@ export class Queue<T> extends Events.EventEmitter {
reenq(x : T) { reenq(x : T) {
this.q.unshift(x) this.q.unshift(x)
this.emit('data') // this.emit('data')
} }
deq() : T { deq() : T {

59
src/unide.ts Normal file
View File

@ -0,0 +1,59 @@
/*
* content-spec
* device
* deviceID
* operationalStatus
* metaData
* part
* partTypeID
* partID
* result
* code
* metaData
* measurements
* ts
* result
* code
* series
* $_time
* <variable>
* limits
* <variable>
* upperError
* lowerError
* upperWarn
* lowerWarn
*/
class Device {
private deviceID : string
private operationalStatus: string
private metaData: object
}
enum Result {
NOK,
OK,
UNKNOWN
}
class Part {
private partTypeID : string
private partID : string
private result : Result
private code : string
private metaData : object
}
class Measurement {
private ts : string
private result : Result
private code : string
}
class MeasurementPayload {
private contentSpec : string = 'urn:spec://eclipse.org/unide/measurement-message#v2'
private device : Device
private part : Part
private measurements : Measurement[]
}