83 lines
2.9 KiB
TypeScript
83 lines
2.9 KiB
TypeScript
import * as log from './log'
|
|
import * as CallChain from './callchain'
|
|
import * as Processor from './processor'
|
|
import * as config from './config'
|
|
|
|
|
|
/** Time between two consecutive missing-event checks, expressed in seconds. */
const CHECK_PERIOD: number = 60
|
|
|
|
|
|
/**
 * Per-client bookkeeping record used for missing-event detection.
 *
 * Time values are in milliseconds; they are derived from `Date#getTime()`
 * readings taken when an event is processed.
 *
 * Every field carries a default so that a freshly constructed entry is fully
 * initialised (no `undefined` hiding behind a `number`/`string` type). The
 * defaults equal the values assigned to an entry when a client is first
 * inserted into the client map.
 */
class ClientEntry {
  /** Identifier of the client this entry belongs to. */
  client = ''

  /** Timestamp of the most recent event seen for the client. */
  lastEvent = 0

  /** Time between the two most recent events. */
  delay = 0

  /**
   * Number of delays accumulated in `delaySum`. Starts at -1 and is
   * incremented on every later event; delays are only accumulated once it
   * has reached 1.
   */
  count = -1

  /** Sum of the accumulated delays. */
  delaySum = 0

  /** `delaySum / count`; stays 0 until the first delay has been accumulated. */
  avgDelay = 0
}
|
|
|
|
/**
 * Tracks how regularly each client sends events and mails an alarm when a
 * client has been silent for much longer than usual.
 *
 * Each processed message updates the running average of the delay between
 * consecutive events of its client. A periodic timer then compares the time
 * since the client's last event with three times that average.
 */
class MissingEventProcessor extends Processor.AProcessor {
  // Handle of the periodic check. Typed through ReturnType so it matches
  // whatever `setInterval` returns in the installed typings.
  // NOTE(review): nothing in this class clears the interval - confirm the
  // processor is meant to live for the whole process lifetime.
  private timer: ReturnType<typeof setInterval>

  // Bookkeeping per client, keyed by the client value taken from the message.
  private clientMap: Map<string, ClientEntry> = new Map<string, ClientEntry>()

  /** Registers the processor under its name and starts the periodic check. */
  constructor() {
    super("MissingEventProcessor")

    this.timer = setInterval(() => this.checkForMissingEvents(), CHECK_PERIOD * 1000)
  }

  /**
   * Scans all known clients and sends an alarm mail for every client whose
   * silence exceeds three times its average delay between events.
   */
  private checkForMissingEvents(): void {
    // One clock reading per scan: every client is judged against the same instant.
    const now = Date.now()

    this.clientMap.forEach((entry: ClientEntry, client: string): void => {
      const elapsed = now - entry.lastEvent
      log.info(`MissingEventProcessor: Checking ${client}, elapsed: ${elapsed / 1000}, avg. delay: ${entry.avgDelay / 1000}`)

      // avgDelay is 0 until at least one delay has been averaged; without a
      // baseline there is nothing to compare against.
      if (entry.avgDelay !== 0 && elapsed > entry.avgDelay * 3) {
        // NOTE(review): the mail is sent again on every check for as long as
        // the client stays silent - confirm that the repetition is intended.
        log.sendAlarmMail(`Missing Event Detected: ${client}, elapsed: ${elapsed / 1000}, avg. delay: ${entry.avgDelay / 1000}`)
      }
    })
  }

  /**
   * Records an event for the client named in `message.metadata.client`.
   *
   * The first event of a client only creates its entry. Later events update
   * the last-event timestamp and, from the third event on, the average delay.
   *
   * @param message Incoming message; only `metadata.client` is read here.
   */
  protected process(message: any): void {
    const client = message.metadata.client
    log.info(`MissingEventProcessor: Event for client ${client}`)

    const now = Date.now()
    const known = this.clientMap.get(client)

    if (known === undefined) {
      const created = new ClientEntry()
      created.client = client
      created.lastEvent = now
      created.delay = 0
      // Starting at -1 means the second event brings the count to 0, so the
      // very first delay is not part of the average.
      created.count = -1
      created.delaySum = 0
      created.avgDelay = 0
      this.clientMap.set(client, created)
      log.info(`MissingEventProcessor: Entry for ${client} inserted`)
      return
    }

    // The entry is mutated in place, so it does not need to be stored again.
    known.delay = now - known.lastEvent
    known.lastEvent = now
    known.count += 1
    if (known.count >= 1) {
      known.delaySum += known.delay
      known.avgDelay = known.delaySum / known.count
    }
    log.info(`MissingEventProcessor: Entry for ${client} updated`)
  }
}
|
|
|
|
/**
 * Call-chain item that hands every message it receives to a
 * MissingEventProcessor and then returns that same message object to the chain.
 */
export class MissingEventDetector extends CallChain.ABaseChainItem {
  // Processor that receives every message seen by this chain item.
  private readonly missingEventProcessor: MissingEventProcessor

  /** Registers the chain item under its name and creates its processor. */
  constructor() {
    super("MissingEventDetector")
    this.missingEventProcessor = new MissingEventProcessor()
  }

  /**
   * Forwards the message to the processor's input.
   *
   * @param message Message travelling through the call chain.
   * @returns The message object that was passed in.
   */
  protected func(message: any): any {
    const processor = this.missingEventProcessor
    processor.in(message)
    return message
  }
}
|