start mysql
This commit is contained in:
parent
ef430847dd
commit
b621e9d080
71
src/MongoBackend.ts
Normal file
71
src/MongoBackend.ts
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
import * as Mongo from 'mongodb'
|
||||||
|
import { MqttMessage } from './mqtt_message'
|
||||||
|
import { Instance } from './instance'
|
||||||
|
|
||||||
|
|
||||||
|
export default class MongoBackend extends Instance {
|
||||||
|
constructor(options : any) {
|
||||||
|
super(options)
|
||||||
|
this.on('reconnectDatabase', this.connectToDatabase)
|
||||||
|
}
|
||||||
|
|
||||||
|
private dbHandle : Mongo.Db;
|
||||||
|
private dbReady : boolean = false;
|
||||||
|
|
||||||
|
connectToDatabase() {
|
||||||
|
this.loginfo("About to connect to database")
|
||||||
|
Mongo.MongoClient.connect(this.options.mongodbUrl)
|
||||||
|
.then(
|
||||||
|
(tmpDbHandle: Mongo.Db) => {
|
||||||
|
this.dbHandle = tmpDbHandle
|
||||||
|
this.dbReady = true;
|
||||||
|
this.loginfo(`Database ${this.options.mongodbUrl} connected`)
|
||||||
|
this.dbHandle.on('reconnectFailed', (err : any) => { this.logwarn(`Error on database ${err}`) })
|
||||||
|
this.dbHandle.on('reconnect', () => {
|
||||||
|
this.loginfo("Reconnect on database")
|
||||||
|
this.dbReady = true
|
||||||
|
})
|
||||||
|
this.dbHandle.on('timeout', () => { this.logwarn("Timeout on database") })
|
||||||
|
this.dbHandle.on('close', () => {
|
||||||
|
this.loginfo("Close on database")
|
||||||
|
this.dbReady = false
|
||||||
|
})
|
||||||
|
},
|
||||||
|
(err: String) => {
|
||||||
|
this.logerror(`Unable to connect to database: ${err}`)
|
||||||
|
this.dbReady = false
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
connectToQueue() {
|
||||||
|
this.on('data', (msg : MqttMessage) => {
|
||||||
|
if (this.dbReady) {
|
||||||
|
let coll = this.dbHandle.collection(this.options.collection)
|
||||||
|
coll.insertOne(msg.getMessage())
|
||||||
|
.then(
|
||||||
|
(r) => {
|
||||||
|
if (this.options.verbose) {
|
||||||
|
this.loginfo(`Successfully inserted into database ${this.options.mongodbUrl}, ${this.options.collection}: ${JSON.stringify(msg.getMessage())}`)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(err) => {
|
||||||
|
this.dbReady = false
|
||||||
|
this.logerror(`Error when trying to insert into database ${err}`)
|
||||||
|
this.emit('reconnectDatabase')
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// this.loginfo("Database currently not available, not reading from stream")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
}
|
147
src/MqttMongo.ts
147
src/MqttMongo.ts
@ -1,147 +0,0 @@
|
|||||||
import * as Mqtt from 'mqtt'
|
|
||||||
import * as Mongo from 'mongodb'
|
|
||||||
import * as Events from 'events'
|
|
||||||
import * as Queue from './queue'
|
|
||||||
import * as MqttMessage from './mqtt_message'
|
|
||||||
import * as logger from './log'
|
|
||||||
|
|
||||||
|
|
||||||
export default class MqttMongo extends Events.EventEmitter {
|
|
||||||
|
|
||||||
private options : any
|
|
||||||
private msgCnt : number = 0
|
|
||||||
private startTime: Date
|
|
||||||
|
|
||||||
constructor(options : any) {
|
|
||||||
super()
|
|
||||||
this.options = options
|
|
||||||
this.startTime = new Date()
|
|
||||||
this.on('reconnectDatabase', this.connectToDatabase)
|
|
||||||
}
|
|
||||||
|
|
||||||
private dbHandle : Mongo.Db;
|
|
||||||
private dbReady : boolean = false;
|
|
||||||
private queue : Queue.Queue<MqttMessage.MqttMessage> = new Queue.Queue<MqttMessage.MqttMessage>()
|
|
||||||
private mqttClient : Mqtt.Client
|
|
||||||
private heartbeatTimer : NodeJS.Timer
|
|
||||||
|
|
||||||
logerror(msg: string) {
|
|
||||||
logger.error(`${this.options.instanceId}: ${msg}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
logwarn(msg: string) {
|
|
||||||
logger.warn(`${this.options.instanceId}: ${msg}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
loginfo(msg: string) {
|
|
||||||
logger.info(`${this.options.instanceId}: ${msg}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
connectToDatabase() {
|
|
||||||
this.loginfo("About to connect to database")
|
|
||||||
Mongo.MongoClient.connect(this.options.mongodbUrl)
|
|
||||||
.then(
|
|
||||||
(tmpDbHandle: Mongo.Db) => {
|
|
||||||
this.dbHandle = tmpDbHandle
|
|
||||||
this.dbReady = true;
|
|
||||||
this.loginfo(`Database ${this.options.mongodbUrl} connected`)
|
|
||||||
this.dbHandle.on('reconnectFailed', (err : any) => { this.logwarn(`Error on database ${err}`) })
|
|
||||||
this.dbHandle.on('reconnect', () => {
|
|
||||||
this.loginfo("Reconnect on database")
|
|
||||||
this.dbReady = true
|
|
||||||
this.queue.knock()
|
|
||||||
})
|
|
||||||
this.dbHandle.on('timeout', () => { this.logwarn("Timeout on database") })
|
|
||||||
this.dbHandle.on('close', () => {
|
|
||||||
this.loginfo("Close on database")
|
|
||||||
this.dbReady = false
|
|
||||||
})
|
|
||||||
},
|
|
||||||
(err: String) => {
|
|
||||||
this.logerror(`Unable to connect to database: ${err}`)
|
|
||||||
this.dbReady = false
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
connectToBroker() {
|
|
||||||
this.queue.on('data', () => {
|
|
||||||
if (this.dbReady || true) {
|
|
||||||
while (! this.queue.isEmpty()) {
|
|
||||||
let msg : MqttMessage.MqttMessage = this.queue.deq()
|
|
||||||
if (this.options.verbose) {
|
|
||||||
this.loginfo(`Something in the queue`)
|
|
||||||
}
|
|
||||||
let coll = this.dbHandle.collection(this.options.collection)
|
|
||||||
coll.insertOne(msg.getMessage())
|
|
||||||
.then(
|
|
||||||
(r) => {
|
|
||||||
if (this.options.verbose) {
|
|
||||||
this.loginfo(`Successfully inserted into database ${this.options.mongodbUrl}, ${this.options.collection}: ${JSON.stringify(msg.getMessage())}`)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
(err) => {
|
|
||||||
this.logerror(`Error when trying to insert into database ${err}`)
|
|
||||||
if (! this.dbReady) {
|
|
||||||
this.loginfo("Error occured while database connection is lost, re-enqueue msg.")
|
|
||||||
this.queue.reenq(msg)
|
|
||||||
this.emit('reconnectDatabase')
|
|
||||||
} else {
|
|
||||||
this.logerror(`Message ${JSON.stringify(msg.getMessage())} is lost`)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// this.loginfo("Database currently not available, not reading from stream")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
this.mqttClient = Mqtt.connect(this.options.broker)
|
|
||||||
this.mqttClient.on('offline', () => { console.warn("MQTT client is offline") })
|
|
||||||
this.mqttClient.on('reconnect', () => { console.warn("MQTT client is reconnecting") })
|
|
||||||
this.mqttClient.on('close', () => { console.warn("MQTT connection closed") })
|
|
||||||
|
|
||||||
this.mqttClient.on('connect', () => {
|
|
||||||
this.loginfo("MQTT broker connected")
|
|
||||||
this.options.topics.forEach((topic: string) => {
|
|
||||||
this.mqttClient.subscribe(topic)
|
|
||||||
this.loginfo(`Subscribed to ${topic}`)
|
|
||||||
})
|
|
||||||
this.mqttClient.publish(`MqttMongo/Status/${this.options.instanceId}`, 'hello, started up')
|
|
||||||
})
|
|
||||||
|
|
||||||
this.mqttClient.on('message', (topic : string, messageBuf : Buffer) => {
|
|
||||||
this.msgCnt++;
|
|
||||||
let message = messageBuf.toString('UTF-8')
|
|
||||||
if (this.options.verbose) {
|
|
||||||
this.loginfo(`Message received ${this.msgCnt}, topic ${topic}, payload ${message}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
this.queue.enq(new MqttMessage.MqttMessage(topic, message, this.options.encapsulate, this.options.parsePayload))
|
|
||||||
} catch (e) {
|
|
||||||
this.logerror(`Error while parsing Mqtt message, topic '${topic}', message '${message}' , ${e.toString()}`)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
setupHeartbeat() {
|
|
||||||
this.heartbeatTimer = setInterval(() => {
|
|
||||||
let uptime : number = (new Date().getTime() - this.startTime.getTime()) / 1000
|
|
||||||
let statusMsg = `{'Uptime': ${uptime}, 'MessageCount': ${this.msgCnt}}`
|
|
||||||
this.mqttClient.publish(`MqttMongo/Status/${this.options.instanceId}`, statusMsg)
|
|
||||||
this.loginfo(`Status: ${statusMsg}`)
|
|
||||||
if (! this.dbReady) {
|
|
||||||
this.emit("reconnectDatabase")
|
|
||||||
}
|
|
||||||
}, 60000)
|
|
||||||
this.loginfo("Heartbeat timer started")
|
|
||||||
}
|
|
||||||
|
|
||||||
exec() : void {
|
|
||||||
this.connectToDatabase()
|
|
||||||
this.connectToBroker()
|
|
||||||
this.setupHeartbeat()
|
|
||||||
}
|
|
||||||
}
|
|
35
src/instance.ts
Normal file
35
src/instance.ts
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import * as Events from 'events'
|
||||||
|
import * as logger from './log'
|
||||||
|
|
||||||
|
|
||||||
|
export abstract class Instance extends Events.EventEmitter {
|
||||||
|
|
||||||
|
protected options : any
|
||||||
|
|
||||||
|
constructor(options : any) {
|
||||||
|
super()
|
||||||
|
this.options = options
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
logerror(msg: string) {
|
||||||
|
logger.error(`${this.options.instanceId}: ${msg}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
logwarn(msg: string) {
|
||||||
|
logger.warn(`${this.options.instanceId}: ${msg}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
loginfo(msg: string) {
|
||||||
|
logger.info(`${this.options.instanceId}: ${msg}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract connectToDatabase() : void
|
||||||
|
|
||||||
|
abstract connectToQueue() : void
|
||||||
|
|
||||||
|
exec() : void {
|
||||||
|
this.connectToDatabase()
|
||||||
|
this.connectToQueue()
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,6 @@
|
|||||||
import * as log from './log'
|
import * as log from './log'
|
||||||
import * as config from './config'
|
import * as config from './config'
|
||||||
import MqttMongo from './MqttMongo'
|
import { MongoBackend } from './MongoBackend'
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
45
src/queue.ts
45
src/queue.ts
@ -1,45 +0,0 @@
|
|||||||
import * as Events from 'events'
|
|
||||||
|
|
||||||
|
|
||||||
export class Queue<T> extends Events.EventEmitter {
|
|
||||||
private q : T[] = []
|
|
||||||
|
|
||||||
constructor() {
|
|
||||||
super()
|
|
||||||
}
|
|
||||||
|
|
||||||
isEmpty() : boolean {
|
|
||||||
return this.q.length == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
knock() {
|
|
||||||
this.emit('data')
|
|
||||||
}
|
|
||||||
|
|
||||||
enq(x : T) {
|
|
||||||
this.q.push(x)
|
|
||||||
this.emit('data')
|
|
||||||
}
|
|
||||||
|
|
||||||
reenq(x : T) {
|
|
||||||
this.q.unshift(x)
|
|
||||||
// this.emit('data')
|
|
||||||
}
|
|
||||||
|
|
||||||
deq() : T {
|
|
||||||
let x : T = this.peek()
|
|
||||||
this.q.shift()
|
|
||||||
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
|
|
||||||
peek() : T {
|
|
||||||
if (this.isEmpty()) {
|
|
||||||
throw new Error("queue is empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
let x : T = this.q[0]
|
|
||||||
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user