| 'use strict'; |
| var utils = require('./utils'); |
| var merge = require('./merge'); |
| var errors = require('./deps/errors'); |
| var EE = require('events').EventEmitter; |
| var evalFilter = require('./evalFilter'); |
| var evalView = require('./evalView'); |
| module.exports = Changes; |
| utils.inherits(Changes, EE); |
| |
| function Changes(db, opts, callback) { |
| EE.call(this); |
| var self = this; |
| this.db = db; |
| opts = opts ? utils.clone(opts) : {}; |
| var oldComplete = callback || opts.complete || function () {}; |
| var complete = opts.complete = utils.once(function (err, resp) { |
| if (err) { |
| self.emit('error', err); |
| } else { |
| self.emit('complete', resp); |
| } |
| self.removeAllListeners(); |
| db.removeListener('destroyed', onDestroy); |
| }); |
| if (oldComplete) { |
| self.on('complete', function (resp) { |
| oldComplete(null, resp); |
| }); |
| self.on('error', function (err) { |
| oldComplete(err); |
| }); |
| } |
| var oldOnChange = opts.onChange; |
| if (oldOnChange) { |
| self.on('change', oldOnChange); |
| } |
| function onDestroy() { |
| self.cancel(); |
| } |
| db.once('destroyed', onDestroy); |
| |
| opts.onChange = function (change) { |
| if (opts.isCancelled) { |
| return; |
| } |
| self.emit('change', change); |
| if (self.startSeq && self.startSeq <= change.seq) { |
| self.emit('uptodate'); |
| self.startSeq = false; |
| } |
| if (change.deleted) { |
| self.emit('delete', change); |
| } else if (change.changes.length === 1 && |
| change.changes[0].rev.slice(0, 2) === '1-') { |
| self.emit('create', change); |
| } else { |
| self.emit('update', change); |
| } |
| }; |
| |
| var promise = new utils.Promise(function (fulfill, reject) { |
| opts.complete = function (err, res) { |
| if (err) { |
| reject(err); |
| } else { |
| fulfill(res); |
| } |
| }; |
| }); |
| self.once('cancel', function () { |
| if (oldOnChange) { |
| self.removeListener('change', oldOnChange); |
| } |
| opts.complete(null, {status: 'cancelled'}); |
| }); |
| this.then = promise.then.bind(promise); |
| this['catch'] = promise['catch'].bind(promise); |
| this.then(function (result) { |
| complete(null, result); |
| }, complete); |
| |
| |
| |
| if (!db.taskqueue.isReady) { |
| db.taskqueue.addTask(function () { |
| if (self.isCancelled) { |
| self.emit('cancel'); |
| } else { |
| self.doChanges(opts); |
| } |
| }); |
| } else { |
| self.doChanges(opts); |
| } |
| } |
| Changes.prototype.cancel = function () { |
| this.isCancelled = true; |
| if (this.db.taskqueue.isReady) { |
| this.emit('cancel'); |
| } |
| }; |
| function processChange(doc, metadata, opts) { |
| var changeList = [{rev: doc._rev}]; |
| if (opts.style === 'all_docs') { |
| changeList = merge.collectLeaves(metadata.rev_tree) |
| .map(function (x) { return {rev: x.rev}; }); |
| } |
| var change = { |
| id: metadata.id, |
| changes: changeList, |
| doc: doc |
| }; |
| |
| if (utils.isDeleted(metadata, doc._rev)) { |
| change.deleted = true; |
| } |
| if (opts.conflicts) { |
| change.doc._conflicts = merge.collectConflicts(metadata); |
| if (!change.doc._conflicts.length) { |
| delete change.doc._conflicts; |
| } |
| } |
| return change; |
| } |
| |
| Changes.prototype.doChanges = function (opts) { |
| var self = this; |
| var callback = opts.complete; |
| |
| opts = utils.clone(opts); |
| if ('live' in opts && !('continuous' in opts)) { |
| opts.continuous = opts.live; |
| } |
| opts.processChange = processChange; |
| |
| if (opts.since === 'latest') { |
| opts.since = 'now'; |
| } |
| if (!opts.since) { |
| opts.since = 0; |
| } |
| if (opts.since === 'now') { |
| this.db.info().then(function (info) { |
| if (self.isCancelled) { |
| callback(null, {status: 'cancelled'}); |
| return; |
| } |
| opts.since = info.update_seq; |
| self.doChanges(opts); |
| }, callback); |
| return; |
| } |
| |
| if (opts.continuous && opts.since !== 'now') { |
| this.db.info().then(function (info) { |
| self.startSeq = info.update_seq; |
| }, function (err) { |
| if (err.id === 'idbNull') { |
| //db closed before this returned |
| //thats ok |
| return; |
| } |
| throw err; |
| }); |
| } |
| |
| if (this.db.type() !== 'http' && |
| opts.filter && typeof opts.filter === 'string' && |
| !opts.doc_ids) { |
| return this.filterChanges(opts); |
| } |
| |
| if (!('descending' in opts)) { |
| opts.descending = false; |
| } |
| |
| // 0 and 1 should return 1 document |
| opts.limit = opts.limit === 0 ? 1 : opts.limit; |
| opts.complete = callback; |
| var newPromise = this.db._changes(opts); |
| if (newPromise && typeof newPromise.cancel === 'function') { |
| var cancel = self.cancel; |
| self.cancel = utils.getArguments(function (args) { |
| newPromise.cancel(); |
| cancel.apply(this, args); |
| }); |
| } |
| }; |
| |
| Changes.prototype.filterChanges = function (opts) { |
| var self = this; |
| var callback = opts.complete; |
| if (opts.filter === '_view') { |
| if (!opts.view || typeof opts.view !== 'string') { |
| var err = errors.error(errors.BAD_REQUEST, |
| '`view` filter parameter is not provided.'); |
| callback(err); |
| return; |
| } |
| // fetch a view from a design doc, make it behave like a filter |
| var viewName = opts.view.split('/'); |
| this.db.get('_design/' + viewName[0], function (err, ddoc) { |
| if (self.isCancelled) { |
| callback(null, {status: 'cancelled'}); |
| return; |
| } |
| if (err) { |
| callback(errors.generateErrorFromResponse(err)); |
| return; |
| } |
| if (ddoc && ddoc.views && ddoc.views[viewName[1]]) { |
| |
| var filter = evalView(ddoc.views[viewName[1]].map); |
| opts.filter = filter; |
| self.doChanges(opts); |
| return; |
| } |
| var msg = ddoc.views ? 'missing json key: ' + viewName[1] : |
| 'missing json key: views'; |
| if (!err) { |
| err = errors.error(errors.MISSING_DOC, msg); |
| } |
| callback(err); |
| return; |
| }); |
| } else { |
| // fetch a filter from a design doc |
| var filterName = opts.filter.split('/'); |
| this.db.get('_design/' + filterName[0], function (err, ddoc) { |
| if (self.isCancelled) { |
| callback(null, {status: 'cancelled'}); |
| return; |
| } |
| if (err) { |
| callback(errors.generateErrorFromResponse(err)); |
| return; |
| } |
| if (ddoc && ddoc.filters && ddoc.filters[filterName[1]]) { |
| var filter = evalFilter(ddoc.filters[filterName[1]]); |
| opts.filter = filter; |
| self.doChanges(opts); |
| return; |
| } else { |
| var msg = (ddoc && ddoc.filters) ? 'missing json key: ' + filterName[1] |
| : 'missing json key: filters'; |
| if (!err) { |
| err = errors.error(errors.MISSING_DOC, msg); |
| } |
| callback(err); |
| return; |
| } |
| }); |
| } |
| }; |