| 'use strict'; |
| |
| var utils = require('./utils'); |
| var Pouch = require('./index'); |
| var EE = require('events').EventEmitter; |
| |
| var MAX_SIMULTANEOUS_REVS = 50; |
| |
| // We create a basic promise so the caller can cancel the replication possibly |
| // before we have actually started listening to changes etc |
| utils.inherits(Replication, EE); |
| function Replication(opts) { |
| EE.call(this); |
| this.cancelled = false; |
| var self = this; |
| var promise = new utils.Promise(function (fulfill, reject) { |
| self.once('complete', fulfill); |
| self.once('error', reject); |
| }); |
| self.then = function (resolve, reject) { |
| return promise.then(resolve, reject); |
| }; |
| self.catch = function (reject) { |
| return promise.catch(reject); |
| }; |
| // As we allow error handling via "error" event as well, |
| // put a stub in here so that rejecting never throws UnhandledError. |
| self.catch(function (err) {}); |
| } |
| |
| Replication.prototype.cancel = function () { |
| this.cancelled = true; |
| this.emit('cancel'); |
| }; |
| |
| Replication.prototype.ready = function (src, target) { |
| var self = this; |
| function onDestroy() { |
| self.cancel(); |
| } |
| src.once('destroyed', onDestroy); |
| target.once('destroyed', onDestroy); |
| function cleanup() { |
| src.removeListener('destroyed', onDestroy); |
| target.removeListener('destroyed', onDestroy); |
| } |
| this.then(cleanup, cleanup); |
| }; |
| |
| |
| // TODO: check CouchDB's replication id generation |
| // Generate a unique id particular to this replication |
| function genReplicationId(src, target, opts) { |
| var filterFun = opts.filter ? opts.filter.toString() : ''; |
| return src.id().then(function (src_id) { |
| return target.id().then(function (target_id) { |
| var queryData = src_id + target_id + filterFun + |
| JSON.stringify(opts.query_params) + opts.doc_ids; |
| return utils.MD5(queryData).then(function (md5) { |
| return '_local/' + md5; |
| }); |
| }); |
| }); |
| } |
| |
| |
| function updateCheckpoint(db, id, checkpoint, returnValue) { |
| return db.get(id).catch(function (err) { |
| if (err.status === 404) { |
| return {_id: id}; |
| } |
| throw err; |
| }).then(function (doc) { |
| if (returnValue.cancelled) { |
| return; |
| } |
| doc.last_seq = checkpoint; |
| return db.put(doc); |
| }); |
| } |
| |
| function Checkpointer(src, target, id, returnValue) { |
| this.src = src; |
| this.target = target; |
| this.id = id; |
| this.returnValue = returnValue; |
| } |
| |
| Checkpointer.prototype.writeCheckpoint = function (checkpoint) { |
| var self = this; |
| return this.updateTarget(checkpoint).then(function () { |
| return self.updateSource(checkpoint); |
| }); |
| }; |
| Checkpointer.prototype.updateTarget = function (checkpoint) { |
| return updateCheckpoint(this.target, this.id, checkpoint, this.returnValue); |
| }; |
| Checkpointer.prototype.updateSource = function (checkpoint) { |
| var self = this; |
| if (this.readOnlySource) { |
| return utils.Promise.resolve(true); |
| } |
| return updateCheckpoint(this.src, this.id, checkpoint, this.returnValue) |
| .catch(function (err) { |
| var isForbidden = typeof err.status === 'number' && |
| Math.floor(err.status / 100) === 4; |
| if (isForbidden) { |
| self.readOnlySource = true; |
| return true; |
| } |
| throw err; |
| }); |
| }; |
| Checkpointer.prototype.getCheckpoint = function () { |
| var self = this; |
| return self.target.get(self.id).then(function (targetDoc) { |
| return self.src.get(self.id).then(function (sourceDoc) { |
| if (targetDoc.last_seq === sourceDoc.last_seq) { |
| return sourceDoc.last_seq; |
| } |
| return 0; |
| }, function (err) { |
| if (err.status === 404 && targetDoc.last_seq) { |
| return self.src.put({ |
| _id: self.id, |
| last_seq: 0 |
| }).then(function () { |
| return 0; |
| }, function (err) { |
| if (err.status === 401) { |
| self.readOnlySource = true; |
| return targetDoc.last_seq; |
| } |
| return 0; |
| }); |
| } |
| throw err; |
| }); |
| }).catch(function (err) { |
| if (err.status !== 404) { |
| throw err; |
| } |
| return 0; |
| }); |
| }; |
| function replicate(repId, src, target, opts, returnValue) { |
| var batches = []; // list of batches to be processed |
| var currentBatch; // the batch currently being processed |
| var pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; // next batch, not yet ready to be processed |
| var writingCheckpoint = false; // true while checkpoint is being written |
| var changesCompleted = false; // true when all changes received |
| var replicationCompleted = false; // true when replication has completed |
| var last_seq = 0; |
| var continuous = opts.continuous || opts.live || false; |
| var batch_size = opts.batch_size || 100; |
| var batches_limit = opts.batches_limit || 10; |
| var changesPending = false; // true while src.changes is running |
| var changesCount = 0; // number of changes received since calling src.changes |
| var doc_ids = opts.doc_ids; |
| var checkpointer = new Checkpointer(src, target, repId, returnValue); |
| var result = { |
| ok: true, |
| start_time: new Date(), |
| docs_read: 0, |
| docs_written: 0, |
| doc_write_failures: 0, |
| errors: [] |
| }; |
| var changesOpts = {}; |
| returnValue.ready(src, target); |
| |
| |
| function writeDocs() { |
| if (currentBatch.docs.length === 0) { |
| return; |
| } |
| var docs = currentBatch.docs; |
| return target.bulkDocs({ |
| docs: docs |
| }, { |
| new_edits: false |
| }).then(function (res) { |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| var errors = []; |
| res.forEach(function (res) { |
| if (!res.ok) { |
| result.doc_write_failures++; |
| errors.push(new Error(res.reason || res.message || 'Unknown reason')); |
| } |
| }); |
| if (errors.length > 0) { |
| var error = new Error('bulkDocs error'); |
| error.other_errors = errors; |
| abortReplication('target.bulkDocs failed to write docs', error); |
| throw new Error('bulkWrite partial failure'); |
| } |
| }, function (err) { |
| result.doc_write_failures += docs.length; |
| throw err; |
| }); |
| } |
| |
| |
| function getNextDoc() { |
| var diffs = currentBatch.diffs; |
| var id = Object.keys(diffs)[0]; |
| var allMissing = diffs[id].missing; |
| // avoid url too long error by batching |
| var missingBatches = []; |
| for (var i = 0; i < allMissing.length; i += MAX_SIMULTANEOUS_REVS) { |
| missingBatches.push(allMissing.slice(i, Math.min(allMissing.length, |
| i + MAX_SIMULTANEOUS_REVS))); |
| } |
| |
| return utils.Promise.all(missingBatches.map(function (missing) { |
| return src.get(id, {revs: true, open_revs: missing, attachments: true}) |
| .then(function (docs) { |
| docs.forEach(function (doc) { |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| if (doc.ok) { |
| result.docs_read++; |
| currentBatch.pendingRevs++; |
| currentBatch.docs.push(doc.ok); |
| delete diffs[doc.ok._id]; |
| } |
| }); |
| }); |
| })); |
| } |
| |
| function getAllDocs() { |
| if (Object.keys(currentBatch.diffs).length > 0) { |
| return getNextDoc().then(getAllDocs); |
| } else { |
| return utils.Promise.resolve(); |
| } |
| } |
| |
| |
| function getRevisionOneDocs() { |
| // filter out the generation 1 docs and get them |
| // leaving the non-generation one docs to be got otherwise |
| var ids = Object.keys(currentBatch.diffs).filter(function (id) { |
| var missing = currentBatch.diffs[id].missing; |
| return missing.length === 1 && missing[0].slice(0, 2) === '1-'; |
| }); |
| return src.allDocs({ |
| keys: ids, |
| include_docs: true |
| }).then(function (res) { |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw (new Error('cancelled')); |
| } |
| res.rows.forEach(function (row) { |
| if (row.doc && !row.deleted && |
| row.value.rev.slice(0, 2) === '1-' && ( |
| !row.doc._attachments || |
| Object.keys(row.doc._attachments).length === 0 |
| ) |
| ) { |
| result.docs_read++; |
| currentBatch.pendingRevs++; |
| currentBatch.docs.push(row.doc); |
| delete currentBatch.diffs[row.id]; |
| } |
| }); |
| }); |
| } |
| |
| |
| function getDocs() { |
| return getRevisionOneDocs().then(getAllDocs); |
| } |
| |
| |
| function finishBatch() { |
| writingCheckpoint = true; |
| return checkpointer.writeCheckpoint( |
| currentBatch.seq |
| ).then(function (res) { |
| writingCheckpoint = false; |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| result.last_seq = last_seq = currentBatch.seq; |
| result.docs_written += currentBatch.docs.length; |
| returnValue.emit('change', utils.clone(result)); |
| currentBatch = undefined; |
| getChanges(); |
| }).catch(function (err) { |
| writingCheckpoint = false; |
| abortReplication('writeCheckpoint completed with error', err); |
| throw err; |
| }); |
| } |
| |
| |
| function getDiffs() { |
| var diff = {}; |
| currentBatch.changes.forEach(function (change) { |
| diff[change.id] = change.changes.map(function (x) { |
| return x.rev; |
| }); |
| }); |
| return target.revsDiff(diff).then(function (diffs) { |
| if (returnValue.cancelled) { |
| completeReplication(); |
| throw new Error('cancelled'); |
| } |
| // currentBatch.diffs elements are deleted as the documents are written |
| currentBatch.diffs = diffs; |
| currentBatch.pendingRevs = 0; |
| }); |
| } |
| |
| |
| function startNextBatch() { |
| if (returnValue.cancelled || currentBatch) { |
| return; |
| } |
| if (batches.length === 0) { |
| processPendingBatch(true); |
| return; |
| } |
| currentBatch = batches.shift(); |
| getDiffs() |
| .then(getDocs) |
| .then(writeDocs) |
| .then(finishBatch) |
| .then(startNextBatch) |
| .catch(function (err) { |
| abortReplication('batch processing terminated with error', err); |
| }); |
| } |
| |
| |
| function processPendingBatch(immediate) { |
| if (pendingBatch.changes.length === 0) { |
| if (batches.length === 0 && !currentBatch) { |
| if ((continuous && changesOpts.live) || changesCompleted) { |
| returnValue.emit('uptodate', utils.clone(result)); |
| } |
| if (changesCompleted) { |
| completeReplication(); |
| } |
| } |
| return; |
| } |
| if ( |
| immediate || |
| changesCompleted || |
| pendingBatch.changes.length >= batch_size |
| ) { |
| batches.push(pendingBatch); |
| pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; |
| startNextBatch(); |
| } |
| } |
| |
| |
| function abortReplication(reason, err) { |
| if (replicationCompleted) { |
| return; |
| } |
| result.ok = false; |
| result.status = 'aborted'; |
| result.errors.push(err); |
| batches = []; |
| pendingBatch = { |
| seq: 0, |
| changes: [], |
| docs: [] |
| }; |
| completeReplication(); |
| } |
| |
| |
| function completeReplication() { |
| if (replicationCompleted) { |
| return; |
| } |
| if (returnValue.cancelled) { |
| result.status = 'cancelled'; |
| if (writingCheckpoint) { |
| return; |
| } |
| } |
| result.status = result.status || 'complete'; |
| result.end_time = new Date(); |
| result.last_seq = last_seq; |
| replicationCompleted = returnValue.cancelled = true; |
| if (result.errors.length > 0) { |
| var error = result.errors.pop(); |
| if (result.errors.length > 0) { |
| error.other_errors = result.errors; |
| } |
| error.result = result; |
| returnValue.emit('error', error); |
| } else { |
| returnValue.emit('complete', result); |
| } |
| returnValue.removeAllListeners(); |
| } |
| |
| |
| function onChange(change) { |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| changesCount++; |
| if ( |
| pendingBatch.changes.length === 0 && |
| batches.length === 0 && |
| !currentBatch |
| ) { |
| returnValue.emit('outofdate', utils.clone(result)); |
| } |
| pendingBatch.seq = change.seq; |
| pendingBatch.changes.push(change); |
| processPendingBatch(batches.length === 0); |
| } |
| |
| |
| function onChangesComplete(changes) { |
| changesPending = false; |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| if (changesCount > 0) { |
| changesOpts.since = changes.last_seq; |
| getChanges(); |
| } else { |
| if (continuous) { |
| changesOpts.live = true; |
| getChanges(); |
| } else { |
| changesCompleted = true; |
| } |
| } |
| processPendingBatch(true); |
| } |
| |
| |
| function onChangesError(err) { |
| changesPending = false; |
| if (returnValue.cancelled) { |
| return completeReplication(); |
| } |
| abortReplication('changes rejected', err); |
| } |
| |
| |
| function getChanges() { |
| if (!( |
| !changesPending && |
| !changesCompleted && |
| batches.length < batches_limit |
| )) { |
| return; |
| } |
| changesPending = true; |
| changesCount = 0; |
| function abortChanges() { |
| changes.cancel(); |
| } |
| function removeListener() { |
| returnValue.removeListener('cancel', abortChanges); |
| } |
| returnValue.once('cancel', abortChanges); |
| var changes = src.changes(changesOpts) |
| .on('change', onChange); |
| changes.then(removeListener, removeListener); |
| changes.then(onChangesComplete) |
| .catch(onChangesError); |
| } |
| |
| |
| function startChanges() { |
| checkpointer.getCheckpoint().then(function (checkpoint) { |
| last_seq = checkpoint; |
| changesOpts = { |
| since: last_seq, |
| limit: batch_size, |
| batch_size: batch_size, |
| style: 'all_docs', |
| doc_ids: doc_ids, |
| returnDocs: false |
| }; |
| if (opts.filter) { |
| changesOpts.filter = opts.filter; |
| } |
| if (opts.query_params) { |
| changesOpts.query_params = opts.query_params; |
| } |
| getChanges(); |
| }).catch(function (err) { |
| abortReplication('getCheckpoint rejected with ', err); |
| }); |
| } |
| |
| |
| returnValue.once('cancel', completeReplication); |
| |
| if (typeof opts.onChange === 'function') { |
| returnValue.on('change', opts.onChange); |
| } |
| |
| if (typeof opts.complete === 'function') { |
| returnValue.once('error', opts.complete); |
| returnValue.once('complete', function (result) { |
| opts.complete(null, result); |
| }); |
| } |
| |
| if (typeof opts.since === 'undefined') { |
| startChanges(); |
| } else { |
| writingCheckpoint = true; |
| checkpointer.writeCheckpoint(opts.since).then(function (res) { |
| writingCheckpoint = false; |
| if (returnValue.cancelled) { |
| completeReplication(); |
| return; |
| } |
| last_seq = opts.since; |
| startChanges(); |
| }).catch(function (err) { |
| writingCheckpoint = false; |
| abortReplication('writeCheckpoint completed with error', err); |
| throw err; |
| }); |
| } |
| } |
| |
| |
| function toPouch(db, PouchConstructor) { |
| if (typeof db === 'string') { |
| return new PouchConstructor(db); |
| } else if (db.then) { |
| return db; |
| } else { |
| return utils.Promise.resolve(db); |
| } |
| } |
| |
| |
| function replicateWrapper(src, target, opts, callback) { |
| if (typeof opts === 'function') { |
| callback = opts; |
| opts = {}; |
| } |
| if (typeof opts === 'undefined') { |
| opts = {}; |
| } |
| if (!opts.complete) { |
| opts.complete = callback || function () {}; |
| } |
| opts = utils.clone(opts); |
| opts.continuous = opts.continuous || opts.live; |
| var replicateRet = new Replication(opts); |
| var PouchConstructor = opts.pouchConstructor || Pouch; |
| toPouch(src, PouchConstructor).then(function (src) { |
| return toPouch(target, PouchConstructor).then(function (target) { |
| return genReplicationId(src, target, opts).then(function (repId) { |
| replicate(repId, src, target, opts, replicateRet); |
| }); |
| }); |
| }).catch(function (err) { |
| replicateRet.emit('error', err); |
| opts.complete(err); |
| }); |
| return replicateRet; |
| } |
| |
| exports.replicate = replicateWrapper; |