blob: 6789710297d5e74b01408f6010267840bc73f2a4 [file] [log] [blame]
const EventEmitter = require('events').EventEmitter
const stream = require('stream')
const EVENT_BATCH = 'batch'
const EVENT_CHANGE = 'change'
const EVENT_SEQ = 'seq'
const EVENT_ERROR = 'error'
// streaming line breaker
const liner = () => {
const liner = new stream.Transform({ objectMode: true })
liner._transform = function (chunk, encoding, done) {
let data = chunk.toString('utf8')
if (this._lastLineData) {
data = this._lastLineData + data
this._lastLineData = null
const lines = data.split(/\s*\n/)
this._lastLineData = lines.splice(lines.length - 1, 1)[0]
liner._flush = function (done) {
this._lastLineData = null
return liner
// streaming change processor
const changeProcessor = (ee, batchSize) => {
const changeProcessor = new stream.Transform({ objectMode: true })
const buffer = []
changeProcessor.lastSeq = '0'
changeProcessor._transform = function (chunk, encoding, done) {
// remove last char from string
if (chunk[chunk.length - 1] === ',') {
chunk = chunk.slice(0, -1)
try {
const j = JSON.parse(chunk)
if (buffer.length >= batchSize) {
ee.emit(EVENT_BATCH, buffer.splice(0, batchSize))
} catch (e) {
// look for last_seq
const match = chunk.match(/"last_seq":(.+?)[},]/)
if (match) {
changeProcessor.lastSeq = JSON.parse(match[1])
changeProcessor._flush = function (done) {
if (buffer.length > 0) {
ee.emit(EVENT_BATCH, buffer.splice(0, buffer.length))
return changeProcessor
* Monitors the changes feed (after calling .start()/.get()) and emits events
* - EVENT_CHANGE - per change
* - EVENT_BATCH - per batch of changes
* - EVENT_SEQ - per change of sequence number
* - EVENT_ERROR - per 4xx error (except 429)
* @param {String} db - Name of the database.
* @param {Function} request - Nano.relax
class ChangesReader {
// constructor
constructor (db, request) {
this.db = db
this.request = request
// set defaults
setDefaults () { = new EventEmitter()
this.batchSize = 100
this.fastChanges = false
this.since = 'now'
this.includeDocs = false
this.timeout = 60000
this.started = false
this.wait = false
this.stopOnEmptyChanges = false // whether to stop polling if we get an empty set of changes back
this.continue = true // whether to poll again
this.qs = {} // extra querystring parameters
this.selector = null
this.paused = false
pause () {
this.paused = true
resume () {
this.paused = false
// prevent another poll happening
stop () {
this.continue = false
// sleep, promise style
async sleep (t) {
return new Promise((resolve, reject) => {
setTimeout(resolve, t)
// called to start listening to the changes feed. The opts object can contain:
// - batchSize - the number of records to return per HTTP request
// - since - the the sequence token to start from (defaults to 'now')
start (opts) {
const self = this
// if we're already listening for changes
if (self.started) {
// return the existing event emitter
self.started = true
// handle overidden defaults
opts = opts || {}
Object.assign(self, opts)
// the work function is async and runs in the background
// with a big do/while loop
let delay = 0
const work = async () => {
do {
if (!self.paused) {
// formulate changes feed longpoll HTTP request
const req = {
method: 'post',
db: self.db,
path: '_changes',
qs: {
feed: 'longpoll',
timeout: self.timeout,
since: self.since,
limit: self.batchSize,
include_docs: self.includeDocs
body: {}
if (self.fastChanges) {
req.qs.seq_interval = self.batchSize
if (self.selector) {
req.qs.filter = '_selector'
req.body.selector = self.selector
Object.assign(req.qs, opts.qs)
// make HTTP request to get up to batchSize changes from the feed
try {
const data = await self.request(req)
delay = 0
// update the since state
if (data && data.last_seq && data.last_seq !== self.since) {
self.since = data.last_seq, self.since)
// stop on empty batch
if (self.stopOnEmptyChanges && data && typeof data.results !== 'undefined' && data.results.length === 0) {
self.continue = false
// if we have data
if (data && data.results && data.results.length > 0) {
// emit EVENT_CHANGE events
for (const i in data.results) {, data.results[i])
// in 'wait' mode, we need to wait until the user calls .resume()
if (self.wait) {
// so call pause() to prevent further API calls until .resume() is called
// emit EVENT_BATCH event, data.results)
} catch (err) {
// error (wrong password, bad since value etc)
// if the error is fatal
if (err && err.statusCode && err.statusCode >= 400 && err.statusCode !== 429 && err.statusCode < 500) {
self.continue = false
} else {
// don't immediately retry on error - exponential back-off
delay = delay ? Math.max(60000, delay * 2) : 5000
}, err)
// if in pause mode, wait 100ms before checking again
if (self.paused && delay === 0) {
delay = 100
// delay before next request?
if (self.continue && delay > 0) {
await self.sleep(delay)
} while (self.continue)
// reset'end', self.since)
// return the event emitter to the caller
// called to start listening to the changes feed for a finite number of changes. The opts object can contain:
// - batchSize - the number of records to return per HTTP request
// - since - the sequence token to start from (defaults to 'now')
get (opts) {
this.stopOnEmptyChanges = true
return this.start(opts)
// called to spool through changes to "now" in one long HTTP request
spool (opts) {
const self = this
opts = opts || {}
Object.assign(self, opts)
const req = {
method: 'post',
db: self.db,
path: '_changes',
qs: {
since: self.since,
include_docs: self.includeDocs,
seq_interval: self.batchSize
stream: true,
body: {}
if (self.selector) {
req.qs.filter = '_selector'
req.body.selector = self.selector
const lin = liner()
const cp = changeProcessor(, self.batchSize)
.on('finish', (lastSeq) => {
// the 'end' event was triggering before the last data event
setTimeout(() => {'end', cp.lastSeq)
}, 10)
.on(EVENT_ERROR, (e) => {, e)
module.exports = ChangesReader