| import clone from './deps/clone'; |
| import Promise from './deps/promise'; |
| import isDeleted from './deps/docs/isDeleted'; |
| import inherits from 'inherits'; |
| import getArguments from 'argsarray'; |
| import once from './deps/once'; |
| import { EventEmitter as EE } from 'events'; |
| import evalFilter from './evalFilter'; |
| import evalView from './evalView'; |
| import parseDdocFunctionName from './deps/docs/parseDdocFunctionName'; |
| import normalizeDdocFunctionName from './deps/docs/normalizeDdocFunctionName'; |
| import collectLeaves from './deps/merge/collectLeaves'; |
| import collectConflicts from './deps/merge/collectConflicts'; |
| |
| import { |
| MISSING_DOC, |
| BAD_REQUEST, |
| createError, |
| generateErrorFromResponse |
| } from './deps/errors'; |
| |
inherits(Changes, EE);

/**
 * Event-emitting, thenable handle for a changes feed on `db`.
 *
 * Emits 'change' for every change handed to `opts.onChange`, and exactly one
 * of 'complete' / 'error' when the feed settles. The same outcome is exposed
 * through `then` / `catch`.
 *
 * @param {Object} db - object exposing `once`, `removeListener` and a
 *   `taskqueue` with `isReady` / `addTask`, as used below.
 * @param {Object} [opts] - feed options; cloned, so the caller's object is
 *   not modified.
 * @param {Function} [callback] - optional node-style `(err, resp)` callback,
 *   registered as a listener for 'complete' and 'error'.
 */
function Changes(db, opts, callback) {
  EE.call(this);
  var self = this;
  this.db = db;
  opts = opts ? clone(opts) : {};

  // Final notification; wrapped in once() so it cannot run twice.
  var complete = once(function (err, resp) {
    if (err) {
      // emit('error') throws when nobody listens, which would skip the
      // cleanup below. Promise-only consumers still receive the error
      // through then()/catch().
      if (self.listeners('error').length > 0) {
        self.emit('error', err);
      }
    } else {
      self.emit('complete', resp);
    }
    self.removeAllListeners();
    db.removeListener('destroyed', onDestroy);
  });

  if (callback) {
    self.on('complete', function (resp) {
      callback(null, resp);
    });
    self.on('error', callback);
  }

  // Cancel this feed when the database reports it was destroyed.
  function onDestroy() {
    self.cancel();
  }
  db.once('destroyed', onDestroy);

  opts.onChange = function (change) {
    /* istanbul ignore if */
    if (self.isCancelled || opts.isCancelled) {
      // cancel() records the flag on the feed itself; the check on `opts`
      // is kept for callers that set it there.
      return;
    }
    self.emit('change', change);
    if (self.startSeq && self.startSeq <= change.seq) {
      self.startSeq = false;
    }
  };

  // opts.complete settles the promise; `complete` above runs off the promise
  // so listeners are notified asynchronously.
  var promise = new Promise(function (fulfill, reject) {
    opts.complete = function (err, res) {
      if (err) {
        reject(err);
      } else {
        fulfill(res);
      }
    };
  });
  self.once('cancel', function () {
    db.removeListener('destroyed', onDestroy);
    opts.complete(null, {status: 'cancelled'});
  });
  this.then = promise.then.bind(promise);
  this['catch'] = promise['catch'].bind(promise);
  this.then(function (result) {
    complete(null, result);
  }, complete);

  if (!db.taskqueue.isReady) {
    // Defer until the task queue runs the task. A cancel() issued in the
    // meantime only set the flag, so the 'cancel' event is emitted here.
    db.taskqueue.addTask(function () {
      if (self.isCancelled) {
        self.emit('cancel');
      } else {
        self.doChanges(opts);
      }
    });
  } else {
    self.doChanges(opts);
  }
}
/**
 * Mark the feed as cancelled.
 *
 * The 'cancel' event is emitted right away only when the database task queue
 * reports ready; otherwise only the flag is set, and the task queued by the
 * constructor checks it later.
 */
Changes.prototype.cancel = function () {
  this.isCancelled = true;
  var queueReady = this.db.taskqueue.isReady;
  if (!queueReady) {
    return;
  }
  this.emit('cancel');
};
/**
 * Build the change object for one document.
 *
 * @param {Object} doc - the document; its `_rev` is read, and `_conflicts`
 *   is written on it when `opts.conflicts` is set.
 * @param {Object} metadata - supplies `id` and `rev_tree`.
 * @param {Object} opts - reads `style` and `conflicts`.
 * @returns {Object} `{id, changes, doc}` plus `deleted: true` when
 *   `isDeleted` reports the revision as deleted.
 */
function processChange(doc, metadata, opts) {
  var revisions = [{rev: doc._rev}];
  if (opts.style === 'all_docs') {
    // list every leaf revision instead of just the document's own
    var leaves = collectLeaves(metadata.rev_tree);
    revisions = leaves.map(function (leaf) {
      return {rev: leaf.rev};
    });
  }

  var result = {
    id: metadata.id,
    changes: revisions,
    doc: doc
  };

  var removed = isDeleted(metadata, doc._rev);
  if (removed) {
    result.deleted = true;
  }

  if (opts.conflicts) {
    var target = result.doc;
    target._conflicts = collectConflicts(metadata);
    // an empty conflict list is not reported at all
    if (!target._conflicts.length) {
      delete target._conflicts;
    }
  }

  return result;
}
| |
/**
 * Normalise the feed options and start the query through `db._changes`.
 *
 * May re-enter itself once `since: 'now'` has been resolved to a sequence,
 * and hands off to `filterChanges` when a string filter has to be resolved
 * first.
 *
 * @param {Object} opts - feed options; `opts.complete` is the completion
 *   callback. The object is cloned before being modified.
 */
