// blob: 8b3fad6b90fd4ec304b8afa121f2ca8ee0ab3187 [file] [log] [blame]
'use strict';
var utils = require('./utils');
var EE = require('events').EventEmitter;
var Checkpointer = require('./checkpointer');
// Largest number of revisions of a single document requested from the source
// in one call; longer lists are split into chunks of this size so the request
// does not run into "url too long" errors.
var MAX_SIMULTANEOUS_REVS = 50;
// Value used for opts.retry when the caller does not pass a `retry` option.
var RETRY_DEFAULT = false;
/**
 * Pick a pseudo-random integer at or above `min`.
 *
 * Both arguments are parsed as base-10 integers. An unparsable `min` counts
 * as 0. When `max` is unparsable or not greater than `min`, the exclusive
 * upper bound becomes twice `min` (or 2 when `min` is 0); otherwise `max`
 * itself is a possible result.
 *
 * @param {number|string} min - lower bound (inclusive)
 * @param {number|string} max - upper bound (inclusive when greater than min)
 * @returns {number} integer obtained by truncating to a 32-bit int
 */
function randomNumber(min, max) {
  var low = parseInt(min, 10);
  var high = parseInt(max, 10);

  if (isNaN(low)) {
    low = 0;
  }

  var upperIsUnusable = isNaN(high) || high <= low;
  // Unusable upper bound: fall back to doubling the lower bound.
  // Usable upper bound: add one so that `max` itself can be drawn.
  high = upperIsUnusable ? (low || 1) << 1 : high + 1;

  // `| 0` truncates to a 32-bit integer, same as `~~`.
  return (Math.random() * (high - low) + low) | 0;
}
/**
 * Default back-off strategy: derive the next delay from the previous one.
 *
 * With no previous delay (falsy `min`) the delay is drawn from 0..2000.
 * Otherwise the upper bound is passed as 0, which makes randomNumber fall
 * back to its doubling behaviour.
 *
 * @param {number} min - the previous delay
 * @returns {number} the next delay, as produced by randomNumber
 */
function defaultBackOff(min) {
  var ceiling = min ? 0 : 2000;
  return randomNumber(min, ceiling);
}
/**
 * Decide what happens after a replication attempt failed: give up and emit
 * 'error', or schedule another call to replicate after a delay.
 *
 * Retry bookkeeping is stored on `opts` (default_back_off, retries,
 * back_off_function, current_back_off), so it survives across attempts.
 *
 * @param {string} repId - replication id, forwarded to replicate
 * @param {Object} src - source database, forwarded to replicate
 * @param {Object} target - target database, forwarded to replicate
 * @param {Object} opts - replication options; mutated with retry state
 * @param {Object} returnValue - emitter that receives 'error',
 *   'requestError' and 'paused'
 * @param {Object} result - result of the failed attempt (not used here)
 * @param {Error} error - the failure that ended the attempt
 */
function backOff(repId, src, target, opts, returnValue, result, error) {
  // Report a terminal failure and detach every listener.
  function giveUp(reason) {
    returnValue.emit('error', reason);
    returnValue.removeAllListeners();
  }

  if (opts.retry === false) {
    giveUp(error);
    return;
  }

  opts.default_back_off = opts.default_back_off || 0;
  opts.retries = opts.retries || 0;
  if (typeof opts.back_off_function !== 'function') {
    opts.back_off_function = defaultBackOff;
  }
  opts.retries++;

  var retriesExhausted = opts.max_retries && opts.retries > opts.max_retries;
  if (retriesExhausted) {
    giveUp(new Error('tried ' +
      opts.retries + ' times but replication failed'));
    return;
  }

  returnValue.emit('requestError', error);

  if (returnValue.state === 'active') {
    returnValue.emit('paused', error);
    returnValue.state = 'stopped';
    // Once replication is active again, start over from the default delay.
    returnValue.once('active', function () {
      opts.current_back_off = opts.default_back_off;
    });
  }

  // Feed the previous delay (or the default) to the back-off function.
  opts.current_back_off = opts.current_back_off || opts.default_back_off;
  opts.current_back_off = opts.back_off_function(opts.current_back_off);

  var delay = opts.current_back_off;
  setTimeout(function () {
    replicate(repId, src, target, opts, returnValue);
  }, delay);
}
// The object handed back to the caller of replicate. It exists before any
// changes are being listened to, so the caller can cancel early. It is an
// event emitter that also offers promise-style `then` / `catch`.
utils.inherits(Replication, EE);
/**
 * Create a replication handle in the 'pending', not-cancelled state.
 *
 * The internal promise is fulfilled by the first 'complete' event and
 * rejected by the first 'error' event.
 */
