| 'use strict'; |
| |
| var utils = require('./utils'); |
| var EE = require('events').EventEmitter; |
| var Checkpointer = require('./checkpointer'); |
| |
| var MAX_SIMULTANEOUS_REVS = 50; |
| var RETRY_DEFAULT = false; |
| |
/**
 * Picks a pseudo-random integer, used to compute retry delays.
 *
 * Both arguments are parsed with `parseInt(value, 10)`. A `min` that does not
 * parse becomes 0. When `max` does not parse, or is not greater than `min`,
 * the upper bound becomes twice `min` (or 2 when `min` is 0) and is exclusive;
 * otherwise `max` itself is an inclusive upper bound.
 *
 * @param {number|string} min - lower bound (inclusive)
 * @param {number|string} [max] - upper bound; see above for the fallback
 * @returns {number} an integer between the bounds described above
 */
function randomNumber(min, max) {
  min = parseInt(min, 10);
  max = parseInt(max, 10);
  if (min !== min) { // NaN is the only value that is not equal to itself
    min = 0;
  }
  if (max !== max || max <= min) {
    // Plain multiplication instead of `<< 1`: the shift operates on 32-bit
    // integers and turns any minimum of 2^30 or more into a negative bound.
    max = (min || 1) * 2; // doubling
  } else {
    max = max + 1; // makes the requested maximum reachable
  }
  var ratio = Math.random();
  var range = max - min;
  var value = range * ratio + min;

  // Truncate toward zero like `~~` did, but without its 32-bit wrap-around.
  var truncated = value < 0 ? Math.ceil(value) : Math.floor(value);
  return truncated || 0; // `|| 0` normalises -0 to 0, as `~~` would
}
| |
/**
 * Default back-off strategy: computes the next retry delay from the
 * previous one.
 *
 * A falsy previous delay asks randomNumber for a value capped at 2000.
 * Any other previous delay is passed with a ceiling of 0, which is never
 * above the minimum and therefore selects randomNumber's doubling branch.
 *
 * @param {number} min - the previous delay (falsy on the first attempt)
 * @returns {number} the next delay, as produced by randomNumber
 */
function defaultBackOff(min) {
  var ceiling = min ? 0 : 2000;
  return randomNumber(min, ceiling);
}
| |
/**
 * Decides what happens after a replication attempt failed: either give up
 * and report the error, or schedule another call to replicate().
 *
 * Retry bookkeeping (`default_back_off`, `retries`, `back_off_function`,
 * `current_back_off`) is stored on `opts` itself so that it is still there
 * on the next attempt.
 *
 * @param {string} repId - replication id, forwarded to replicate()
 * @param {Object} src - source database, forwarded to replicate()
 * @param {Object} target - target database, forwarded to replicate()
 * @param {Object} opts - replication options; mutated as described above
 * @param {Object} returnValue - the Replication emitter handed to the caller
 * @param {Object} result - result of the failed attempt (not used here)
 * @param {Error} error - the error that ended the attempt
 */
function backOff(repId, src, target, opts, returnValue, result, error) {
  // Terminal failure: report the error and detach every listener.
  function giveUp(reason) {
    returnValue.emit('error', reason);
    returnValue.removeAllListeners();
  }

  if (opts.retry === false) {
    giveUp(error);
    return;
  }

  if (typeof opts.back_off_function !== 'function') {
    opts.back_off_function = defaultBackOff;
  }
  opts.default_back_off = opts.default_back_off || 0;
  opts.retries = opts.retries || 0;
  opts.retries++;

  var exhausted = opts.max_retries && opts.retries > opts.max_retries;
  if (exhausted) {
    giveUp(new Error('tried ' +
      opts.retries + ' times but replication failed'));
    return;
  }

  returnValue.emit('requestError', error);
  if (returnValue.state === 'active') {
    returnValue.emit('paused', error);
    returnValue.state = 'stopped';
    // Once the replication is active again the delay starts over.
    returnValue.once('active', function resetBackOff() {
      opts.current_back_off = opts.default_back_off;
    });
  }

  opts.current_back_off = opts.current_back_off || opts.default_back_off;
  opts.current_back_off = opts.back_off_function(opts.current_back_off);

  var delay = opts.current_back_off;
  setTimeout(function retry() {
    replicate(repId, src, target, opts, returnValue);
  }, delay);
}
| |
// The object returned to the caller of replicate(). It exists before any
// changes are being listened to, so the caller can cancel the replication
// right away. It is an event emitter that can also be used like a promise.
utils.inherits(Replication, EE);
function Replication() {
  var replication = this;
  EE.call(replication);

  replication.cancelled = false;
  replication.state = 'pending';

  // Settles on the first 'complete' (fulfil) or 'error' (reject) event.
  var outcome = new utils.Promise(function (onComplete, onError) {
    replication.once('complete', onComplete);
    replication.once('error', onError);
  });

  replication.then = function (resolve, reject) {
    return outcome.then(resolve, reject);
  };
  replication.catch = function (reject) {
    return outcome.catch(reject);
  };

  // Errors may be handled through the "error" event alone, so attach a no-op
  // handler to keep a rejection from being reported as unhandled.
  replication.catch(function () {});
}
| |
/**
 * Flags this replication as cancelled and emits 'cancel' so that listeners
 * can stop their work.
 */
