const EventEmitter = require('events').EventEmitter
const stream = require('stream')
const EVENT_BATCH = 'batch'
const EVENT_CHANGE = 'change'
const EVENT_SEQ = 'seq'
const EVENT_ERROR = 'error'
// streaming line breaker
const liner = () => {
const liner = new stream.Transform({ objectMode: true })
  liner._transform = function (chunk, encoding, done) {
    let data = chunk.toString('utf8')
    // prepend any partial line held over from the previous chunk
    if (this._lastLineData) {
      data = this._lastLineData + data
      this._lastLineData = null
    }
    const lines = data.split(/\s*\n/)
    // the final element may be an incomplete line - hold it back until
    // the next chunk (or _flush) completes it
    this._lastLineData = lines.splice(lines.length - 1, 1)[0]
    lines.forEach(this.push.bind(this))
    done()
  }
  liner._flush = function (done) {
    // only emit the remnant if there is one; when the input ends with a
    // newline, _lastLineData is an empty string
    if (this._lastLineData) {
      this.push(this._lastLineData)
    }
    this._lastLineData = null
    done()
  }
return liner
}
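// Illustrative sketch (not part of the module): the liner re-assembles lines
// that span chunk boundaries, emitting one complete line per 'data' event:
//   const l = liner()
//   l.on('data', (line) => console.log(line))
//   l.write('{"seq":"1-abc"}\n{"se')
//   l.write('q":"2-def"}\n')
//   l.end() // logs the two complete JSON rows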
// streaming change processor
const changeProcessor = (ee, batchSize) => {
const changeProcessor = new stream.Transform({ objectMode: true })
const buffer = []
changeProcessor.lastSeq = '0'
changeProcessor._transform = function (chunk, encoding, done) {
    // strip the trailing comma that separates rows in the results array
    if (chunk[chunk.length - 1] === ',') {
      chunk = chunk.slice(0, -1)
    }
try {
const j = JSON.parse(chunk)
buffer.push(j)
if (buffer.length >= batchSize) {
ee.emit(EVENT_BATCH, buffer.splice(0, batchSize))
}
done()
} catch (e) {
      // not a complete JSON change row: extract the feed's closing last_seq value
const match = chunk.match(/"last_seq":(.+?)[},]/)
if (match) {
changeProcessor.lastSeq = JSON.parse(match[1])
}
done()
}
}
changeProcessor._flush = function (done) {
if (buffer.length > 0) {
ee.emit(EVENT_BATCH, buffer.splice(0, buffer.length))
}
done()
}
return changeProcessor
}
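// Illustrative sketch (not part of the module): fed one line at a time, the
// change processor buffers parsed rows and emits them in batches, e.g.
//   const ee = new EventEmitter()
//   const cp = changeProcessor(ee, 2)
//   ee.on('batch', (b) => console.log(b.length, 'changes'))
//   cp.write('{"seq":"1-abc","id":"a","changes":[]},')
//   cp.write('{"seq":"2-def","id":"b","changes":[]},') // batch of 2 emitted here
//   cp.write('"last_seq":"2-def"}')
//   cp.end() // cp.lastSeq is now '2-def'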
/**
* Monitors the changes feed (after calling .start()/.get()) and emits events
* - EVENT_CHANGE - per change
* - EVENT_BATCH - per batch of changes
* - EVENT_SEQ - per change of sequence number
 * - EVENT_ERROR - per error (a 4xx error other than 429 also stops polling)
 * - 'end' - when polling stops, with the last sequence token
*
* @param {String} db - Name of the database.
* @param {Function} request - Nano.relax
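 *
 * @example
 * // usage sketch: assumes a Nano instance called `nano`, whose low-level
 * // request function (nano.relax) is passed in as described above
 * const reader = new ChangesReader('mydb', nano.relax)
 * reader.start({ since: '0' })
 *   .on('change', (c) => console.log('change', c.seq))
 *   .on('batch', (b) => console.log('batch of', b.length, 'changes'))
 *   .on('seq', (s) => console.log('sequence token', s))
 *   .on('error', (e) => console.error('error', e))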
*/
class ChangesReader {
// constructor
constructor (db, request) {
this.db = db
this.setDefaults()
this.request = request
}
// set defaults
setDefaults () {
this.ee = new EventEmitter()
this.batchSize = 100
this.fastChanges = false
this.since = 'now'
this.includeDocs = false
this.timeout = 60000
this.started = false
this.wait = false
this.stopOnEmptyChanges = false // whether to stop polling if we get an empty set of changes back
this.continue = true // whether to poll again
this.qs = {} // extra querystring parameters
this.selector = null
this.paused = false
}
pause () {
this.paused = true
}
resume () {
this.paused = false
}
// prevent another poll happening
stop () {
this.continue = false
}
  // sleep, promise style
  async sleep (t) {
    return new Promise((resolve) => {
      setTimeout(resolve, t)
    })
  }
// called to start listening to the changes feed. The opts object can contain:
// - batchSize - the number of records to return per HTTP request
  // - since - the sequence token to start from (defaults to 'now')
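  // Example (sketch, assuming `reader` is a ChangesReader instance):
  //   reader.start({ batchSize: 500, since: '0', includeDocs: true })
  //     .on('batch', (batch) => { /* up to 500 changes, each with its doc */ })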
start (opts) {
const self = this
// if we're already listening for changes
if (self.started) {
// return the existing event emitter
return self.ee
}
self.started = true
    // handle overridden defaults
opts = opts || {}
Object.assign(self, opts)
// the work function is async and runs in the background
// with a big do/while loop
let delay = 0
const work = async () => {
do {
if (!self.paused) {
// formulate changes feed longpoll HTTP request
const req = {
method: 'post',
db: self.db,
path: '_changes',
qs: {
feed: 'longpoll',
timeout: self.timeout,
since: self.since,
limit: self.batchSize,
include_docs: self.includeDocs
},
body: {}
}
if (self.fastChanges) {
req.qs.seq_interval = self.batchSize
}
if (self.selector) {
req.qs.filter = '_selector'
req.body.selector = self.selector
}
Object.assign(req.qs, opts.qs)
// make HTTP request to get up to batchSize changes from the feed
try {
const data = await self.request(req)
delay = 0
// update the since state
if (data && data.last_seq && data.last_seq !== self.since) {
self.since = data.last_seq
self.ee.emit(EVENT_SEQ, self.since)
}
// stop on empty batch
if (self.stopOnEmptyChanges && data && typeof data.results !== 'undefined' && data.results.length === 0) {
self.continue = false
}
// if we have data
if (data && data.results && data.results.length > 0) {
              // emit EVENT_CHANGE events
              for (const change of data.results) {
                self.ee.emit(EVENT_CHANGE, change)
              }
// in 'wait' mode, we need to wait until the user calls .resume()
if (self.wait) {
// so call pause() to prevent further API calls until .resume() is called
self.pause()
}
// emit EVENT_BATCH event
self.ee.emit(EVENT_BATCH, data.results)
}
} catch (err) {
            // a 4xx error other than 429 (e.g. bad credentials or an
            // invalid since value) is fatal, so stop polling
if (err && err.statusCode && err.statusCode >= 400 && err.statusCode !== 429 && err.statusCode < 500) {
self.continue = false
} else {
              // don't retry immediately - exponential back-off:
              // 5s, 10s, 20s, 40s, then capped at one minute
              delay = delay ? Math.min(60000, delay * 2) : 5000
}
self.ee.emit(EVENT_ERROR, err)
}
}
// if in pause mode, wait 100ms before checking again
if (self.paused && delay === 0) {
delay = 100
}
// delay before next request?
if (self.continue && delay > 0) {
await self.sleep(delay)
}
} while (self.continue)
// reset
self.ee.emit('end', self.since)
self.setDefaults()
}
work()
// return the event emitter to the caller
return self.ee
}
// called to start listening to the changes feed for a finite number of changes. The opts object can contain:
// - batchSize - the number of records to return per HTTP request
// - since - the sequence token to start from (defaults to 'now')
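  // Example (sketch): read everything up to "now", then stop; the 'end'
  // event carries the final sequence token
  //   reader.get({ since: '0' })
  //     .on('batch', (batch) => { /* process each batch */ })
  //     .on('end', (lastSeq) => { /* no more changes to fetch */ })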
get (opts) {
this.stopOnEmptyChanges = true
return this.start(opts)
}
// called to spool through changes to "now" in one long HTTP request
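  // Example (sketch): unlike the longpoll-based start()/get(), spool() uses a
  // single streamed response, which suits working through a large backlog
  //   reader.spool({ since: '0' })
  //     .on('batch', (batch) => { /* process each batch */ })
  //     .on('end', (lastSeq) => { /* caught up to "now" */ })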
spool (opts) {
const self = this
self.setDefaults()
opts = opts || {}
Object.assign(self, opts)
const req = {
method: 'post',
db: self.db,
path: '_changes',
qs: {
since: self.since,
include_docs: self.includeDocs,
seq_interval: self.batchSize
},
stream: true,
body: {}
}
if (self.selector) {
req.qs.filter = '_selector'
req.body.selector = self.selector
}
const lin = liner()
const cp = changeProcessor(self.ee, self.batchSize)
self.request(req)
.pipe(lin)
.pipe(cp)
      .on('finish', () => {
        // the 'finish' event can fire before the last data event has been
        // processed, so delay the 'end' event slightly
        setTimeout(() => {
          self.ee.emit('end', cp.lastSeq)
        }, 10)
      })
.on(EVENT_ERROR, (e) => {
self.ee.emit(EVENT_ERROR, e)
})
return self.ee
}
}
module.exports = ChangesReader