| import clone from '../deps/clone'; |
| import Promise from '../deps/promise'; |
| import uuid from '../deps/uuid'; |
| import filterChange from '../deps/filterChange'; |
| import Checkpointer from './checkpointer'; |
| import backOff from './backoff'; |
| import generateReplicationId from './generateReplicationId'; |
| import getDocs from './getDocs'; |
| |
| function replicate(src, target, opts, returnValue, result) { |
| var batches = []; // list of batches to be processed |
| var currentBatch; // the batch currently being processed |
| var pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; // next batch, not yet ready to be processed |
| var writingCheckpoint = false; // true while checkpoint is being written |
| var changesCompleted = false; // true when all changes received |
| var replicationCompleted = false; // true when replication has completed |
| var last_seq = 0; |
| var continuous = opts.continuous || opts.live || false; |
| var batch_size = opts.batch_size || 100; |
| var batches_limit = opts.batches_limit || 10; |
| var changesPending = false; // true while src.changes is running |
| var doc_ids = opts.doc_ids; |
| var repId; |
| var checkpointer; |
| var allErrors = []; |
| var changedDocs = []; |
| // Like couchdb, every replication gets a unique session id |
| var session = uuid(); |
| |
| result = result || { |
| ok: true, |
| start_time: new Date(), |
| docs_read: 0, |
| docs_written: 0, |
| doc_write_failures: 0, |
| errors: [] |
| }; |
| |
| var changesOpts = {}; |
| returnValue.ready(src, target); |
| |
| function initCheckpointer() { |
| if (checkpointer) { |
| return Promise.resolve(); |
| } |
| return generateReplicationId(src, target, opts).then(function (res) { |
| repId = res; |
| checkpointer = new Checkpointer(src, target, repId, returnValue); |
| }); |
| } |
| |
| function writeDocs() { |
| changedDocs = []; |
| |
| if (currentBatch.docs.length === 0) { |
| return; |
| } |
| var docs = currentBatch.docs; |
| return target.bulkDocs({docs: docs, new_edits: false}).then(function (res) { |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| var errors = []; |
| var errorsById = {}; |
| res.forEach(function (res) { |
| if (res.error) { |
| result.doc_write_failures++; |
| errors.push(res); |
| errorsById[res.id] = res; |
| } |
| }); |
| allErrors = allErrors.concat(errors); |
| result.docs_written += currentBatch.docs.length - errors.length; |
| var non403s = errors.filter(function (error) { |
| return error.name !== 'unauthorized' && error.name !== 'forbidden'; |
| }); |
| |
| docs.forEach(function(doc) { |
| var error = errorsById[doc._id]; |
| if (error) { |
| returnValue.emit('denied', clone(error)); |
| } else { |
| changedDocs.push(doc); |
| } |
| }); |
| |
| if (non403s.length > 0) { |
| var error = new Error('bulkDocs error'); |
| error.other_errors = errors; |
| abortReplication('target.bulkDocs failed to write docs', error); |
| throw new Error('bulkWrite partial failure'); |
| } |
| }, function (err) { |
| result.doc_write_failures += docs.length; |
| throw err; |
| }); |
| } |
| |
| function finishBatch() { |
| result.last_seq = last_seq = currentBatch.seq; |
| var outResult = clone(result); |
| if (changedDocs.length) { |
| outResult.docs = changedDocs; |
| returnValue.emit('change', outResult); |
| } |
| writingCheckpoint = true; |
| return checkpointer.writeCheckpoint(currentBatch.seq, |
| session).then(function () { |
| writingCheckpoint = false; |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| currentBatch = undefined; |
| getChanges(); |
| }).catch(function (err) { |
| writingCheckpoint = false; |
| abortReplication('writeCheckpoint completed with error', err); |
| throw err; |
| }); |
| } |
| |
| function getDiffs() { |
| var diff = {}; |
| currentBatch.changes.forEach(function (change) { |
| // Couchbase Sync Gateway emits these, but we can ignore them |
| /* istanbul ignore if */ |
| if (change.id === "_user/") { |
| return; |
| } |
| diff[change.id] = change.changes.map(function (x) { |
| return x.rev; |
| }); |
| }); |
| return target.revsDiff(diff).then(function (diffs) { |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| // currentBatch.diffs elements are deleted as the documents are written |
| currentBatch.diffs = diffs; |
| }); |
| } |
| |
| function getBatchDocs() { |
| return getDocs(src, currentBatch.diffs, returnValue).then(function (docs) { |
| docs.forEach(function (doc) { |
| delete currentBatch.diffs[doc._id]; |
| result.docs_read++; |
| currentBatch.docs.push(doc); |
| }); |
| }); |
| } |
| |
| function startNextBatch() { |
| if (returnValue.cancelled || currentBatch) { |
| return; |
| } |
| if (batches.length === 0) { |
| processPendingBatch(true); |
| return; |
| } |
| currentBatch = batches.shift(); |
| getDiffs() |
| .then(getBatchDocs) |
| .then(writeDocs) |
| .then(finishBatch) |
| .then(startNextBatch) |
| .catch(function (err) { |
| abortReplication('batch processing terminated with error', err); |
| }); |
| } |
| |
| |
| function processPendingBatch(immediate) { |
| if (pendingBatch.changes.length === 0) { |
| if (batches.length === 0 && !currentBatch) { |
| if ((continuous && changesOpts.live) || changesCompleted) { |
| returnValue.state = 'pending'; |
| returnValue.emit('paused'); |
| } |
| if (changesCompleted) { |
| completeReplication(); |
| } |
| } |
| return; |
| } |
| if ( |
| immediate || |
| changesCompleted || |
| pendingBatch.changes.length >= batch_size |
| ) { |
| batches.push(pendingBatch); |
| pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; |
| if (returnValue.state === 'pending' || returnValue.state === 'stopped') { |
| returnValue.state = 'active'; |
| returnValue.emit('active'); |
| } |
| startNextBatch(); |
| } |
| } |
| |
| |
| function abortReplication(reason, err) { |
| if (replicationCompleted) { |
| return; |
| } |
| if (!err.message) { |
| err.message = reason; |
| } |
| result.ok = false; |
| result.status = 'aborting'; |
| result.errors.push(err); |
| allErrors = allErrors.concat(err); |
| batches = []; |
| pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; |
| completeReplication(); |
| } |
| |
| |
| function completeReplication() { |
| if (replicationCompleted) { |
| return; |
| } |
| if (returnValue.cancelled) { |
| result.status = 'cancelled'; |
| if (writingCheckpoint) { |
| return; |
| } |
| } |
| result.status = result.status || 'complete'; |
| result.end_time = new Date(); |
| result.last_seq = last_seq; |
| replicationCompleted = true; |
| var non403s = allErrors.filter(function (error) { |
| return error.name !== 'unauthorized' && error.name !== 'forbidden'; |
| }); |
| if (non403s.length > 0) { |
| var error = allErrors.pop(); |
| if (allErrors.length > 0) { |
| error.other_errors = allErrors; |
| } |
| error.result = result; |
| backOff(opts, returnValue, error, function () { |
| replicate(src, target, opts, returnValue); |
| }); |
| } else { |
| result.errors = allErrors; |
| returnValue.emit('complete', result); |
| returnValue.removeAllListeners(); |
| } |
| } |
| |
| |
| function onChange(change) { |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| var filter = filterChange(opts)(change); |
| if (!filter) { |
| return; |
| } |
| pendingBatch.seq = change.seq; |
| pendingBatch.changes.push(change); |
| processPendingBatch(changesOpts.live); |
| } |
| |
| |
| function onChangesComplete(changes) { |
| changesPending = false; |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| |
| // if no results were returned then we're done, |
| // else fetch more |
| if (changes.results.length > 0) { |
| changesOpts.since = changes.last_seq; |
| getChanges(); |
| } else { |
| if (continuous) { |
| changesOpts.live = true; |
| getChanges(); |
| } else { |
| changesCompleted = true; |
| } |
| } |
| processPendingBatch(true); |
| } |
| |
| |
| function onChangesError(err) { |
| changesPending = false; |
| /* istanbul ignore if */ |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| abortReplication('changes rejected', err); |
| } |
| |
| |
| function getChanges() { |
| if (!( |
| !changesPending && |
| !changesCompleted && |
| batches.length < batches_limit |
| )) { |
| return; |
| } |
| changesPending = true; |
| function abortChanges() { |
| changes.cancel(); |
| } |
| function removeListener() { |
| returnValue.removeListener('cancel', abortChanges); |
| } |
| |
| if (returnValue._changes) { // remove old changes() and listeners |
| returnValue.removeListener('cancel', returnValue._abortChanges); |
| returnValue._changes.cancel(); |
| } |
| returnValue.once('cancel', abortChanges); |
| |
| var changes = src.changes(changesOpts) |
| .on('change', onChange); |
| changes.then(removeListener, removeListener); |
| changes.then(onChangesComplete) |
| .catch(onChangesError); |
| |
| if (opts.retry) { |
| // save for later so we can cancel if necessary |
| returnValue._changes = changes; |
| returnValue._abortChanges = abortChanges; |
| } |
| } |
| |
| |
| function startChanges() { |
| initCheckpointer().then(function () { |
| if (returnValue.cancelled) { |
| completeReplication(); |
| return; |
| } |
| return checkpointer.getCheckpoint().then(function (checkpoint) { |
| last_seq = checkpoint; |
| changesOpts = { |
| since: last_seq, |
| limit: batch_size, |
| batch_size: batch_size, |
| style: 'all_docs', |
| doc_ids: doc_ids, |
| return_docs: true // required so we know when we're done |
| }; |
| if (opts.filter) { |
| if (typeof opts.filter !== 'string') { |
| // required for the client-side filter in onChange |
| changesOpts.include_docs = true; |
| } else { // ddoc filter |
| changesOpts.filter = opts.filter; |
| } |
| } |
| if ('heartbeat' in opts) { |
| changesOpts.heartbeat = opts.heartbeat; |
| } |
| if ('timeout' in opts) { |
| changesOpts.timeout = opts.timeout; |
| } |
| if (opts.query_params) { |
| changesOpts.query_params = opts.query_params; |
| } |
| if (opts.view) { |
| changesOpts.view = opts.view; |
| } |
| getChanges(); |
| }); |
| }).catch(function (err) { |
| abortReplication('getCheckpoint rejected with ', err); |
| }); |
| } |
| |
| /* istanbul ignore next */ |
| function onCheckpointError(err) { |
| writingCheckpoint = false; |
| abortReplication('writeCheckpoint completed with error', err); |
| throw err; |
| } |
| |
| /* istanbul ignore if */ |
| if (returnValue.cancelled) { // cancelled immediately |
| completeReplication(); |
| return; |
| } |
| |
| if (!returnValue._addedListeners) { |
| returnValue.once('cancel', completeReplication); |
| |
| if (typeof opts.complete === 'function') { |
| returnValue.once('error', opts.complete); |
| returnValue.once('complete', function (result) { |
| opts.complete(null, result); |
| }); |
| } |
| returnValue._addedListeners = true; |
| } |
| |
| if (typeof opts.since === 'undefined') { |
| startChanges(); |
| } else { |
| initCheckpointer().then(function () { |
| writingCheckpoint = true; |
| return checkpointer.writeCheckpoint(opts.since, session); |
| }).then(function () { |
| writingCheckpoint = false; |
| /* istanbul ignore if */ |
| if (returnValue.cancelled) { |
| completeReplication(); |
| return; |
| } |
| last_seq = opts.since; |
| startChanges(); |
| }).catch(onCheckpointError); |
| } |
| } |
| |
| export default replicate; |