function Replication() {
  EE.call(this);
  this.cancelled = false;
  this.state = 'pending';

  var emitter = this;
  var outcome = new utils.Promise(function (onComplete, onError) {
    emitter.once('complete', onComplete);
    emitter.once('error', onError);
  });

  emitter.then = function (resolve, reject) {
    return outcome.then(resolve, reject);
  };
  emitter.catch = function (reject) {
    return outcome.catch(reject);
  };

  // Errors may also be handled through the "error" event, so attach a no-op
  // rejection handler: a rejection nobody chained on must not be reported
  // as unhandled.
  emitter.catch(function () {});
}
/**
 * Mark this replication as cancelled and announce it with a 'cancel' event.
 * Both flags are set before the event fires, so listeners see the new state.
 */
Replication.prototype.cancel = function () {
  var replication = this;
  replication.state = 'cancelled';
  replication.cancelled = true;
  replication.emit('cancel');
};
/**
 * Wire this replication handle to the two databases it works on.
 *
 * - The next 'change' event moves a 'pending' or 'stopped' replication to
 *   'active' and emits 'active'.
 * - If either database emits 'destroyed', the replication is cancelled.
 * - When the replication settles (fulfilled or rejected), the 'destroyed'
 *   listeners are removed again.
 *
 * @param {Object} src - source database (event emitter)
 * @param {Object} target - target database (event emitter)
 */
Replication.prototype.ready = function (src, target) {
  var self = this;

  function onDestroy() {
    self.cancel();
  }

  function cleanup() {
    src.removeListener('destroyed', onDestroy);
    target.removeListener('destroyed', onDestroy);
  }

  self.once('change', function () {
    var dormant = self.state === 'pending' || self.state === 'stopped';
    if (!dormant) {
      return;
    }
    self.state = 'active';
    self.emit('active');
  });

  src.once('destroyed', onDestroy);
  target.once('destroyed', onDestroy);

  self.then(cleanup, cleanup);
};
// TODO: check CouchDB's replication id generation
/**
 * Derive the id of the local document that identifies this replication.
 *
 * The id is '_local/' followed by an MD5 digest of: the source id, the
 * target id, the filter's source text (if any), the JSON form of
 * opts.query_params and opts.doc_ids.
 *
 * @param {Object} src - source database; `id()` must return a promise
 * @param {Object} target - target database; `id()` must return a promise
 * @param {Object} opts - reads `filter`, `query_params` and `doc_ids`
 * @returns {Promise<string>} promise for the replication id
 */
