(#3062) - start fixing
diff --git a/lib/adapters/idb.js b/lib/adapters/idb.js index cc5544e..a38b427 100644 --- a/lib/adapters/idb.js +++ b/lib/adapters/idb.js
@@ -432,7 +432,8 @@ }); } - function writeDoc(docInfo, winningRev, deleted, callback, _up, resultsIdx) { + function writeDoc(docInfo, delta, winningRev, deleted, callback, + isUpdate, resultsIdx) { var err = null; var recv = 0; var id = docInfo.data._id = docInfo.metadata.id;
diff --git a/lib/adapters/leveldb.js b/lib/adapters/leveldb.js index 73d1564..b9acb13 100644 --- a/lib/adapters/leveldb.js +++ b/lib/adapters/leveldb.js
@@ -18,7 +18,6 @@ var BINARY_STORE = 'attach-binary-store'; var LOCAL_STORE = 'local-store'; var META_STORE = 'meta-store'; -var BATCH_SIZE = 50; // leveldb barks if we try to open a db multiple times // so we cache opened connections here for initstore() @@ -343,11 +342,11 @@ api._bulkDocs = writeLock(function (req, opts, callback) { var newEdits = opts.new_edits; var results = new Array(req.docs.length); - var lock = new utils.Set(); + var fetchedDocs = new utils.Map(); // parse the docs and give each a sequence number var userDocs = req.docs; - var info = userDocs.map(function (doc, i) { + var docInfos = userDocs.map(function (doc, i) { if (doc._id && utils.isLocalId(doc._id)) { return doc; } @@ -359,8 +358,7 @@ return newDoc; }); - var current = 0; - var infoErrors = info.filter(function (doc) { + var infoErrors = docInfos.filter(function (doc) { return doc.error; }); @@ -415,149 +413,47 @@ }); } - var inProgress = 0; - function processDocs() { - var index = current; - if (inProgress > BATCH_SIZE) { - return; + function fetchExistingDocs(callback) { + if (!docInfos.length) { + return callback(); } - if (index >= info.length) { - if (inProgress === 0) { - return complete(); - } else { - return; + + var numFetched = 0; + + function checkDone() { + if (++numFetched === docInfos.length) { + callback(); } } - var currentDoc = info[index]; - current++; - inProgress++; - if (currentDoc._id && utils.isLocalId(currentDoc._id)) { - api[currentDoc._deleted ? '_removeLocalNoLock' : '_putLocalNoLock']( - currentDoc, function (err, resp) { - if (err) { - results[index] = err; - } else { - results[index] = {}; + + docInfos.forEach(function (docInfo) { + if (docInfo._id && utils.isLocalId(docInfo._id)) { + return checkDone(); // skip local docs + } + var id = docInfo.metadata.id; + stores.docStore.get(id, function (err, oldDoc) { + if (oldDoc) { + fetchedDocs.set(id, oldDoc); } - inProgress--; - processDocs(); + checkDone(); }); - return; - } - - if (lock.has(currentDoc.metadata.id)) { - results[index] = errors.conflict(currentDoc.metadata.id); - inProgress--; - return processDocs(); - } - lock.add(currentDoc.metadata.id); - - stores.docStore.get(currentDoc.metadata.id, function (err, oldDoc) { - if (err) { - if (err.name === 'NotFoundError') { - insertDoc(currentDoc, index, function () { - lock.delete(currentDoc.metadata.id); - inProgress--; - processDocs(); - }); - } else { - err.error = true; - results[index] = err; - lock.delete(currentDoc.metadata.id); - inProgress--; - processDocs(); - } - } else { - updateDoc(oldDoc, currentDoc, index, function () { - lock.delete(currentDoc.metadata.id); - inProgress--; - processDocs(); - }); - } - }); - - if (newEdits) { - processDocs(); - } - } - - function insertDoc(doc, index, callback) { - // Can't insert new deleted documents - if ('was_delete' in opts && utils.isDeleted(doc.metadata)) { - results[index] = errors.missing(); - return callback(); - } - writeDoc(doc, index, function (err) { - if (err) { - return callback(err); - } - if (utils.isDeleted(doc.metadata)) { - return callback(); - } - incrementDocCount(1, callback); }); } - function updateDoc(oldDoc, docInfo, index, callback) { - - if (utils.revExists(oldDoc, docInfo.metadata.rev)) { - results[index] = docInfo; - callback(); - return; - } - - var previouslyDeleted = utils.isDeleted(oldDoc); - var deleted = utils.isDeleted(docInfo.metadata); - var isRoot = /^1-/.test(docInfo.metadata.rev); - - if (previouslyDeleted && !deleted && newEdits && isRoot) { - var newDoc = docInfo.data; - newDoc._rev = oldDoc.rev; - newDoc._id = docInfo.metadata.id; - docInfo = utils.parseDoc(newDoc, newEdits); - } - - var merged = - merge.merge(oldDoc.rev_tree, docInfo.metadata.rev_tree[0], 1000); - - var inConflict = newEdits && ((previouslyDeleted && deleted) || - (!previouslyDeleted && newEdits && merged.conflicts !== 'new_leaf') || - (previouslyDeleted && !deleted && merged.conflicts === 'new_branch')); - - if (inConflict) { - results[index] = errors.conflict(docInfo.metadata.id); - return callback(); - } - var newRev = docInfo.metadata.rev; - docInfo.metadata.rev_tree = merged.tree; - docInfo.metadata.rev_map = oldDoc.rev_map; - - var delta = 0; - if (newEdits || merge.winningRev(docInfo.metadata) === newRev) { - // if newEdits==false and we're pushing existing revisions, - // then the only thing that matters is whether this revision - // is the winning one, and thus replaces an old one - delta = (previouslyDeleted === deleted) ? 0 : - previouslyDeleted < deleted ? -1 : 1; - } - - incrementDocCount(delta, function (err) { - if (err) { - return callback(err); - } - writeDoc(docInfo, index, callback); - }); - + function processDocs() { + utils.processDocs(docInfos, api, fetchedDocs, + null, results, writeDoc, opts, complete); } - function writeDoc(doc, index, callback2) { + function writeDoc(doc, delta, winningRev, deleted, callback2, isUpdate, + resultsIdx) { var err = null; var recv = 0; doc.data._id = doc.metadata.id; doc.data._rev = doc.metadata.rev; - if (utils.isDeleted(doc.metadata)) { + if (deleted) { doc.data._deleted = true; } @@ -658,11 +554,11 @@ return stores.metaStore.put(UPDATE_SEQ_KEY, db._updateSeq, function (err) { if (err) { - results[index] = err; + results[resultsIdx] = err; } else { - results[index] = doc; + results[resultsIdx] = doc; } - return callback2(); + incrementDocCount(delta, callback2); }); }); } @@ -735,7 +631,10 @@ }); } - function complete() { + function complete(err) { + if (err) { + return callback(err); + } var aresults = results.map(function (result) { if (!Object.keys(result).length) { return { @@ -760,12 +659,11 @@ callback(null, aresults); }); } - verifyAttachments(function (err) { if (err) { return callback(err); } - processDocs(); + fetchExistingDocs(processDocs); }); }); api._allDocs = function (opts, callback) { @@ -1197,12 +1095,20 @@ }); }; - api._putLocal = writeLock(function (doc, callback) { + api._putLocal = writeLock(function (doc, opts, callback) { + if (typeof opts === 'function') { + callback = opts; + opts = {}; + } api._putLocalNoLock(doc, callback); }); // the NoLock version is for use by bulkDocs - api._putLocalNoLock = function (doc, callback) { + api._putLocalNoLock = function (doc, opts, callback) { + if (typeof opts === 'function') { + callback = opts; + opts = {}; + } delete doc._revisions; // ignore this, trust the rev var oldRev = doc._rev; var id = doc._id;
diff --git a/lib/adapters/websql.js b/lib/adapters/websql.js index 4756357..9f7d48a 100644 --- a/lib/adapters/websql.js +++ b/lib/adapters/websql.js
@@ -659,8 +659,8 @@ } - function writeDoc(docInfo, winningRev, deleted, callback, isUpdate, - resultsIdx) { + function writeDoc(docInfo, delta, winningRev, deleted, callback, + isUpdate, resultsIdx) { function finish() { var data = docInfo.data;
diff --git a/lib/utils.js b/lib/utils.js index 6ada364..d565665 100644 --- a/lib/utils.js +++ b/lib/utils.js
@@ -632,11 +632,11 @@ }; exports.updateDoc = function updateDoc(prev, docInfo, results, - i, cb, write, newEdits) { + i, callback, write, newEdits) { if (exports.revExists(prev, docInfo.metadata.rev)) { results[i] = docInfo; - return cb(); + return callback(); } var previouslyDeleted = exports.isDeleted(prev); @@ -659,16 +659,27 @@ if (inConflict) { var err = errors.conflict(docInfo.metadata.id); results[i] = err; - return cb(); + return callback(); } + var newRev = docInfo.metadata.rev; docInfo.metadata.rev_tree = merged.tree; + docInfo.metadata.rev_map = prev.rev_map; // used by leveldb // recalculate var winningRev = merge.winningRev(docInfo.metadata); deleted = exports.isDeleted(docInfo.metadata, winningRev); - write(docInfo, winningRev, deleted, cb, true, i); + var delta = 0; + if (newEdits || winningRev === newRev) { + // if newEdits==false and we're pushing existing revisions, + // then the only thing that matters is whether this revision + // is the winning one, and thus replaces an old one + delta = (previouslyDeleted === deleted) ? 0 : + previouslyDeleted < deleted ? -1 : 1; + } + + write(docInfo, delta, winningRev, deleted, callback, true, i); }; // massage an array response, e.g. the response we get from couch @@ -696,21 +707,23 @@ }; exports.processDocs = function processDocs(docInfos, api, fetchedDocs, - tx, results, writeDoc, opts) { + tx, results, writeDoc, opts, + callback) { - if (!docInfos.length) { - return; + if (!callback) { + callback = function () {}; } - function insertDoc(docInfo, resultsIdx, callback) { - // Cant insert new deleted documents - var winningRev = merge.winningRev(docInfo.metadata); - var deleted = exports.isDeleted(docInfo.metadata, winningRev); - if ('was_delete' in opts && deleted) { - results[resultsIdx] = errors.MISSING_DOC; + if (!docInfos.length) { + return callback(); + } + + var done = 0; + + function checkDone() { + if (++done === docInfos.length) { return callback(); } - writeDoc(docInfo, winningRev, deleted, callback, false, resultsIdx); } var newEdits = opts.new_edits; @@ -727,7 +740,7 @@ results[resultsIdx] = {}; } }); - return; + return checkDone(); } var id = currentDoc.metadata.id; @@ -738,6 +751,18 @@ } }); + function insertDoc(docInfo, resultsIdx, callback) { + // Cant insert new deleted documents + var winningRev = merge.winningRev(docInfo.metadata); + var deleted = exports.isDeleted(docInfo.metadata, winningRev); + if ('was_delete' in opts && deleted) { + results[resultsIdx] = errors.MISSING_DOC; + return callback(); + } + var delta = deleted ? 0 : 1; + writeDoc(docInfo, delta, winningRev, deleted, callback, false, resultsIdx); + } + // in the case of new_edits, the user can provide multiple docs // with the same id. these need to be processed sequentially idsToDocs.forEach(function (docs, id) { @@ -746,6 +771,8 @@ function docWritten() { if (++numDone < docs.length) { nextDoc(); + } else { + checkDone(); } } function nextDoc() {