blob: 0b7346252590b7862272242245fa3c8a1e0fe9d5 [file] [log] [blame]
'use strict';
var utils = require('./utils');
var merge = require('./merge');
var errors = require('./deps/errors');
var EE = require('events').EventEmitter;
var evalFilter = require('./evalFilter');
var evalView = require('./evalView');
module.exports = Changes;
utils.inherits(Changes, EE);
function Changes(db, opts, callback) {
EE.call(this);
var self = this;
this.db = db;
opts = opts ? utils.clone(opts) : {};
var oldComplete = callback || opts.complete || function () {};
var complete = opts.complete = utils.once(function (err, resp) {
if (err) {
self.emit('error', err);
} else {
self.emit('complete', resp);
}
self.removeAllListeners();
db.removeListener('destroyed', onDestroy);
});
if (oldComplete) {
self.on('complete', function (resp) {
oldComplete(null, resp);
});
self.on('error', function (err) {
oldComplete(err);
});
}
var oldOnChange = opts.onChange;
if (oldOnChange) {
self.on('change', oldOnChange);
}
function onDestroy() {
self.cancel();
}
db.once('destroyed', onDestroy);
opts.onChange = function (change) {
if (opts.isCancelled) {
return;
}
self.emit('change', change);
if (self.startSeq && self.startSeq <= change.seq) {
self.emit('uptodate');
self.startSeq = false;
}
if (change.deleted) {
self.emit('delete', change);
} else if (change.changes.length === 1 &&
change.changes[0].rev.slice(0, 2) === '1-') {
self.emit('create', change);
} else {
self.emit('update', change);
}
};
var promise = new utils.Promise(function (fulfill, reject) {
opts.complete = function (err, res) {
if (err) {
reject(err);
} else {
fulfill(res);
}
};
});
self.once('cancel', function () {
if (oldOnChange) {
self.removeListener('change', oldOnChange);
}
opts.complete(null, {status: 'cancelled'});
});
this.then = promise.then.bind(promise);
this['catch'] = promise['catch'].bind(promise);
this.then(function (result) {
complete(null, result);
}, complete);
if (!db.taskqueue.isReady) {
db.taskqueue.addTask(function () {
if (self.isCancelled) {
self.emit('cancel');
} else {
self.doChanges(opts);
}
});
} else {
self.doChanges(opts);
}
}
Changes.prototype.cancel = function () {
this.isCancelled = true;
if (this.db.taskqueue.isReady) {
this.emit('cancel');
}
};
function processChange(doc, metadata, opts) {
var changeList = [{rev: doc._rev}];
if (opts.style === 'all_docs') {
changeList = merge.collectLeaves(metadata.rev_tree)
.map(function (x) { return {rev: x.rev}; });
}
var change = {
id: metadata.id,
changes: changeList,
doc: doc
};
if (utils.isDeleted(metadata, doc._rev)) {
change.deleted = true;
}
if (opts.conflicts) {
change.doc._conflicts = merge.collectConflicts(metadata);
if (!change.doc._conflicts.length) {
delete change.doc._conflicts;
}
}
return change;
}
Changes.prototype.doChanges = function (opts) {
var self = this;
var callback = opts.complete;
opts = utils.clone(opts);
if ('live' in opts && !('continuous' in opts)) {
opts.continuous = opts.live;
}
opts.processChange = processChange;
if (opts.since === 'latest') {
opts.since = 'now';
}
if (!opts.since) {
opts.since = 0;
}
if (opts.since === 'now') {
this.db.info().then(function (info) {
if (self.isCancelled) {
callback(null, {status: 'cancelled'});
return;
}
opts.since = info.update_seq;
self.doChanges(opts);
}, callback);
return;
}
if (opts.continuous && opts.since !== 'now') {
this.db.info().then(function (info) {
self.startSeq = info.update_seq;
}, function (err) {
if (err.id === 'idbNull') {
//db closed before this returned
//thats ok
return;
}
throw err;
});
}
if (this.db.type() !== 'http' &&
opts.filter && typeof opts.filter === 'string' &&
!opts.doc_ids) {
return this.filterChanges(opts);
}
if (!('descending' in opts)) {
opts.descending = false;
}
// 0 and 1 should return 1 document
opts.limit = opts.limit === 0 ? 1 : opts.limit;
opts.complete = callback;
var newPromise = this.db._changes(opts);
if (newPromise && typeof newPromise.cancel === 'function') {
var cancel = self.cancel;
self.cancel = utils.getArguments(function (args) {
newPromise.cancel();
cancel.apply(this, args);
});
}
};
Changes.prototype.filterChanges = function (opts) {
var self = this;
var callback = opts.complete;
if (opts.filter === '_view') {
if (!opts.view || typeof opts.view !== 'string') {
var err = errors.error(errors.BAD_REQUEST,
'`view` filter parameter is not provided.');
callback(err);
return;
}
// fetch a view from a design doc, make it behave like a filter
var viewName = opts.view.split('/');
this.db.get('_design/' + viewName[0], function (err, ddoc) {
if (self.isCancelled) {
callback(null, {status: 'cancelled'});
return;
}
if (err) {
callback(errors.generateErrorFromResponse(err));
return;
}
if (ddoc && ddoc.views && ddoc.views[viewName[1]]) {
var filter = evalView(ddoc.views[viewName[1]].map);
opts.filter = filter;
self.doChanges(opts);
return;
}
var msg = ddoc.views ? 'missing json key: ' + viewName[1] :
'missing json key: views';
if (!err) {
err = errors.error(errors.MISSING_DOC, msg);
}
callback(err);
return;
});
} else {
// fetch a filter from a design doc
var filterName = opts.filter.split('/');
this.db.get('_design/' + filterName[0], function (err, ddoc) {
if (self.isCancelled) {
callback(null, {status: 'cancelled'});
return;
}
if (err) {
callback(errors.generateErrorFromResponse(err));
return;
}
if (ddoc && ddoc.filters && ddoc.filters[filterName[1]]) {
var filter = evalFilter(ddoc.filters[filterName[1]]);
opts.filter = filter;
self.doChanges(opts);
return;
} else {
var msg = (ddoc && ddoc.filters) ? 'missing json key: ' + filterName[1]
: 'missing json key: filters';
if (!err) {
err = errors.error(errors.MISSING_DOC, msg);
}
callback(err);
return;
}
});
}
};