Replication.prototype.cancel = function () {
  var replication = this;
  replication.state = 'cancelled';
  replication.cancelled = true;
  replication.emit('cancel');
};
| |
/**
 * Wires this replication to the two databases it runs between.
 *
 * - On the next 'change' event, a 'pending' or 'stopped' replication becomes
 *   'active' and emits 'active'.
 * - If either database emits 'destroyed', the replication is cancelled.
 * - The 'destroyed' listeners are removed once the replication settles.
 *
 * @param {Object} src - source database (an event emitter)
 * @param {Object} target - target database (an event emitter)
 */
Replication.prototype.ready = function (src, target) {
  var replication = this;

  // The emitter invokes listeners with itself as `this`, so using the
  // captured reference here addresses the same object.
  replication.once('change', function markActive() {
    var current = replication.state;
    if (current === 'pending' || current === 'stopped') {
      replication.state = 'active';
      replication.emit('active');
    }
  });

  var cancelOnDestroy = function () {
    replication.cancel();
  };
  var databases = [src, target];
  databases.forEach(function (db) {
    db.once('destroyed', cancelOnDestroy);
  });

  var detach = function () {
    databases.forEach(function (db) {
      db.removeListener('destroyed', cancelOnDestroy);
    });
  };
  replication.then(detach, detach);
};
| |
| |
// TODO: check CouchDB's replication id generation
/**
 * Generates an id particular to this replication: an MD5 digest over the
 * source id, the target id, the filter function's source text, the
 * stringified query params and the doc ids, prefixed with '_local/'.
 *
 * @param {Object} src - source database; must provide id()
 * @param {Object} target - target database; must provide id()
 * @param {Object} opts - reads `filter`, `query_params` and `doc_ids`
 * @returns {Promise<string>} resolves with the '_local/...' id
 */
