| 'use strict'; |
| |
| var utils = require('./../utils'); |
| var Promise = utils.Promise; |
| var Checkpointer = require('./checkpointer'); |
| var backOff = require('./backoff'); |
| var genReplicationId = require('./gen-replication-id'); |
| |
| var MAX_SIMULTANEOUS_REVS = 50; |
| |
| function isGenOne(rev) { |
| return /^1-/.test(rev); |
| } |
| |
| function replicate(src, target, opts, returnValue, result) { |
| var batches = []; // list of batches to be processed |
| var currentBatch; // the batch currently being processed |
| var pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; // next batch, not yet ready to be processed |
| var writingCheckpoint = false; // true while checkpoint is being written |
| var changesCompleted = false; // true when all changes received |
| var replicationCompleted = false; // true when replication has completed |
| var last_seq = 0; |
| var continuous = opts.continuous || opts.live || false; |
| var batch_size = opts.batch_size || 100; |
| var batches_limit = opts.batches_limit || 10; |
| var changesPending = false; // true while src.changes is running |
| var doc_ids = opts.doc_ids; |
| var state = { |
| cancelled: false |
| }; |
| var repId; |
| var checkpointer; |
| var allErrors = []; |
| var changedDocs = []; |
| |
| result = result || { |
| ok: true, |
| start_time: new Date(), |
| docs_read: 0, |
| docs_written: 0, |
| doc_write_failures: 0, |
| errors: [] |
| }; |
| |
| var changesOpts = {}; |
| returnValue.ready(src, target); |
| |
| function initCheckpointer() { |
| if (checkpointer) { |
| return utils.Promise.resolve(); |
| } |
| return genReplicationId(src, target, opts).then(function (res) { |
| repId = res; |
| checkpointer = new Checkpointer(src, target, repId, state); |
| }); |
| } |
| |
| function writeDocs() { |
| if (currentBatch.docs.length === 0) { |
| return; |
| } |
| var docs = currentBatch.docs; |
| return target.bulkDocs({docs: docs, new_edits: false}).then(function (res) { |
| if (state.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| var errors = []; |
| var errorsById = {}; |
| res.forEach(function (res) { |
| if (res.error) { |
| result.doc_write_failures++; |
| errors.push(res); |
| errorsById[res.id] = res; |
| } |
| }); |
| allErrors = allErrors.concat(errors); |
| result.docs_written += currentBatch.docs.length - errors.length; |
| var non403s = errors.filter(function (error) { |
| return error.name !== 'unauthorized' && error.name !== 'forbidden'; |
| }); |
| |
| changedDocs = []; |
| docs.forEach(function(doc) { |
| var error = errorsById[doc._id]; |
| if (error) { |
| returnValue.emit('denied', utils.clone(error)); |
| } else { |
| changedDocs.push(doc); |
| } |
| }); |
| |
| if (non403s.length > 0) { |
| var error = new Error('bulkDocs error'); |
| error.other_errors = errors; |
| abortReplication('target.bulkDocs failed to write docs', error); |
| throw new Error('bulkWrite partial failure'); |
| } |
| }, function (err) { |
| result.doc_write_failures += docs.length; |
| throw err; |
| }); |
| } |
| |
| function processDiffDoc(id) { |
| var diffs = currentBatch.diffs; |
| var allMissing = diffs[id].missing; |
| // avoid url too long error by batching |
| var missingBatches = []; |
| for (var i = 0; i < allMissing.length; i += MAX_SIMULTANEOUS_REVS) { |
| missingBatches.push(allMissing.slice(i, Math.min(allMissing.length, |
| i + MAX_SIMULTANEOUS_REVS))); |
| } |
| |
| return Promise.all(missingBatches.map(function (missing) { |
| var opts = { |
| revs: true, |
| open_revs: missing, |
| attachments: true |
| }; |
| return src.get(id, opts).then(function (docs) { |
| docs.forEach(function (doc) { |
| if (state.cancelled) { |
| return completeReplication(); |
| } |
| if (doc.ok) { |
| result.docs_read++; |
| currentBatch.pendingRevs++; |
| currentBatch.docs.push(doc.ok); |
| } |
| }); |
| delete diffs[id]; |
| }); |
| })); |
| } |
| |
| function getAllDocs() { |
| var diffKeys = Object.keys(currentBatch.diffs); |
| return Promise.all(diffKeys.map(processDiffDoc)); |
| } |
| |
| |
| function getRevisionOneDocs() { |
| // filter out the generation 1 docs and get them |
| // leaving the non-generation one docs to be got otherwise |
| var ids = Object.keys(currentBatch.diffs).filter(function (id) { |
| var missing = currentBatch.diffs[id].missing; |
| return missing.length === 1 && isGenOne(missing[0]); |
| }); |
| if (!ids.length) { // nothing to fetch |
| return Promise.resolve(); |
| } |
| return src.allDocs({ |
| keys: ids, |
| include_docs: true, |
| attachments: true |
| }).then(function (res) { |
| if (state.cancelled) { |
| completeReplication(); |
| throw (new Error('cancelled')); |
| } |
| res.rows.forEach(function (row) { |
| // Optimization: fetch attachments along with _all_docs. |
| // These will be stubs in CouchDB < 1.6 because attachments |
| // weren't supported in _all_docs. |
| var hasAttachments = row.doc && row.doc._attachments && |
| Object.keys(row.doc._attachments).length > 0; |
| var hasAttachmentData = hasAttachments && |
| !row.doc._attachments[Object.keys(row.doc._attachments)[0]].stub; |
| |
| if (row.doc && !row.deleted && |
| isGenOne(row.value.rev) && |
| (!hasAttachments || hasAttachmentData) |
| ) { |
| result.docs_read++; |
| currentBatch.pendingRevs++; |
| currentBatch.docs.push(row.doc); |
| delete currentBatch.diffs[row.id]; |
| } |
| }); |
| }); |
| } |
| |
| function getDocs() { |
| return getRevisionOneDocs().then(getAllDocs); |
| } |
| |
| function finishBatch() { |
| writingCheckpoint = true; |
| return checkpointer.writeCheckpoint(currentBatch.seq).then(function () { |
| writingCheckpoint = false; |
| if (state.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| result.last_seq = last_seq = currentBatch.seq; |
| var outResult = utils.clone(result); |
| outResult.docs = changedDocs; |
| returnValue.emit('change', outResult); |
| currentBatch = undefined; |
| getChanges(); |
| }).catch(function (err) { |
| writingCheckpoint = false; |
| abortReplication('writeCheckpoint completed with error', err); |
| throw err; |
| }); |
| } |
| |
| function getDiffs() { |
| var diff = {}; |
| currentBatch.changes.forEach(function (change) { |
| // Couchbase Sync Gateway emits these, but we can ignore them |
| if (change.id === "_user/") { |
| return; |
| } |
| diff[change.id] = change.changes.map(function (x) { |
| return x.rev; |
| }); |
| }); |
| return target.revsDiff(diff).then(function (diffs) { |
| if (state.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| // currentBatch.diffs elements are deleted as the documents are written |
| currentBatch.diffs = diffs; |
| currentBatch.pendingRevs = 0; |
| }); |
| } |
| |
| function startNextBatch() { |
| if (state.cancelled || currentBatch) { |
| return; |
| } |
| if (batches.length === 0) { |
| processPendingBatch(true); |
| return; |
| } |
| currentBatch = batches.shift(); |
| getDiffs() |
| .then(getDocs) |
| .then(writeDocs) |
| .then(finishBatch) |
| .then(startNextBatch) |
| .catch(function (err) { |
| abortReplication('batch processing terminated with error', err); |
| }); |
| } |
| |
| |
| function processPendingBatch(immediate) { |
| if (pendingBatch.changes.length === 0) { |
| if (batches.length === 0 && !currentBatch) { |
| if ((continuous && changesOpts.live) || changesCompleted) { |
| returnValue.state = 'pending'; |
| returnValue.emit('paused'); |
| } |
| if (changesCompleted) { |
| completeReplication(); |
| } |
| } |
| return; |
| } |
| if ( |
| immediate || |
| changesCompleted || |
| pendingBatch.changes.length >= batch_size |
| ) { |
| batches.push(pendingBatch); |
| pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; |
| if (returnValue.state === 'pending' || returnValue.state === 'stopped') { |
| returnValue.state = 'active'; |
| returnValue.emit('active'); |
| } |
| startNextBatch(); |
| } |
| } |
| |
| |
| function abortReplication(reason, err) { |
| if (replicationCompleted) { |
| return; |
| } |
| if (!err.message) { |
| err.message = reason; |
| } |
| result.ok = false; |
| result.status = 'aborting'; |
| result.errors.push(err); |
| allErrors = allErrors.concat(err); |
| batches = []; |
| pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; |
| completeReplication(); |
| } |
| |
| |
| function completeReplication() { |
| if (replicationCompleted) { |
| return; |
| } |
| if (state.cancelled) { |
| result.status = 'cancelled'; |
| if (writingCheckpoint) { |
| return; |
| } |
| } |
| result.status = result.status || 'complete'; |
| result.end_time = new Date(); |
| result.last_seq = last_seq; |
| replicationCompleted = state.cancelled = true; |
| var non403s = allErrors.filter(function (error) { |
| return error.name !== 'unauthorized' && error.name !== 'forbidden'; |
| }); |
| if (non403s.length > 0) { |
| var error = allErrors.pop(); |
| if (allErrors.length > 0) { |
| error.other_errors = allErrors; |
| } |
| error.result = result; |
| backOff(opts, returnValue, error, function () { |
| replicate(src, target, opts, returnValue); |
| }); |
| } else { |
| result.errors = allErrors; |
| returnValue.emit('complete', result); |
| returnValue.removeAllListeners(); |
| } |
| } |
| |
| |
| function onChange(change) { |
| if (state.cancelled) { |
| return completeReplication(); |
| } |
| var filter = utils.filterChange(opts)(change); |
| if (!filter) { |
| return; |
| } |
| pendingBatch.seq = change.seq; |
| pendingBatch.changes.push(change); |
| processPendingBatch(batches.length === 0); |
| } |
| |
| |
| function onChangesComplete(changes) { |
| changesPending = false; |
| if (state.cancelled) { |
| return completeReplication(); |
| } |
| |
| // if no results were returned then we're done, |
| // else fetch more |
| if (changes.results.length > 0) { |
| changesOpts.since = changes.last_seq; |
| getChanges(); |
| } else { |
| if (continuous) { |
| changesOpts.live = true; |
| getChanges(); |
| } else { |
| changesCompleted = true; |
| } |
| } |
| processPendingBatch(true); |
| } |
| |
| |
| function onChangesError(err) { |
| changesPending = false; |
| if (state.cancelled) { |
| return completeReplication(); |
| } |
| abortReplication('changes rejected', err); |
| } |
| |
| |
| function getChanges() { |
| if (!( |
| !changesPending && |
| !changesCompleted && |
| batches.length < batches_limit |
| )) { |
| return; |
| } |
| changesPending = true; |
| function abortChanges() { |
| changes.cancel(); |
| } |
| function removeListener() { |
| returnValue.removeListener('cancel', abortChanges); |
| } |
| |
| if (returnValue._changes) { // remove old changes() and listeners |
| returnValue.removeListener('cancel', returnValue._abortChanges); |
| returnValue._changes.cancel(); |
| } |
| returnValue.once('cancel', abortChanges); |
| |
| var changes = src.changes(changesOpts) |
| .on('change', onChange); |
| changes.then(removeListener, removeListener); |
| changes.then(onChangesComplete) |
| .catch(onChangesError); |
| |
| if (opts.retry) { |
| // save for later so we can cancel if necessary |
| returnValue._changes = changes; |
| returnValue._abortChanges = abortChanges; |
| } |
| } |
| |
| |
| function startChanges() { |
| initCheckpointer().then(function () { |
| return checkpointer.getCheckpoint(); |
| }).then(function (checkpoint) { |
| last_seq = checkpoint; |
| changesOpts = { |
| since: last_seq, |
| limit: batch_size, |
| batch_size: batch_size, |
| style: 'all_docs', |
| doc_ids: doc_ids, |
| returnDocs: true // required so we know when we're done |
| }; |
| if (opts.filter) { |
| if (typeof opts.filter !== 'string') { |
| // required for the client-side filter in onChange |
| changesOpts.include_docs = true; |
| } else { // ddoc filter |
| changesOpts.filter = opts.filter; |
| } |
| } |
| if (opts.query_params) { |
| changesOpts.query_params = opts.query_params; |
| } |
| if (opts.view) { |
| changesOpts.view = opts.view; |
| } |
| getChanges(); |
| }).catch(function (err) { |
| abortReplication('getCheckpoint rejected with ', err); |
| }); |
| } |
| |
| if (returnValue.cancelled) { // cancelled immediately |
| completeReplication(); |
| return; |
| } |
| |
| if (!returnValue._addedListeners) { |
| returnValue.once('cancel', completeReplication); |
| |
| if (typeof opts.onChange === 'function') { |
| returnValue.on('change', opts.onChange); |
| } |
| |
| if (typeof opts.complete === 'function') { |
| returnValue.once('error', opts.complete); |
| returnValue.once('complete', function (result) { |
| opts.complete(null, result); |
| }); |
| } |
| returnValue._addedListeners = true; |
| } |
| |
| if (typeof opts.since === 'undefined') { |
| startChanges(); |
| } else { |
| initCheckpointer().then(function () { |
| writingCheckpoint = true; |
| return checkpointer.writeCheckpoint(opts.since); |
| }).then(function () { |
| writingCheckpoint = false; |
| if (state.cancelled) { |
| completeReplication(); |
| return; |
| } |
| last_seq = opts.since; |
| startChanges(); |
| }).catch(function (err) { |
| writingCheckpoint = false; |
| abortReplication('writeCheckpoint completed with error', err); |
| throw err; |
| }); |
| } |
| } |
| |
| module.exports = replicate; |