'use strict' /** * Module dependencies */ var xtend = require('xtend') var Readable = require('readable-stream').Readable var streamsOpts = { objectMode: true } var defaultStoreOptions = { clean: true } /** * In-memory implementation of the message store * This can actually be saved into files. * * @param {Object} [options] - store options */ function Store (options) { if (!(this instanceof Store)) { return new Store(options) } this.options = options || {} // Defaults this.options = xtend(defaultStoreOptions, options) this._inflights = {} } /** * Adds a packet to the store, a packet is * anything that has a messageId property. * */ Store.prototype.put = function (packet, cb) { this._inflights[packet.messageId] = packet if (cb) { cb() } return this } /** * Creates a stream with all the packets in the store * */ Store.prototype.createStream = function () { var stream = new Readable(streamsOpts) var inflights = this._inflights var ids = Object.keys(this._inflights) var destroyed = false var i = 0 stream._read = function () { if (!destroyed && i < ids.length) { this.push(inflights[ids[i++]]) } else { this.push(null) } } stream.destroy = function () { if (destroyed) { return } var self = this destroyed = true process.nextTick(function () { self.emit('close') }) } return stream } /** * deletes a packet from the store. */ Store.prototype.del = function (packet, cb) { packet = this._inflights[packet.messageId] if (packet) { delete this._inflights[packet.messageId] cb(null, packet) } else if (cb) { cb(new Error('missing packet')) } return this } /** * get a packet from the store. */ Store.prototype.get = function (packet, cb) { packet = this._inflights[packet.messageId] if (packet) { cb(null, packet) } else if (cb) { cb(new Error('missing packet')) } return this } /** * Close the store */ Store.prototype.close = function (cb) { if (this.options.clean) { this._inflights = null } if (cb) { cb() } } module.exports = Store