blob: 605a5ab5ec522c3fa80cd3d7402eee95e9d26f21 [file] [log] [blame]
'use strict';
var utils = require('./utils');
var Pouch = require('./index');
var EE = require('events').EventEmitter;
// We create a basic promise so the caller can cancel the replication possibly
// before we have actually started listening to changes etc
utils.inherits(Replication, EE);
function Replication(opts) {
EE.call(this);
this.cancelled = false;
var self = this;
var promise = new utils.Promise(function (fulfill, reject) {
self.once('complete', fulfill);
self.once('error', reject);
});
self.then = function (resolve, reject) {
return promise.then(resolve, reject);
};
self.catch = function (reject) {
return promise.catch(reject);
};
// As we allow error handling via "error" event as well,
// put a stub in here so that rejecting never throws UnhandledError.
self.catch(function (err) {});
}
Replication.prototype.cancel = function () {
this.cancelled = true;
this.emit('cancel');
};
Replication.prototype.ready = function (src, target) {
var self = this;
function onDestroy() {
self.cancel();
}
src.once('destroyed', onDestroy);
target.once('destroyed', onDestroy);
function cleanup() {
self.removeAllListeners();
src.removeListener('destroyed', onDestroy);
target.removeListener('destroyed', onDestroy);
}
this.then(cleanup, cleanup);
};
// TODO: check CouchDB's replication id generation
// Generate a unique id particular to this replication
function genReplicationId(src, target, opts) {
var filterFun = opts.filter ? opts.filter.toString() : '';
return src.id().then(function (src_id) {
return target.id().then(function (target_id) {
var queryData = src_id + target_id + filterFun +
JSON.stringify(opts.query_params) + opts.doc_ids;
return utils.MD5(queryData).then(function (md5) {
return '_local/' + md5;
});
});
});
}
function updateCheckpoint(db, id, checkpoint) {
return db.get(id).catch(function (err) {
if (err.status === 404) {
return {_id: id};
}
throw err;
}).then(function (doc) {
doc.last_seq = checkpoint;
return db.put(doc);
});
}
function Checkpointer(src, target, id) {
this.src = src;
this.target = target;
this.id = id;
}
Checkpointer.prototype.writeCheckpoint = function (checkpoint) {
var self = this;
return this.updateTarget(checkpoint).then(function () {
return self.updateSource(checkpoint);
});
};
Checkpointer.prototype.updateTarget = function (checkpoint) {
return updateCheckpoint(this.target, this.id, checkpoint);
};
Checkpointer.prototype.updateSource = function (checkpoint) {
var self = this;
if (this.readOnlySource) {
return utils.Promise.resolve(true);
}
return updateCheckpoint(this.src, this.id, checkpoint).catch(function (err) {
var isForbidden = typeof err.status === 'number' &&
Math.floor(err.status / 100) === 4;
if (isForbidden) {
self.readOnlySource = true;
return true;
}
throw err;
});
};
Checkpointer.prototype.getCheckpoint = function () {
var self = this;
return self.target.get(self.id).then(function (targetDoc) {
return self.src.get(self.id).then(function (sourceDoc) {
if (targetDoc.last_seq === sourceDoc.last_seq) {
return sourceDoc.last_seq;
}
return 0;
}, function (err) {
if (err.status === 404 && targetDoc.last_seq) {
return self.src.put({
_id: self.id,
last_seq: 0
}).then(function () {
return 0;
}, function (err) {
if (err.status === 401) {
self.readOnlySource = true;
return targetDoc.last_seq;
}
return 0;
});
}
throw err;
});
}).catch(function (err) {
if (err.status !== 404) {
throw err;
}
return 0;
});
};
function replicate(repId, src, target, opts, returnValue) {
var batches = []; // list of batches to be processed
var currentBatch; // the batch currently being processed
var pendingBatch = {
seq: 0,
changes: [],
docs: []
}; // next batch, not yet ready to be processed
var writingCheckpoint = false; // true while checkpoint is being written
var changesCompleted = false; // true when all changes received
var replicationCompleted = false; // true when replication has completed
var last_seq = 0;
var continuous = opts.continuous || opts.live || false;
var batch_size = opts.batch_size || 100;
var batches_limit = opts.batches_limit || 10;
var changesPending = false; // true while src.changes is running
var changesCount = 0; // number of changes received since calling src.changes
var doc_ids = opts.doc_ids;
var checkpointer = new Checkpointer(src, target, repId);
var result = {
ok: true,
start_time: new Date(),
docs_read: 0,
docs_written: 0,
doc_write_failures: 0,
errors: []
};
var changesOpts = {};
returnValue.ready(src, target);
function writeDocs() {
if (currentBatch.docs.length === 0) {
return;
}
var docs = currentBatch.docs;
return target.bulkDocs({
docs: docs
}, {
new_edits: false
}).then(function (res) {
if (returnValue.cancelled) {
completeReplication();
throw new Error('cancelled');
}
var errors = [];
res.forEach(function (res) {
if (!res.ok) {
result.doc_write_failures++;
errors.push(new Error(res.reason || res.message || 'Unknown reason'));
}
});
if (errors.length > 0) {
var error = new Error('bulkDocs error');
error.other_errors = errors;
abortReplication('target.bulkDocs failed to write docs', error);
throw new Error('bulkWrite partial failure');
}
}, function (err) {
result.doc_write_failures += docs.length;
throw err;
});
}
function getDocs() {
var docIds = Object.keys(currentBatch.diffs);
if (!docIds.length) {
return utils.Promise.resolve();
}
var idsToDocs = new utils.Map();
currentBatch.changes.forEach(function (change) {
idsToDocs.set(change.id, change.doc);
});
return utils.Promise.all(docIds.map(function (docId) {
return mapToDocWithOpenRevs(docId, idsToDocs.get(docId));
})).then(function (docLists) {
docLists.forEach(function (docs) {
docs.forEach(processSourceDoc);
});
});
}
function mapToDocWithOpenRevs(docId, doc) {
// as an optimization, we optimistically try to fetch all the open
// revs using just the docs returned from changes()
// if that's not possible, we do separate GETs for the open_revs
if (doc._deleted) {
return [{ok: doc}];
}
var diffs = currentBatch.diffs;
var missing = diffs[docId].missing;
var needAttachments = doc._attachments;
var needOtherRevs = missing.length > 1 || missing[0] !== doc._rev;
if (needAttachments || needOtherRevs) {
// fetch individually (slower)
return src.get(docId, {
revs: true,
open_revs: "all", // avoid url too long error, don't send revs array
attachments: true
}).then(function (results) {
var missingMap = {};
missing.forEach(function (missingRev) {
missingMap[missingRev] = true;
});
return results.filter(function (res) {
var rev = res.ok ? res.ok._rev : res.missing;
return missingMap[rev];
});
});
}
return [{ok: doc}];
}
function processSourceDoc(doc) {
var diffs = currentBatch.diffs;
if (returnValue.cancelled) {
return completeReplication();
}
if (doc.ok) {
result.docs_read++;
currentBatch.pendingRevs++;
currentBatch.docs.push(doc.ok);
delete diffs[doc.ok._id];
}
}
function finishBatch() {
writingCheckpoint = true;
return checkpointer.writeCheckpoint(
currentBatch.seq
).then(function (res) {
writingCheckpoint = false;
if (returnValue.cancelled) {
completeReplication();
throw new Error('cancelled');
}
result.last_seq = last_seq = currentBatch.seq;
result.docs_written += currentBatch.docs.length;
returnValue.emit('change', utils.clone(result));
currentBatch = undefined;
getChanges();
}).catch(function (err) {
writingCheckpoint = false;
abortReplication('writeCheckpoint completed with error', err);
throw err;
});
}
function getDiffs() {
var diff = {};
currentBatch.changes.forEach(function (change) {
diff[change.id] = change.changes.map(function (x) {
return x.rev;
});
});
return target.revsDiff(diff).then(function (diffs) {
if (returnValue.cancelled) {
completeReplication();
throw new Error('cancelled');
}
// currentBatch.diffs elements are deleted as the documents are written
currentBatch.diffs = diffs;
currentBatch.pendingRevs = 0;
});
}
function startNextBatch() {
if (returnValue.cancelled || currentBatch) {
return;
}
if (batches.length === 0) {
processPendingBatch(true);
return;
}
currentBatch = batches.shift();
getDiffs()
.then(getDocs)
.then(writeDocs)
.then(finishBatch)
.then(startNextBatch)
.catch(function (err) {
abortReplication('batch processing terminated with error', err);
});
}
function processPendingBatch(immediate) {
if (pendingBatch.changes.length === 0) {
if (batches.length === 0 && !currentBatch) {
if ((continuous && changesOpts.live) || changesCompleted) {
returnValue.emit('uptodate', utils.clone(result));
}
if (changesCompleted) {
completeReplication();
}
}
return;
}
if (
immediate ||
changesCompleted ||
pendingBatch.changes.length >= batch_size
) {
batches.push(pendingBatch);
pendingBatch = {
seq: 0,
changes: [],
docs: []
};
startNextBatch();
}
}
function abortReplication(reason, err) {
if (replicationCompleted) {
return;
}
result.ok = false;
result.status = 'aborted';
result.errors.push(err);
batches = [];
pendingBatch = {
seq: 0,
changes: [],
docs: []
};
completeReplication();
}
function completeReplication() {
if (replicationCompleted) {
return;
}
if (returnValue.cancelled) {
result.status = 'cancelled';
if (writingCheckpoint) {
return;
}
}
result.status = result.status || 'complete';
result.end_time = new Date();
result.last_seq = last_seq;
replicationCompleted = returnValue.cancelled = true;
if (result.errors.length > 0) {
var error = result.errors.pop();
if (result.errors.length > 0) {
error.other_errors = result.errors;
}
error.result = result;
returnValue.emit('error', error);
} else {
returnValue.emit('complete', result);
}
returnValue.removeAllListeners();
}
function onChange(change) {
if (returnValue.cancelled) {
return completeReplication();
}
changesCount++;
if (
pendingBatch.changes.length === 0 &&
batches.length === 0 &&
!currentBatch
) {
returnValue.emit('outofdate', utils.clone(result));
}
pendingBatch.seq = change.seq;
pendingBatch.changes.push(change);
processPendingBatch(batches.length === 0);
}
function onChangesComplete(changes) {
changesPending = false;
if (returnValue.cancelled) {
return completeReplication();
}
if (changesCount > 0) {
changesOpts.since = changes.last_seq;
getChanges();
} else {
if (continuous) {
changesOpts.live = true;
getChanges();
} else {
changesCompleted = true;
}
}
processPendingBatch(true);
}
function onChangesError(err) {
changesPending = false;
if (returnValue.cancelled) {
return completeReplication();
}
abortReplication('changes rejected', err);
}
function getChanges() {
if (
!changesPending &&
!changesCompleted &&
batches.length < batches_limit
) {
changesPending = true;
changesCount = 0;
src.changes(changesOpts)
.on('change', onChange)
.then(onChangesComplete)
.catch(onChangesError);
}
}
function startChanges() {
checkpointer.getCheckpoint().then(function (checkpoint) {
last_seq = checkpoint;
changesOpts = {
since: last_seq,
limit: batch_size,
batch_size: batch_size,
style: 'all_docs',
doc_ids: doc_ids,
returnDocs: false,
include_docs: true
};
if (opts.filter) {
changesOpts.filter = opts.filter;
}
if (opts.query_params) {
changesOpts.query_params = opts.query_params;
}
getChanges();
}).catch(function (err) {
abortReplication('getCheckpoint rejected with ', err);
});
}
returnValue.once('cancel', completeReplication);
if (typeof opts.onChange === 'function') {
returnValue.on('change', opts.onChange);
}
if (typeof opts.complete === 'function') {
returnValue.once('error', opts.complete);
returnValue.once('complete', function (result) {
opts.complete(null, result);
});
}
if (typeof opts.since === 'undefined') {
startChanges();
} else {
writingCheckpoint = true;
checkpointer.writeCheckpoint(opts.since).then(function (res) {
writingCheckpoint = false;
if (returnValue.cancelled) {
completeReplication();
return;
}
last_seq = opts.since;
startChanges();
}).catch(function (err) {
writingCheckpoint = false;
abortReplication('writeCheckpoint completed with error', err);
throw err;
});
}
}
function toPouch(db) {
if (typeof db === 'string') {
return new Pouch(db);
} else if (db.then) {
return db;
} else {
return utils.Promise.resolve(db);
}
}
function replicateWrapper(src, target, opts, callback) {
if (typeof opts === 'function') {
callback = opts;
opts = {};
}
if (typeof opts === 'undefined') {
opts = {};
}
if (!opts.complete) {
opts.complete = callback || function () {};
}
opts = utils.clone(opts);
opts.continuous = opts.continuous || opts.live;
var replicateRet = new Replication(opts);
toPouch(src).then(function (src) {
return toPouch(target).then(function (target) {
return genReplicationId(src, target, opts).then(function (repId) {
replicate(repId, src, target, opts, replicateRet);
});
});
}).catch(function (err) {
replicateRet.emit('error', err);
opts.complete(err);
});
return replicateRet;
}
exports.replicate = replicateWrapper;