(#6018) - simplify processNextBatch in mapreduce
diff --git a/packages/node_modules/pouchdb-mapreduce/src/index.js b/packages/node_modules/pouchdb-mapreduce/src/index.js
index f48d3fc..03f1f3d 100644
--- a/packages/node_modules/pouchdb-mapreduce/src/index.js
+++ b/packages/node_modules/pouchdb-mapreduce/src/index.js
@@ -486,73 +486,62 @@
}
var queue = new TaskQueue();
- // TODO(neojski): https://github.com/daleharvey/pouchdb/issues/1521
- return new Promise(function (resolve, reject) {
-
- function complete() {
- queue.finish().then(function () {
- view.seq = currentSeq;
- resolve();
- });
- }
-
- function processNextBatch() {
- view.sourceDB.changes({
- conflicts: true,
- include_docs: true,
- style: 'all_docs',
- since: currentSeq,
- limit: CHANGES_BATCH_SIZE
- }).on('complete', function (response) {
- var results = response.results;
- if (!results.length) {
- return complete();
- }
- var docIdsToChangesAndEmits = {};
- for (var i = 0, l = results.length; i < l; i++) {
- var change = results[i];
- if (change.doc._id[0] !== '_') {
- mapResults = [];
- doc = change.doc;
-
- if (!doc._deleted) {
- tryCode(view.sourceDB, mapFun, [doc]);
- }
- mapResults.sort(sortByKeyThenValue);
-
- var indexableKeysToKeyValues = {};
- var lastKey;
- for (var j = 0, jl = mapResults.length; j < jl; j++) {
- var obj = mapResults[j];
- var complexKey = [obj.key, obj.id];
- if (collate(obj.key, lastKey) === 0) {
- complexKey.push(j); // dup key+id, so make it unique
- }
- var indexableKey = toIndexableString(complexKey);
- indexableKeysToKeyValues[indexableKey] = obj;
- lastKey = obj.key;
- }
- docIdsToChangesAndEmits[change.doc._id] = {
- indexableKeysToKeyValues: indexableKeysToKeyValues,
- changes: change.changes
- };
- }
- currentSeq = change.seq;
- }
- queue.add(processChange(docIdsToChangesAndEmits, currentSeq));
- if (results.length < CHANGES_BATCH_SIZE) {
- return complete();
- }
- return processNextBatch();
- }).on('error', onError);
- /* istanbul ignore next */
- function onError(err) {
- reject(err);
+ function processNextBatch() {
+ return view.sourceDB.changes({
+ conflicts: true,
+ include_docs: true,
+ style: 'all_docs',
+ since: currentSeq,
+ limit: CHANGES_BATCH_SIZE
+ }).then(function (response) {
+ var results = response.results;
+ if (!results.length) {
+ return;
}
- }
+ var docIdsToChangesAndEmits = {};
+ for (var i = 0, l = results.length; i < l; i++) {
+ var change = results[i];
+ if (change.doc._id[0] !== '_') {
+ mapResults = [];
+ doc = change.doc;
- processNextBatch();
+ if (!doc._deleted) {
+ tryCode(view.sourceDB, mapFun, [doc]);
+ }
+ mapResults.sort(sortByKeyThenValue);
+
+ var indexableKeysToKeyValues = {};
+ var lastKey;
+ for (var j = 0, jl = mapResults.length; j < jl; j++) {
+ var obj = mapResults[j];
+ var complexKey = [obj.key, obj.id];
+ if (collate(obj.key, lastKey) === 0) {
+ complexKey.push(j); // dup key+id, so make it unique
+ }
+ var indexableKey = toIndexableString(complexKey);
+ indexableKeysToKeyValues[indexableKey] = obj;
+ lastKey = obj.key;
+ }
+ docIdsToChangesAndEmits[change.doc._id] = {
+ indexableKeysToKeyValues: indexableKeysToKeyValues,
+ changes: change.changes
+ };
+ }
+ currentSeq = change.seq;
+ }
+ queue.add(processChange(docIdsToChangesAndEmits, currentSeq));
+ if (results.length < CHANGES_BATCH_SIZE) {
+ return;
+ }
+ return processNextBatch();
+ });
+ }
+
+ return processNextBatch().then(function () {
+ return queue.finish();
+ }).then(function () {
+ view.seq = currentSeq;
});
}