function genReplicationId(src, target, opts) {
  var filterSource = '';
  if (opts.filter) {
    filterSource = opts.filter.toString();
  }

  var sourceId;
  return src.id().then(function (id) {
    sourceId = id;
    return target.id();
  }).then(function (targetId) {
    var fingerprint = sourceId + targetId + filterSource +
      JSON.stringify(opts.query_params) + opts.doc_ids;
    return utils.MD5(fingerprint);
  }).then(function (digest) {
    // The raw digest alphabet cannot be used as is: '/' is interpreted as
    // being for attachments, and '+' is not url-safe either.
    var safeDigest = digest.replace(/\//g, '.').replace(/\+/g, '_');
    return '_local/' + safeDigest;
  });
}
| |
/**
 * Runs one replication attempt from `src` to `target` and reports progress
 * through events on `returnValue`.
 *
 * Changes read from `src.changes` are collected into batches. Each batch is
 * processed in order: ask the target which revisions it is missing
 * (`revsDiff`), fetch those from the source, write them with `bulkDocs`,
 * write a checkpoint, emit 'change'. When the attempt ends with an error
 * that is not 'unauthorized'/'forbidden', the error is handed to backOff(),
 * which may call this function again with the same `opts` and `returnValue`.
 *
 * @param {string} repId - replication id handed to the Checkpointer
 * @param {Object} src - source database (changes, get, allDocs are used)
 * @param {Object} target - target database (revsDiff, bulkDocs are used)
 * @param {Object} opts - replication options (continuous/live, batch_size,
 *   batches_limit, doc_ids, filter, query_params, since, onChange, complete)
 * @param {Object} returnValue - the Replication emitter given to the caller
 * @param {Object} [result] - statistics object to continue with; a fresh one
 *   is created when omitted
 */
function replicate(repId, src, target, opts, returnValue, result) {
  var batches = [];               // list of batches to be processed
  var currentBatch;               // the batch currently being processed
  var pendingBatch = {
    seq: 0,
    changes: [],
    docs: []
  }; // next batch, not yet ready to be processed
  var writingCheckpoint = false;  // true while checkpoint is being written
  var changesCompleted = false;   // true when all changes received
  var replicationCompleted = false; // true when replication has completed
  var last_seq = 0;
  var continuous = opts.continuous || opts.live || false;
  var batch_size = opts.batch_size || 100;
  var batches_limit = opts.batches_limit || 10;
  var changesPending = false;     // true while src.changes is running
  var doc_ids = opts.doc_ids;
  // Also handed to the Checkpointer; presumably so it can observe
  // cancellation -- verify against the Checkpointer implementation.
  var state = {
    cancelled: false
  };
  var checkpointer = new Checkpointer(src, target, repId, state);
  var allErrors = [];
  var changedDocs = [];

  result = result || {
    ok: true,
    start_time: new Date(),
    docs_read: 0,
    docs_written: 0,
    doc_write_failures: 0,
    errors: []
  };

  var changesOpts = {};
  returnValue.ready(src, target);

  // Writes the docs of the current batch to the target and updates the
  // statistics. Every failed doc is announced with a 'denied' event; any
  // failure other than 'unauthorized'/'forbidden' aborts the replication.
  function writeDocs() {
    // Reset up front so that a batch with nothing to write does not report
    // the previous batch's docs in its 'change' event.
    changedDocs = [];
    if (currentBatch.docs.length === 0) {
      return;
    }
    var docs = currentBatch.docs;
    return target.bulkDocs({docs: docs, new_edits: false}).then(function (res) {
      if (state.cancelled) {
        completeReplication();
        throw new Error('cancelled');
      }
      var errors = [];
      var errorsById = {};
      res.forEach(function (row) {
        if (row.error) {
          result.doc_write_failures++;
          errors.push(row);
          errorsById[row.id] = row;
        }
      });
      result.errors = errors;
      allErrors = allErrors.concat(errors);
      result.docs_written += currentBatch.docs.length - errors.length;
      var non403s = errors.filter(function (error) {
        return error.name !== 'unauthorized' && error.name !== 'forbidden';
      });

      changedDocs = [];
      docs.forEach(function (doc) {
        var error = errorsById[doc._id];
        if (error) {
          returnValue.emit('denied', utils.clone(error));
        } else {
          changedDocs.push(doc);
        }
      });

      if (non403s.length > 0) {
        var error = new Error('bulkDocs error');
        error.other_errors = errors;
        abortReplication('target.bulkDocs failed to write docs', error);
        throw new Error('bulkWrite partial failure');
      }
    }, function (err) {
      // The whole request failed, so none of the docs were written.
      result.doc_write_failures += docs.length;
      throw err;
    });
  }

  // Fetches every missing revision of one document from the source and
  // appends the ones that could be read to the current batch.
  function processDiffDoc(id) {
    var diffs = currentBatch.diffs;
    var allMissing = diffs[id].missing;
    // avoid url too long error by batching
    var missingBatches = [];
    for (var i = 0; i < allMissing.length; i += MAX_SIMULTANEOUS_REVS) {
      missingBatches.push(allMissing.slice(i, Math.min(allMissing.length,
        i + MAX_SIMULTANEOUS_REVS)));
    }

    return utils.Promise.all(missingBatches.map(function (missing) {
      var getOpts = {
        revs: true,
        open_revs: missing,
        attachments: true
      };
      return src.get(id, getOpts).then(function (docs) {
        docs.forEach(function (doc) {
          if (state.cancelled) {
            return completeReplication();
          }
          if (doc.ok) {
            result.docs_read++;
            currentBatch.pendingRevs++;
            currentBatch.docs.push(doc.ok);
          }
        });
        delete diffs[id];
      });
    }));
  }

  // Fetches the missing revisions of every document still listed in the
  // current batch's diffs.
  function getAllDocs() {
    var diffKeys = Object.keys(currentBatch.diffs);
    return utils.Promise.all(diffKeys.map(processDiffDoc));
  }

  // Fetches, with a single allDocs request, the documents whose only missing
  // revision is a generation-1 revision. Documents handled here are removed
  // from the diffs; everything else is left for getAllDocs.
  function getRevisionOneDocs() {
    var ids = Object.keys(currentBatch.diffs).filter(function (id) {
      var missing = currentBatch.diffs[id].missing;
      return missing.length === 1 && missing[0].slice(0, 2) === '1-';
    });
    if (!ids.length) { // nothing to fetch
      return utils.Promise.resolve();
    }
    return src.allDocs({
      keys: ids,
      include_docs: true
    }).then(function (res) {
      if (state.cancelled) {
        completeReplication();
        throw (new Error('cancelled'));
      }
      res.rows.forEach(function (row) {
        // Only take rows that carry a doc, are not deleted, are still at
        // generation 1 and have no attachments.
        if (row.doc && !row.deleted &&
          row.value.rev.slice(0, 2) === '1-' && (
            !row.doc._attachments ||
            Object.keys(row.doc._attachments).length === 0
          )
        ) {
          result.docs_read++;
          currentBatch.pendingRevs++;
          currentBatch.docs.push(row.doc);
          delete currentBatch.diffs[row.id];
        }
      });
    });
  }

  // Fetches all docs of the current batch: generation-1 docs first, then
  // whatever is still missing.
  function getDocs() {
    return getRevisionOneDocs().then(getAllDocs);
  }

  // Writes the checkpoint for the current batch, emits 'change' with a copy
  // of the result plus the docs written, and asks for more changes.
  function finishBatch() {
    writingCheckpoint = true;
    return checkpointer.writeCheckpoint(currentBatch.seq).then(function () {
      writingCheckpoint = false;
      if (state.cancelled) {
        completeReplication();
        throw new Error('cancelled');
      }
      result.last_seq = last_seq = currentBatch.seq;
      var outResult = utils.clone(result);
      outResult.docs = changedDocs;
      returnValue.emit('change', outResult);
      currentBatch = undefined;
      getChanges();
    }).catch(function (err) {
      writingCheckpoint = false;
      abortReplication('writeCheckpoint completed with error', err);
      // Rethrown so the chain in startNextBatch stops as well.
      throw err;
    });
  }

  // Asks the target which revisions of the current batch it is missing.
  function getDiffs() {
    var diff = {};
    currentBatch.changes.forEach(function (change) {
      diff[change.id] = change.changes.map(function (x) {
        return x.rev;
      });
    });
    return target.revsDiff(diff).then(function (diffs) {
      if (state.cancelled) {
        completeReplication();
        throw new Error('cancelled');
      }
      // currentBatch.diffs elements are deleted as the documents are fetched
      currentBatch.diffs = diffs;
      currentBatch.pendingRevs = 0;
    });
  }

  // Starts processing the next queued batch, unless one is already being
  // processed or the replication has stopped.
  function startNextBatch() {
    if (state.cancelled || currentBatch) {
      return;
    }
    if (batches.length === 0) {
      processPendingBatch(true);
      return;
    }
    currentBatch = batches.shift();
    getDiffs()
      .then(getDocs)
      .then(writeDocs)
      .then(finishBatch)
      .then(startNextBatch)
      .catch(function (err) {
        abortReplication('batch processing terminated with error', err);
      });
  }

  // Queues the pending batch when it is full, when all changes have been
  // received, or when `immediate` is set. With nothing pending and nothing
  // in progress it reports 'paused'/'uptodate' and, once all changes have
  // been received, completes the replication.
  function processPendingBatch(immediate) {
    if (pendingBatch.changes.length === 0) {
      if (batches.length === 0 && !currentBatch) {
        if ((continuous && changesOpts.live) || changesCompleted) {
          returnValue.emit('paused');
          returnValue.emit('uptodate', result);
        }
        if (changesCompleted) {
          completeReplication();
        }
      }
      return;
    }
    if (
      immediate ||
      changesCompleted ||
      pendingBatch.changes.length >= batch_size
    ) {
      batches.push(pendingBatch);
      pendingBatch = {
        seq: 0,
        changes: [],
        docs: []
      };
      startNextBatch();
    }
  }

  // Records `err` (using `reason` as its message if it has none), drops all
  // queued work and completes the replication.
  function abortReplication(reason, err) {
    if (replicationCompleted) {
      return;
    }
    if (!err.message) {
      err.message = reason;
    }
    result.ok = false;
    result.status = 'aborting';
    result.errors.push(err);
    allErrors = allErrors.concat(err);
    batches = [];
    pendingBatch = {
      seq: 0,
      changes: [],
      docs: []
    };
    completeReplication();
  }

  // Finalises the result exactly once. Errors other than
  // 'unauthorized'/'forbidden' are handed to backOff(); otherwise 'complete'
  // is emitted and all listeners are removed.
  function completeReplication() {
    if (replicationCompleted) {
      return;
    }
    if (state.cancelled) {
      result.status = 'cancelled';
      if (writingCheckpoint) {
        // The checkpoint handlers call this function again once the write
        // has finished.
        return;
      }
    }
    result.status = result.status || 'complete';
    result.end_time = new Date();
    result.last_seq = last_seq;
    replicationCompleted = state.cancelled = true;
    var non403s = allErrors.filter(function (error) {
      return error.name !== 'unauthorized' && error.name !== 'forbidden';
    });
    if (non403s.length > 0) {
      var error = allErrors.pop();
      if (allErrors.length > 0) {
        error.other_errors = allErrors;
      }
      error.result = result;
      backOff(repId, src, target, opts, returnValue, result, error);
    } else {
      result.errors = allErrors;
      returnValue.emit('complete', result);
      returnValue.removeAllListeners();
    }
  }

  // Handles one change from the source feed: applies the client-side filter
  // and appends the change to the pending batch.
  function onChange(change) {
    if (state.cancelled) {
      return completeReplication();
    }
    var filter = utils.filterChange(opts)(change);
    if (!filter) {
      return;
    }
    if (
      pendingBatch.changes.length === 0 &&
      batches.length === 0 &&
      !currentBatch
    ) {
      // First change after being idle.
      returnValue.emit('outofdate', result);
    }
    pendingBatch.seq = change.seq;
    pendingBatch.changes.push(change);
    processPendingBatch(batches.length === 0);
  }

  // Called when one src.changes request has finished.
  function onChangesComplete(changes) {
    changesPending = false;
    if (state.cancelled) {
      return completeReplication();
    }

    // if no results were returned then we're done,
    // else fetch more
    if (changes.results.length > 0) {
      changesOpts.since = changes.last_seq;
      getChanges();
    } else {
      if (continuous) {
        changesOpts.live = true;
        getChanges();
      } else {
        changesCompleted = true;
      }
    }
    processPendingBatch(true);
  }

  // Called when a src.changes request failed.
  function onChangesError(err) {
    changesPending = false;
    if (state.cancelled) {
      return completeReplication();
    }
    abortReplication('changes rejected', err);
  }

  // Starts a src.changes request unless one is already running, all changes
  // have been received, or too many batches are queued.
  function getChanges() {
    if (!(
      !changesPending &&
      !changesCompleted &&
      batches.length < batches_limit
    )) {
      return;
    }
    changesPending = true;
    function abortChanges() {
      changes.cancel();
    }
    function removeListener() {
      returnValue.removeListener('cancel', abortChanges);
    }
    returnValue.once('cancel', abortChanges);
    var changes = src.changes(changesOpts)
      .on('change', onChange);
    changes.then(removeListener, removeListener);
    changes.then(onChangesComplete)
      .catch(onChangesError);
  }

  // Reads the stored checkpoint and starts listening for changes from there.
  function startChanges() {
    checkpointer.getCheckpoint().then(function (checkpoint) {
      last_seq = checkpoint;
      changesOpts = {
        since: last_seq,
        limit: batch_size,
        batch_size: batch_size,
        style: 'all_docs',
        doc_ids: doc_ids,
        returnDocs: true // required so we know when we're done
      };
      if (opts.filter) {
        // required for the client-side filter in onChange
        changesOpts.include_docs = true;
      }
      if (opts.query_params) {
        changesOpts.query_params = opts.query_params;
      }
      getChanges();
    }).catch(function (err) {
      abortReplication('getCheckpoint rejected with ', err);
    });
  }

  // Every attempt needs its own 'cancel' hook because completeReplication
  // closes over this attempt's state.
  returnValue.once('cancel', completeReplication);

  // The caller's callbacks are attached only once per Replication object.
  // backOff() re-enters this function with the same emitter, and attaching
  // them again would make them fire once per attempt.
  if (!returnValue._addedListeners) {
    if (typeof opts.onChange === 'function') {
      returnValue.on('change', opts.onChange);
    }

    if (typeof opts.complete === 'function') {
      returnValue.once('error', opts.complete);
      returnValue.once('complete', function (result) {
        opts.complete(null, result);
      });
    }
    returnValue._addedListeners = true;
  }

  if (typeof opts.since === 'undefined') {
    startChanges();
  } else {
    // An explicit starting sequence is stored as the checkpoint first.
    writingCheckpoint = true;
    checkpointer.writeCheckpoint(opts.since).then(function () {
      writingCheckpoint = false;
      if (state.cancelled) {
        completeReplication();
        return;
      }
      last_seq = opts.since;
      startChanges();
    }).catch(function (err) {
      writingCheckpoint = false;
      // Not rethrown: nothing is chained after this handler, so a rethrow
      // would only surface as an unhandled promise rejection. The error is
      // already reported through abortReplication.
      abortReplication('writeCheckpoint completed with error', err);
    });
  }
}
| |
| exports.toPouch = toPouch; |
/**
 * Normalises a database argument into something that can be `.then`-ed.
 *
 * - a string is passed to `new opts.PouchConstructor(...)`
 * - a value that has a `then` member is returned as is
 * - anything else is wrapped in a resolved promise
 *
 * @param {string|Object} db - database name or database object
 * @param {Object} opts - must carry `PouchConstructor`
 * @returns {Object} the constructed database, `db` itself, or a promise
 */
function toPouch(db, opts) {
  var PouchConstructor = opts.PouchConstructor;
  if (typeof db === 'string') {
    return new PouchConstructor(db);
  }
  return db.then ? db : utils.Promise.resolve(db);
}
| |
| |
| exports.replicate = replicateWrapper; |
/**
 * Public entry point: starts a replication from `src` to `target`.
 *
 * Must be called with `this` set to the database constructor, which is used
 * to open databases given by name unless `opts.PouchConstructor` is set.
 *
 * @param {string|Object} src - source database or its name
 * @param {string|Object} target - target database or its name
 * @param {Object|Function} [opts] - replication options, or the callback
 * @param {Function} [callback] - called as `callback(err, result)`; only
 *   used when `opts.complete` is not set
 * @returns {Replication} emitter/thenable representing the replication
 */
function replicateWrapper(src, target, opts, callback) {
  if (typeof opts === 'function') {
    callback = opts;
    opts = {};
  }
  if (typeof opts === 'undefined') {
    opts = {};
  }
  // Clone before filling in defaults, so the caller's object is never
  // modified. Assigning `complete` on the caller's object first would make a
  // reused options object keep the first call's callback and silently ignore
  // the callbacks of later calls.
  opts = utils.clone(opts);
  if (!opts.complete) {
    opts.complete = callback || function () {};
  }
  opts.continuous = opts.continuous || opts.live;
  opts.retry = ('retry' in opts) ? opts.retry : RETRY_DEFAULT;
  /*jshint validthis:true */
  opts.PouchConstructor = opts.PouchConstructor || this;
  var replicateRet = new Replication(opts);
  toPouch(src, opts).then(function (src) {
    return toPouch(target, opts).then(function (target) {
      return genReplicationId(src, target, opts).then(function (repId) {
        replicate(repId, src, target, opts, replicateRet);
      });
    });
  }).catch(function (err) {
    // Setup failed before replicate() could attach the callbacks itself.
    replicateRet.emit('error', err);
    opts.complete(err);
  });
  return replicateRet;
}