function genReplicationId(src, target, opts) {
  var filterSource = '';
  if (opts.filter) {
    filterSource = opts.filter.toString();
  }

  // The raw digest alphabet cannot be used as is: '/' would be read as an
  // attachment separator and '+' is not url-safe.
  function toLocalDocId(digest) {
    var urlSafe = digest.replace(/\//g, '.').replace(/\+/g, '_');
    return '_local/' + urlSafe;
  }

  function hashIds(sourceId, targetId) {
    var fingerprint = sourceId + targetId + filterSource +
      JSON.stringify(opts.query_params) + opts.doc_ids;
    return utils.MD5(fingerprint).then(toLocalDocId);
  }

  return src.id().then(function (sourceId) {
    return target.id().then(function (targetId) {
      return hashIds(sourceId, targetId);
    });
  });
}
/**
 * Run one replication attempt from `src` to `target`.
 *
 * Changes are read from `src.changes` and grouped into batches. For each
 * batch the revisions missing on the target are determined with
 * `target.revsDiff`, fetched from the source, written with
 * `target.bulkDocs`, and a checkpoint is written. Progress and outcome are
 * reported through events on `returnValue` ('outofdate', 'change', 'denied',
 * 'paused', 'uptodate', 'complete'); failures are handed to backOff, which
 * either emits 'error' or schedules another attempt.
 *
 * @param {string} repId - replication id, passed to the Checkpointer
 * @param {Object} src - source database
 * @param {Object} target - target database
 * @param {Object} opts - options; reads continuous, live, batch_size,
 *   batches_limit, doc_ids, filter, query_params, since, onChange, complete
 * @param {Object} returnValue - the Replication handle events are emitted on
 * @param {Object} [result] - result accumulator; a fresh one is created
 *   when omitted
 */
function replicate(repId, src, target, opts, returnValue, result) {
  var batches = [];               // list of batches to be processed
  var currentBatch;               // the batch currently being processed
  var pendingBatch = {
    seq: 0,
    changes: [],
    docs: []
  };                              // next batch, not yet ready to be processed
  var writingCheckpoint = false;  // true while checkpoint is being written
  var changesCompleted = false;   // true when all changes received
  var replicationCompleted = false; // true when replication has completed
  var last_seq = 0;
  var continuous = opts.continuous || opts.live || false;
  var batch_size = opts.batch_size || 100;
  var batches_limit = opts.batches_limit || 10;
  var changesPending = false;     // true while src.changes is running
  var doc_ids = opts.doc_ids;
  // Shared with the Checkpointer; `cancelled` is switched on by
  // completeReplication so that in-flight steps stop early.
  var state = {
    cancelled: false
  };
  var checkpointer = new Checkpointer(src, target, repId, state);
  var allErrors = [];
  var changedDocs = [];           // docs of the current batch written ok
  result = result || {
    ok: true,
    start_time: new Date(),
    docs_read: 0,
    docs_written: 0,
    doc_write_failures: 0,
    errors: []
  };
  var changesOpts = {};
  returnValue.ready(src, target);

  // Write the docs collected for the current batch to the target.
  function writeDocs() {
    // Reset before the early return below: a batch with nothing to write
    // must not report the previous batch's docs in its 'change' event.
    changedDocs = [];
    if (currentBatch.docs.length === 0) {
      return;
    }
    var docs = currentBatch.docs;
    return target.bulkDocs({docs: docs, new_edits: false}).then(function (res) {
      if (state.cancelled) {
        completeReplication();
        throw new Error('cancelled');
      }
      var errors = [];
      var errorsById = {};
      res.forEach(function (res) {
        if (res.error) {
          result.doc_write_failures++;
          errors.push(res);
          errorsById[res.id] = res;
        }
      });
      result.errors = errors;
      allErrors = allErrors.concat(errors);
      result.docs_written += currentBatch.docs.length - errors.length;
      var non403s = errors.filter(function (error) {
        return error.name !== 'unauthorized' && error.name !== 'forbidden';
      });
      docs.forEach(function(doc) {
        var error = errorsById[doc._id];
        if (error) {
          returnValue.emit('denied', utils.clone(error));
        } else {
          changedDocs.push(doc);
        }
      });
      // 'unauthorized' / 'forbidden' rejections are tolerated; any other
      // write error aborts the replication.
      if (non403s.length > 0) {
        var error = new Error('bulkDocs error');
        error.other_errors = errors;
        abortReplication('target.bulkDocs failed to write docs', error);
        throw new Error('bulkWrite partial failure');
      }
    }, function (err) {
      result.doc_write_failures += docs.length;
      throw err;
    });
  }

  // Fetch every missing revision of one document from the source.
  function processDiffDoc(id) {
    var diffs = currentBatch.diffs;
    var allMissing = diffs[id].missing;
    // avoid url too long error by batching
    var missingBatches = [];
    for (var i = 0; i < allMissing.length; i += MAX_SIMULTANEOUS_REVS) {
      missingBatches.push(allMissing.slice(i, Math.min(allMissing.length,
        i + MAX_SIMULTANEOUS_REVS)));
    }
    return utils.Promise.all(missingBatches.map(function (missing) {
      var opts = {
        revs: true,
        open_revs: missing,
        attachments: true
      };
      return src.get(id, opts).then(function (docs) {
        docs.forEach(function (doc) {
          if (state.cancelled) {
            return completeReplication();
          }
          if (doc.ok) {
            result.docs_read++;
            currentBatch.pendingRevs++;
            currentBatch.docs.push(doc.ok);
          }
        });
        delete diffs[id];
      });
    }));
  }

  // Fetch the docs for every id still listed in currentBatch.diffs.
  function getAllDocs() {
    var diffKeys = Object.keys(currentBatch.diffs);
    return utils.Promise.all(diffKeys.map(processDiffDoc));
  }

  function getRevisionOneDocs() {
    // filter out the generation 1 docs and get them
    // leaving the non-generation one docs to be got otherwise
    var ids = Object.keys(currentBatch.diffs).filter(function (id) {
      var missing = currentBatch.diffs[id].missing;
      return missing.length === 1 && missing[0].slice(0, 2) === '1-';
    });
    if (!ids.length) { // nothing to fetch
      return utils.Promise.resolve();
    }
    return src.allDocs({
      keys: ids,
      include_docs: true
    }).then(function (res) {
      if (state.cancelled) {
        completeReplication();
        throw (new Error('cancelled'));
      }
      res.rows.forEach(function (row) {
        // Only rows that are still at generation 1, not deleted and without
        // attachments are taken from this result; the rest stay in
        // currentBatch.diffs and are fetched by getAllDocs.
        if (row.doc && !row.deleted &&
          row.value.rev.slice(0, 2) === '1-' && (
            !row.doc._attachments ||
            Object.keys(row.doc._attachments).length === 0
          )
        ) {
          result.docs_read++;
          currentBatch.pendingRevs++;
          currentBatch.docs.push(row.doc);
          delete currentBatch.diffs[row.id];
        }
      });
    });
  }

  function getDocs() {
    return getRevisionOneDocs().then(getAllDocs);
  }

  // Write the checkpoint for the current batch, emit 'change' and ask for
  // more changes.
  function finishBatch() {
    writingCheckpoint = true;
    return checkpointer.writeCheckpoint(currentBatch.seq).then(function () {
      writingCheckpoint = false;
      if (state.cancelled) {
        completeReplication();
        throw new Error('cancelled');
      }
      result.last_seq = last_seq = currentBatch.seq;
      var outResult = utils.clone(result);
      outResult.docs = changedDocs;
      returnValue.emit('change', outResult);
      currentBatch = undefined;
      getChanges();
    }).catch(function (err) {
      writingCheckpoint = false;
      abortReplication('writeCheckpoint completed with error', err);
      // Rethrown so the batch chain in startNextBatch stops here.
      throw err;
    });
  }

  // Ask the target which revisions of the current batch it is missing.
  function getDiffs() {
    var diff = {};
    currentBatch.changes.forEach(function (change) {
      diff[change.id] = change.changes.map(function (x) {
        return x.rev;
      });
    });
    return target.revsDiff(diff).then(function (diffs) {
      if (state.cancelled) {
        completeReplication();
        throw new Error('cancelled');
      }
      // currentBatch.diffs elements are deleted as the documents are written
      currentBatch.diffs = diffs;
      currentBatch.pendingRevs = 0;
    });
  }

  // Start processing the next queued batch unless one is already running.
  function startNextBatch() {
    if (state.cancelled || currentBatch) {
      return;
    }
    if (batches.length === 0) {
      processPendingBatch(true);
      return;
    }
    currentBatch = batches.shift();
    getDiffs()
      .then(getDocs)
      .then(writeDocs)
      .then(finishBatch)
      .then(startNextBatch)
      .catch(function (err) {
        abortReplication('batch processing terminated with error', err);
      });
  }

  // Queue the pending batch when it is due; when there is nothing left to
  // do, report 'paused' / 'uptodate' and possibly complete.
  function processPendingBatch(immediate) {
    if (pendingBatch.changes.length === 0) {
      if (batches.length === 0 && !currentBatch) {
        if ((continuous && changesOpts.live) || changesCompleted) {
          returnValue.emit('paused');
          returnValue.emit('uptodate', result);
        }
        if (changesCompleted) {
          completeReplication();
        }
      }
      return;
    }
    if (
      immediate ||
      changesCompleted ||
      pendingBatch.changes.length >= batch_size
    ) {
      batches.push(pendingBatch);
      pendingBatch = {
        seq: 0,
        changes: [],
        docs: []
      };
      startNextBatch();
    }
  }

  // Record the error, drop all queued work and finish the attempt.
  function abortReplication(reason, err) {
    if (replicationCompleted) {
      return;
    }
    if (!err.message) {
      err.message = reason;
    }
    result.ok = false;
    result.status = 'aborting';
    result.errors.push(err);
    allErrors = allErrors.concat(err);
    batches = [];
    pendingBatch = {
      seq: 0,
      changes: [],
      docs: []
    };
    completeReplication();
  }

  // Finish this attempt exactly once: emit 'complete', or hand over to
  // backOff when an error other than unauthorized/forbidden was recorded.
  function completeReplication() {
    if (replicationCompleted) {
      return;
    }
    if (state.cancelled) {
      result.status = 'cancelled';
      if (writingCheckpoint) {
        return;
      }
    }
    result.status = result.status || 'complete';
    result.end_time = new Date();
    result.last_seq = last_seq;
    replicationCompleted = state.cancelled = true;
    var non403s = allErrors.filter(function (error) {
      return error.name !== 'unauthorized' && error.name !== 'forbidden';
    });
    if (non403s.length > 0) {
      var error = allErrors.pop();
      if (allErrors.length > 0) {
        error.other_errors = allErrors;
      }
      error.result = result;
      backOff(repId, src, target, opts, returnValue, result, error);
    } else {
      result.errors = allErrors;
      returnValue.emit('complete', result);
      returnValue.removeAllListeners();
    }
  }

  // Handler for every change reported by src.changes.
  function onChange(change) {
    if (state.cancelled) {
      return completeReplication();
    }
    var filter = utils.filterChange(opts)(change);
    if (!filter) {
      return;
    }
    if (
      pendingBatch.changes.length === 0 &&
      batches.length === 0 &&
      !currentBatch
    ) {
      returnValue.emit('outofdate', result);
    }
    pendingBatch.seq = change.seq;
    pendingBatch.changes.push(change);
    processPendingBatch(batches.length === 0);
  }

  // Called when one src.changes request has delivered all its results.
  function onChangesComplete(changes) {
    changesPending = false;
    if (state.cancelled) {
      return completeReplication();
    }
    // if no results were returned then we're done,
    // else fetch more
    if (changes.results.length > 0) {
      changesOpts.since = changes.last_seq;
      getChanges();
    } else {
      if (continuous) {
        changesOpts.live = true;
        getChanges();
      } else {
        changesCompleted = true;
      }
    }
    processPendingBatch(true);
  }

  function onChangesError(err) {
    changesPending = false;
    if (state.cancelled) {
      return completeReplication();
    }
    abortReplication('changes rejected', err);
  }

  // Request the next page of changes, unless a request is already running,
  // all changes have been received, or too many batches are queued.
  function getChanges() {
    if (!(
      !changesPending &&
      !changesCompleted &&
      batches.length < batches_limit
    )) {
      return;
    }
    changesPending = true;
    function abortChanges() {
      changes.cancel();
    }
    function removeListener() {
      returnValue.removeListener('cancel', abortChanges);
    }
    returnValue.once('cancel', abortChanges);
    var changes = src.changes(changesOpts)
      .on('change', onChange);
    changes.then(removeListener, removeListener);
    changes.then(onChangesComplete)
      .catch(onChangesError);
  }

  // Read the stored checkpoint and start reading changes from there.
  function startChanges() {
    checkpointer.getCheckpoint().then(function (checkpoint) {
      last_seq = checkpoint;
      changesOpts = {
        since: last_seq,
        limit: batch_size,
        batch_size: batch_size,
        style: 'all_docs',
        doc_ids: doc_ids,
        returnDocs: true // required so we know when we're done
      };
      if (opts.filter) {
        // required for the client-side filter in onChange
        changesOpts.include_docs = true;
      }
      if (opts.query_params) {
        changesOpts.query_params = opts.query_params;
      }
      getChanges();
    }).catch(function (err) {
      abortReplication('getCheckpoint rejected with ', err);
    });
  }

  returnValue.once('cancel', completeReplication);

  if (typeof opts.onChange === 'function') {
    returnValue.on('change', opts.onChange);
  }

  if (typeof opts.complete === 'function') {
    returnValue.once('error', opts.complete);
    returnValue.once('complete', function (result) {
      opts.complete(null, result);
    });
  }

  if (typeof opts.since === 'undefined') {
    startChanges();
  } else {
    // An explicit `since` is first stored as the checkpoint, then
    // replication starts from it.
    writingCheckpoint = true;
    checkpointer.writeCheckpoint(opts.since).then(function () {
      writingCheckpoint = false;
      if (state.cancelled) {
        completeReplication();
        return;
      }
      last_seq = opts.since;
      startChanges();
    }).catch(function (err) {
      writingCheckpoint = false;
      // abortReplication reports the failure. Nothing consumes this promise
      // chain, so rethrowing here would only create an unhandled rejection.
      abortReplication('writeCheckpoint completed with error', err);
    });
  }
}
exports.toPouch = toPouch;
/**
 * Normalise a database argument.
 *
 * @param {string|Object} db - a database name, a thenable, or any other value
 * @param {Object} opts - must carry `PouchConstructor`
 * @returns {Object} `new opts.PouchConstructor(db)` for a string, `db`
 *   itself when it has a `then` property, otherwise a promise resolved
 *   with `db`
 */
function toPouch(db, opts) {
  var Pouch = opts.PouchConstructor;

  if (typeof db === 'string') {
    return new Pouch(db);
  }

  return db.then ? db : utils.Promise.resolve(db);
}
exports.replicate = replicateWrapper;
/**
 * Public entry point: start replicating `src` into `target`.
 *
 * Both databases are normalised with toPouch, the replication id is
 * generated, and then replicate is started. The Replication handle is
 * returned immediately, before any of that has happened.
 *
 * The caller's `opts` object is never modified: it is cloned first and all
 * defaults are filled in on the clone.
 *
 * @param {string|Object} src - source database or its name
 * @param {string|Object} target - target database or its name
 * @param {Object|Function} [opts] - options, or the callback when no
 *   options are given
 * @param {Function} [callback] - used as `opts.complete` when that is unset
 * @returns {Replication} the replication handle
 */
function replicateWrapper(src, target, opts, callback) {
  if (typeof opts === 'function') {
    callback = opts;
    opts = {};
  }
  if (typeof opts === 'undefined' || opts === null) {
    opts = {};
  }
  // Clone before filling in defaults so the caller's object stays untouched.
  opts = utils.clone(opts);
  if (!opts.complete) {
    opts.complete = callback || function () {};
  }
  opts.continuous = opts.continuous || opts.live;
  opts.retry = ('retry' in opts) ? opts.retry : RETRY_DEFAULT;
  /*jshint validthis:true */
  opts.PouchConstructor = opts.PouchConstructor || this;
  var replicateRet = new Replication(opts);
  toPouch(src, opts).then(function (src) {
    return toPouch(target, opts).then(function (target) {
      return genReplicationId(src, target, opts).then(function (repId) {
        replicate(repId, src, target, opts, replicateRet);
      });
    });
  }).catch(function (err) {
    // Setup failed before replicate took over: report it both ways.
    replicateRet.emit('error', err);
    opts.complete(err);
  });
  return replicateRet;
}