(#2623) - compaction improvements
diff --git a/lib/adapter.js b/lib/adapter.js
index 4b04d07..6752a7c 100644
--- a/lib/adapter.js
+++ b/lib/adapter.js
@@ -403,7 +403,7 @@
// by compacting we mean removing all revisions which
// are further from the leaf in revision tree than max_height
AbstractPouchDB.prototype.compactDocument =
- function (docId, max_height, callback) {
+ utils.adapterFun('compactDocument', function (docId, max_height, callback) {
var self = this;
this._getRevisionTree(docId, function (err, rev_tree) {
if (err) {
@@ -427,7 +427,7 @@
});
self._doCompaction(docId, rev_tree, revs, callback);
});
-};
+});
// compact the whole database using single document
// compaction
@@ -437,28 +437,54 @@
callback = opts;
opts = {};
}
- var self = this;
- this.changes({complete: function (err, res) {
- if (err) {
- callback(); // TODO: silently fail
- return;
- }
- var count = res.results.length;
- if (!count) {
- callback();
- return;
- }
- res.results.forEach(function (row) {
- self.compactDocument(row.id, 0, function () {
- count--;
- if (!count) {
- callback();
- }
- });
- });
- }});
-});
+ var self = this;
+ var done = false;
+ var started = 0;
+ var lastSeq;
+ function finish() { // persist the compaction checkpoint in '_local/compaction', then invoke callback
+ self.get('_local/compaction').catch(function () { // try to read an existing checkpoint doc
+ return false; // any rejection from get() is treated as "no checkpoint doc"
+ }).then(function (doc) {
+ doc = doc || {_id: '_local/compaction'}; // start a fresh checkpoint doc when none was read
+ doc.last_seq = lastSeq; // lastSeq is assigned in the changes 'complete' handler
+ return self.put(doc);
+ }).then(function () {
+ callback(); // success: callback is invoked with no arguments
+ }, callback); // a rejection from the preceding chain (e.g. put()) is passed to callback
+ }
+ self.get('_local/compaction').catch(function () {
+ return false;
+ }).then(function (doc) {
+ if (typeof self._compact === 'function') {
+ if (doc && doc.last_seq) {
+ opts.last_seq = doc.last_seq;
+ }
+ return self._compact(opts, callback);
+ }
+ var copts = {
+ returnDocs: false
+ };
+ if (doc && doc.last_seq) {
+ copts.since = doc.last_seq;
+ }
+ self.changes(copts).on('change', function (row) {
+ started++;
+ self.compactDocument(row.id, 0).then(function () {
+ started--;
+ if (!started && done) {
+ finish();
+ }
+ }, callback);
+ }).on('complete', function (resp) {
+ done = true;
+ lastSeq = resp.last_seq;
+ if (!started) {
+ finish();
+ }
+ }).on('error', callback);
+ });
+});
/* Begin api wrappers. Specific functionality to storage belongs in the
_[method] */
AbstractPouchDB.prototype.get =
diff --git a/lib/adapters/leveldb.js b/lib/adapters/leveldb.js
index 277b047..1a3a6c4 100644
--- a/lib/adapters/leveldb.js
+++ b/lib/adapters/leveldb.js
@@ -659,7 +659,27 @@
processDocs();
};
-
+ api._compact = function (opts, callback) { // compacts each doc read from bySeqStore, then stores a checkpoint in '_local/compaction'
+ var self = this;
+ var changeOpts = {};
+ var last_seq = opts.last_seq || 0; // value persisted if the stream yields no entries
+ changeOpts.start = formatSeq(opts.last_seq ? opts.last_seq + 1 : 0); // start key is checkpoint + 1 when a checkpoint exists, else 0
+ stores.bySeqStore.readStream(changeOpts)
+ .on('error', callback) // read-stream errors go to callback
+ .pipe(through(function (entry, _, next) {
+ last_seq = entry.key; // NOTE(review): if entry.key is a string, `opts.last_seq + 1` above would concatenate on the next run -- confirm key type
+ self.compactDocument(entry.value._id, 0).then(function () { // 0 is passed as max_height
+ next(); // continue only after this doc has been compacted
+ }, next); // a rejection is forwarded to next as its argument
+ }, function (next) { // second function passed to through(); presumably the flush step run once input ends -- confirm
+ self._getLocal('_local/compaction', function (err, doc) { // err is ignored; a missing doc falls through to the default below
+ doc = doc || {_id: '_local/compaction'};
+ doc.last_seq = last_seq; // record the last key seen
+ self._putLocal(doc, callback); // callback is handed to _putLocal and so reports the checkpoint write
+ next(); // NOTE(review): called without waiting for _putLocal to finish
+ });
+ })).on('error', callback); // errors emitted by the through stream also go to callback
+ };
api._allDocs = function (opts, callback) {
opts = utils.clone(opts);
countDocs(function (err, docCount) {
@@ -922,7 +942,7 @@
var seqs = metadata.rev_map; // map from rev to seq
metadata.rev_tree = rev_tree;
if (!revs.length) {
- callback();
+ return callback();
}
var batch = [];
batch.push({