(#3062) - start fixing
diff --git a/lib/adapters/idb.js b/lib/adapters/idb.js
index cc5544e..a38b427 100644
--- a/lib/adapters/idb.js
+++ b/lib/adapters/idb.js
@@ -432,7 +432,8 @@
});
}
- function writeDoc(docInfo, winningRev, deleted, callback, _up, resultsIdx) {
+ function writeDoc(docInfo, delta, winningRev, deleted, callback,
+ isUpdate, resultsIdx) {
var err = null;
var recv = 0;
var id = docInfo.data._id = docInfo.metadata.id;
diff --git a/lib/adapters/leveldb.js b/lib/adapters/leveldb.js
index 73d1564..b9acb13 100644
--- a/lib/adapters/leveldb.js
+++ b/lib/adapters/leveldb.js
@@ -18,7 +18,6 @@
var BINARY_STORE = 'attach-binary-store';
var LOCAL_STORE = 'local-store';
var META_STORE = 'meta-store';
-var BATCH_SIZE = 50;
// leveldb barks if we try to open a db multiple times
// so we cache opened connections here for initstore()
@@ -343,11 +342,11 @@
api._bulkDocs = writeLock(function (req, opts, callback) {
var newEdits = opts.new_edits;
var results = new Array(req.docs.length);
- var lock = new utils.Set();
+ var fetchedDocs = new utils.Map();
// parse the docs and give each a sequence number
var userDocs = req.docs;
- var info = userDocs.map(function (doc, i) {
+ var docInfos = userDocs.map(function (doc, i) {
if (doc._id && utils.isLocalId(doc._id)) {
return doc;
}
@@ -359,8 +358,7 @@
return newDoc;
});
- var current = 0;
- var infoErrors = info.filter(function (doc) {
+ var infoErrors = docInfos.filter(function (doc) {
return doc.error;
});
@@ -415,149 +413,47 @@
});
}
- var inProgress = 0;
- function processDocs() {
- var index = current;
- if (inProgress > BATCH_SIZE) {
- return;
+ function fetchExistingDocs(callback) {
+ if (!docInfos.length) {
+ return callback();
}
- if (index >= info.length) {
- if (inProgress === 0) {
- return complete();
- } else {
- return;
+
+ var numFetched = 0;
+
+ function checkDone() {
+ if (++numFetched === docInfos.length) {
+ callback();
}
}
- var currentDoc = info[index];
- current++;
- inProgress++;
- if (currentDoc._id && utils.isLocalId(currentDoc._id)) {
- api[currentDoc._deleted ? '_removeLocalNoLock' : '_putLocalNoLock'](
- currentDoc, function (err, resp) {
- if (err) {
- results[index] = err;
- } else {
- results[index] = {};
+
+ docInfos.forEach(function (docInfo) {
+ if (docInfo._id && utils.isLocalId(docInfo._id)) {
+ return checkDone(); // skip local docs
+ }
+ var id = docInfo.metadata.id;
+ stores.docStore.get(id, function (err, oldDoc) {
+ if (oldDoc) {
+ fetchedDocs.set(id, oldDoc);
}
- inProgress--;
- processDocs();
+ checkDone();
});
- return;
- }
-
- if (lock.has(currentDoc.metadata.id)) {
- results[index] = errors.conflict(currentDoc.metadata.id);
- inProgress--;
- return processDocs();
- }
- lock.add(currentDoc.metadata.id);
-
- stores.docStore.get(currentDoc.metadata.id, function (err, oldDoc) {
- if (err) {
- if (err.name === 'NotFoundError') {
- insertDoc(currentDoc, index, function () {
- lock.delete(currentDoc.metadata.id);
- inProgress--;
- processDocs();
- });
- } else {
- err.error = true;
- results[index] = err;
- lock.delete(currentDoc.metadata.id);
- inProgress--;
- processDocs();
- }
- } else {
- updateDoc(oldDoc, currentDoc, index, function () {
- lock.delete(currentDoc.metadata.id);
- inProgress--;
- processDocs();
- });
- }
- });
-
- if (newEdits) {
- processDocs();
- }
- }
-
- function insertDoc(doc, index, callback) {
- // Can't insert new deleted documents
- if ('was_delete' in opts && utils.isDeleted(doc.metadata)) {
- results[index] = errors.missing();
- return callback();
- }
- writeDoc(doc, index, function (err) {
- if (err) {
- return callback(err);
- }
- if (utils.isDeleted(doc.metadata)) {
- return callback();
- }
- incrementDocCount(1, callback);
});
}
- function updateDoc(oldDoc, docInfo, index, callback) {
-
- if (utils.revExists(oldDoc, docInfo.metadata.rev)) {
- results[index] = docInfo;
- callback();
- return;
- }
-
- var previouslyDeleted = utils.isDeleted(oldDoc);
- var deleted = utils.isDeleted(docInfo.metadata);
- var isRoot = /^1-/.test(docInfo.metadata.rev);
-
- if (previouslyDeleted && !deleted && newEdits && isRoot) {
- var newDoc = docInfo.data;
- newDoc._rev = oldDoc.rev;
- newDoc._id = docInfo.metadata.id;
- docInfo = utils.parseDoc(newDoc, newEdits);
- }
-
- var merged =
- merge.merge(oldDoc.rev_tree, docInfo.metadata.rev_tree[0], 1000);
-
- var inConflict = newEdits && ((previouslyDeleted && deleted) ||
- (!previouslyDeleted && newEdits && merged.conflicts !== 'new_leaf') ||
- (previouslyDeleted && !deleted && merged.conflicts === 'new_branch'));
-
- if (inConflict) {
- results[index] = errors.conflict(docInfo.metadata.id);
- return callback();
- }
- var newRev = docInfo.metadata.rev;
- docInfo.metadata.rev_tree = merged.tree;
- docInfo.metadata.rev_map = oldDoc.rev_map;
-
- var delta = 0;
- if (newEdits || merge.winningRev(docInfo.metadata) === newRev) {
- // if newEdits==false and we're pushing existing revisions,
- // then the only thing that matters is whether this revision
- // is the winning one, and thus replaces an old one
- delta = (previouslyDeleted === deleted) ? 0 :
- previouslyDeleted < deleted ? -1 : 1;
- }
-
- incrementDocCount(delta, function (err) {
- if (err) {
- return callback(err);
- }
- writeDoc(docInfo, index, callback);
- });
-
+ function processDocs() {
+ utils.processDocs(docInfos, api, fetchedDocs,
+ null, results, writeDoc, opts, complete);
}
- function writeDoc(doc, index, callback2) {
+ function writeDoc(doc, delta, winningRev, deleted, callback2, isUpdate,
+ resultsIdx) {
var err = null;
var recv = 0;
doc.data._id = doc.metadata.id;
doc.data._rev = doc.metadata.rev;
- if (utils.isDeleted(doc.metadata)) {
+ if (deleted) {
doc.data._deleted = true;
}
@@ -658,11 +554,11 @@
return stores.metaStore.put(UPDATE_SEQ_KEY, db._updateSeq,
function (err) {
if (err) {
- results[index] = err;
+ results[resultsIdx] = err;
} else {
- results[index] = doc;
+ results[resultsIdx] = doc;
}
- return callback2();
+ incrementDocCount(delta, callback2);
});
});
}
@@ -735,7 +631,10 @@
});
}
- function complete() {
+ function complete(err) {
+ if (err) {
+ return callback(err);
+ }
var aresults = results.map(function (result) {
if (!Object.keys(result).length) {
return {
@@ -760,12 +659,11 @@
callback(null, aresults);
});
}
-
verifyAttachments(function (err) {
if (err) {
return callback(err);
}
- processDocs();
+ fetchExistingDocs(processDocs);
});
});
api._allDocs = function (opts, callback) {
@@ -1197,12 +1095,20 @@
});
};
- api._putLocal = writeLock(function (doc, callback) {
+ api._putLocal = writeLock(function (doc, opts, callback) {
+ if (typeof opts === 'function') {
+ callback = opts;
+ opts = {};
+ }
api._putLocalNoLock(doc, callback);
});
// the NoLock version is for use by bulkDocs
- api._putLocalNoLock = function (doc, callback) {
+ api._putLocalNoLock = function (doc, opts, callback) {
+ if (typeof opts === 'function') {
+ callback = opts;
+ opts = {};
+ }
delete doc._revisions; // ignore this, trust the rev
var oldRev = doc._rev;
var id = doc._id;
diff --git a/lib/adapters/websql.js b/lib/adapters/websql.js
index 4756357..9f7d48a 100644
--- a/lib/adapters/websql.js
+++ b/lib/adapters/websql.js
@@ -659,8 +659,8 @@
}
- function writeDoc(docInfo, winningRev, deleted, callback, isUpdate,
- resultsIdx) {
+ function writeDoc(docInfo, delta, winningRev, deleted, callback,
+ isUpdate, resultsIdx) {
function finish() {
var data = docInfo.data;
diff --git a/lib/utils.js b/lib/utils.js
index 6ada364..d565665 100644
--- a/lib/utils.js
+++ b/lib/utils.js
@@ -632,11 +632,11 @@
};
exports.updateDoc = function updateDoc(prev, docInfo, results,
- i, cb, write, newEdits) {
+ i, callback, write, newEdits) {
if (exports.revExists(prev, docInfo.metadata.rev)) {
results[i] = docInfo;
- return cb();
+ return callback();
}
var previouslyDeleted = exports.isDeleted(prev);
@@ -659,16 +659,27 @@
if (inConflict) {
var err = errors.conflict(docInfo.metadata.id);
results[i] = err;
- return cb();
+ return callback();
}
+ var newRev = docInfo.metadata.rev;
docInfo.metadata.rev_tree = merged.tree;
+ docInfo.metadata.rev_map = prev.rev_map; // used by leveldb
// recalculate
var winningRev = merge.winningRev(docInfo.metadata);
deleted = exports.isDeleted(docInfo.metadata, winningRev);
- write(docInfo, winningRev, deleted, cb, true, i);
+ var delta = 0;
+ if (newEdits || winningRev === newRev) {
+ // if newEdits==false and we're pushing existing revisions,
+ // then the only thing that matters is whether this revision
+ // is the winning one, and thus replaces an old one
+ delta = (previouslyDeleted === deleted) ? 0 :
+ previouslyDeleted < deleted ? -1 : 1;
+ }
+
+ write(docInfo, delta, winningRev, deleted, callback, true, i);
};
// massage an array response, e.g. the response we get from couch
@@ -696,21 +707,23 @@
};
exports.processDocs = function processDocs(docInfos, api, fetchedDocs,
- tx, results, writeDoc, opts) {
+ tx, results, writeDoc, opts,
+ callback) {
- if (!docInfos.length) {
- return;
+ if (!callback) {
+ callback = function () {};
}
- function insertDoc(docInfo, resultsIdx, callback) {
- // Cant insert new deleted documents
- var winningRev = merge.winningRev(docInfo.metadata);
- var deleted = exports.isDeleted(docInfo.metadata, winningRev);
- if ('was_delete' in opts && deleted) {
- results[resultsIdx] = errors.MISSING_DOC;
+ if (!docInfos.length) {
+ return callback();
+ }
+
+ var done = 0;
+
+ function checkDone() {
+ if (++done === docInfos.length) {
return callback();
}
- writeDoc(docInfo, winningRev, deleted, callback, false, resultsIdx);
}
var newEdits = opts.new_edits;
@@ -727,7 +740,7 @@
results[resultsIdx] = {};
}
});
- return;
+ return checkDone();
}
var id = currentDoc.metadata.id;
@@ -738,6 +751,18 @@
}
});
+ function insertDoc(docInfo, resultsIdx, callback) {
+ // Can't insert new deleted documents
+ var winningRev = merge.winningRev(docInfo.metadata);
+ var deleted = exports.isDeleted(docInfo.metadata, winningRev);
+ if ('was_delete' in opts && deleted) {
+ results[resultsIdx] = errors.MISSING_DOC;
+ return callback();
+ }
+ var delta = deleted ? 0 : 1;
+ writeDoc(docInfo, delta, winningRev, deleted, callback, false, resultsIdx);
+ }
+
// in the case of new_edits, the user can provide multiple docs
// with the same id. these need to be processed sequentially
idsToDocs.forEach(function (docs, id) {
@@ -746,6 +771,8 @@
function docWritten() {
if (++numDone < docs.length) {
nextDoc();
+ } else {
+ checkDone();
}
}
function nextDoc() {