Changes.prototype.doChanges = function (opts) {
  var feed = this;
  var done = opts.complete;

  opts = clone(opts);
  if ('live' in opts && !('continuous' in opts)) {
    // `live` acts as an alias when `continuous` was not given
    opts.continuous = opts.live;
  }
  opts.processChange = processChange;

  if (opts.since === 'latest') {
    opts.since = 'now';
  }
  if (!opts.since) {
    opts.since = 0;
  }

  if (opts.since === 'now') {
    // resolve 'now' to the current update_seq, then start over
    feed.db.info().then(function (info) {
      /* istanbul ignore if */
      if (feed.isCancelled) {
        done(null, {status: 'cancelled'});
        return;
      }
      opts.since = info.update_seq;
      feed.doChanges(opts);
    }, done);
    return;
  }

  // `since` cannot be 'now' past this point (handled by the return above)
  if (opts.continuous) {
    feed.db.info().then(function (info) {
      feed.startSeq = info.update_seq;
    /* istanbul ignore next */
    }, function (err) {
      if (err.id !== 'idbNull') {
        throw err;
      }
      // db closed before this returned; that's ok
    });
  }

  var hasStringFilter = opts.filter && typeof opts.filter === 'string';
  if (hasStringFilter) {
    if (opts.filter === '_view') {
      opts.view = normalizeDdocFunctionName(opts.view);
    } else {
      opts.filter = normalizeDdocFunctionName(opts.filter);
    }

    var isHttp = feed.db.type() === 'http';
    if (!isHttp && !opts.doc_ids) {
      return feed.filterChanges(opts);
    }
  }

  if (!('descending' in opts)) {
    opts.descending = false;
  }

  // 0 and 1 should return 1 document
  var requestedLimit = opts.limit;
  opts.limit = requestedLimit === 0 ? 1 : requestedLimit;
  opts.complete = done;

  var pending = feed.db._changes(opts);
  if (pending && typeof pending.cancel === 'function') {
    // chain the query's own cancel in front of the current cancel
    var previousCancel = feed.cancel;
    feed.cancel = getArguments(function (args) {
      pending.cancel();
      previousCancel.apply(this, args);
    });
  }
};
| |
/**
 * Resolve a string `filter` option into a function loaded from a design
 * document, then restart the feed through `doChanges`.
 *
 * With `opts.filter === '_view'`, the map function named by `opts.view` is
 * loaded and passed through `evalView`. Otherwise the filter named by
 * `opts.filter` is loaded and passed through `evalFilter`. Failures are
 * reported through `opts.complete`.
 *
 * @param {Object} opts - feed options; reads `filter`, `view` and `complete`,
 *   and replaces `filter` with the resolved function on success.
 */
Changes.prototype.filterChanges = function (opts) {
  var self = this;
  var callback = opts.complete;
  if (opts.filter === '_view') {
    if (!opts.view || typeof opts.view !== 'string') {
      var err = createError(BAD_REQUEST,
        '`view` filter parameter not found or invalid.');
      return callback(err);
    }
    // fetch a view from a design doc, make it behave like a filter
    var viewName = parseDdocFunctionName(opts.view);
    this.db.get('_design/' + viewName[0], function (err, ddoc) {
      /* istanbul ignore if */
      if (self.isCancelled) {
        return callback(null, {status: 'cancelled'});
      }
      /* istanbul ignore next */
      if (err) {
        return callback(generateErrorFromResponse(err));
      }
      var mapFun = ddoc && ddoc.views && ddoc.views[viewName[1]] &&
        ddoc.views[viewName[1]].map;
      if (!mapFun) {
        // guard `ddoc` itself, as the filter branch below does, so a missing
        // document yields MISSING_DOC rather than a TypeError
        return callback(createError(MISSING_DOC,
          ((ddoc && ddoc.views) ? 'missing json key: ' + viewName[1] :
            'missing json key: views')));
      }
      opts.filter = evalView(mapFun);
      self.doChanges(opts);
    });
  } else {
    // fetch a filter from a design doc
    var filterName = parseDdocFunctionName(opts.filter);
    if (!filterName) {
      // the name could not be parsed; go back to doChanges with opts as-is
      return self.doChanges(opts);
    }
    this.db.get('_design/' + filterName[0], function (err, ddoc) {
      /* istanbul ignore if */
      if (self.isCancelled) {
        return callback(null, {status: 'cancelled'});
      }
      /* istanbul ignore next */
      if (err) {
        return callback(generateErrorFromResponse(err));
      }
      var filterFun = ddoc && ddoc.filters && ddoc.filters[filterName[1]];
      if (!filterFun) {
        return callback(createError(MISSING_DOC,
          ((ddoc && ddoc.filters) ? 'missing json key: ' + filterName[1]
            : 'missing json key: filters')));
      }
      opts.filter = evalFilter(filterFun);
      self.doChanges(opts);
    });
  }
};
| |
| export default Changes; |