Revert OAK-9922 - Parallel Compaction changes
diff --git a/.DS_Store b/.DS_Store
deleted file mode 100644
index c9f5850..0000000
--- a/.DS_Store
+++ /dev/null
Binary files differ
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
index a62070a..bfad0a2 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounter.java
@@ -18,18 +18,17 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.counter;
 
+import java.util.Random;
+import java.util.UUID;
+
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
-import java.util.Random;
-import java.util.UUID;
-
 /**
- * Moved to oak-store-spi
+ * An approximate counter algorithm.
  */
-@Deprecated
 public class ApproximateCounter {
     
     public static final String COUNT_PROPERTY_PREFIX = ":count_";
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
index 6df3524..cde5ace 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/NodeCounterEditor.java
@@ -22,7 +22,6 @@
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
 import org.apache.jackrabbit.oak.plugins.index.property.Multiplexers;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
index 5815abb..fa11e76 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/counter/jmx/NodeCounter.java
@@ -35,7 +35,7 @@
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
 
 /**
  * A mechanism to retrieve node counter data.
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
index ff0fe00..8cde725 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/PropertyIndexInfoProvider.java
@@ -24,7 +24,7 @@
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexInfo;
 import org.apache.jackrabbit.oak.plugins.index.IndexInfoProvider;
-import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
index aec9ec3..3f2346d 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategy.java
@@ -29,7 +29,7 @@
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
-import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
 import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditor;
 import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
index 22909b3..0ff377d 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/UniqueEntryStoreStrategy.java
@@ -32,7 +32,7 @@
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
+import org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
diff --git a/oak-store-spi/src/test/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounterTest.java
similarity index 96%
rename from oak-store-spi/src/test/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounterTest.java
rename to oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounterTest.java
index d4fe3b0..098dd52 100644
--- a/oak-store-spi/src/test/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounterTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/counter/ApproximateCounterTest.java
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.jackrabbit.oak.plugins.index;
-
-import org.junit.Test;
-
-import java.util.Random;
+package org.apache.jackrabbit.oak.plugins.index.counter;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Random;
+
+import org.junit.Test;
+
 public class ApproximateCounterTest {
 
     @Test
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
index 4d2f779..d9f8ba5 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/property/strategy/ContentMirrorStoreStrategyTest.java
@@ -26,7 +26,7 @@
 import static org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditor.COUNT_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditor.DEFAULT_RESOLUTION;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.plugins.index.ApproximateCounter.COUNT_PROPERTY_PREFIX;
+import static org.apache.jackrabbit.oak.plugins.index.counter.ApproximateCounter.COUNT_PROPERTY_PREFIX;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
diff --git a/oak-doc/src/site/markdown/nodestore/segment/overview.md b/oak-doc/src/site/markdown/nodestore/segment/overview.md
index 53f1054..7a02190 100644
--- a/oak-doc/src/site/markdown/nodestore/segment/overview.md
+++ b/oak-doc/src/site/markdown/nodestore/segment/overview.md
@@ -288,22 +288,6 @@
 TarMK GC #2: compacting root.
 ```
 
-##### <a name="how-does-compaction-make-use-of-multithreading"/> How does compaction make use of multithreading?
-
-The parallel compactor adds an initial exploration phase to the compaction process, which scans and splits the content tree
-into multiple parts to be processed simultaneously. For this to be efficient, the tree is only expanded until a certain 
-number of nodes is reached, which is defined relative to the number of threads (main thread + compaction workers).
-
-```
-TarMK GC #2: compacting with 8 threads.
-TarMK GC #2: exploring content tree to find subtrees for parallel compaction.
-TarMK GC #2: target node count for expansion is 7000, based on 7 available workers.
-TarMK GC #2: Found 3 nodes at depth 1, target is 7000.
-TarMK GC #2: Found 48 nodes at depth 2, target is 7000.
-TarMK GC #2: Found 663 nodes at depth 3, target is 7000.
-TarMK GC #2: Found 66944 nodes at depth 4, target is 7000.
-```
-
 ##### <a name="how-does-compaction-works-with-concurrent-writes"/> How does compaction work with concurrent writes?
 
 When compaction runs as part of online garbage collection, it has to work concurrently with the rest of the system.
@@ -823,25 +807,24 @@
 ### <a name="compact"/> Compact
 
 ```
-java -jar oak-run.jar compact [--force] [--mmap] [--compactor] [--threads] SOURCE [--target-path DESTINATION] [--persistent-cache-path PERSISTENT_CACHE_PATH] [--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>]
+java -jar oak-run.jar compact [--force] [--mmap] [--compactor] SOURCE [--target-path DESTINATION] [--persistent-cache-path PERSISTENT_CACHE_PATH] [--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>]
 ```
 
 The `compact` command performs offline compaction of the local/remote Segment Store at `SOURCE`. 
 `SOURCE` must be a valid path/URI to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store are the supported remote Segment Stores. 
 Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
-If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non-matching Segment Store version. *CAUTION*: this will upgrade the Segment Store to the 
+If the optional `--force [Boolean]` argument is set to `true`, the tool ignores a non-matching Segment Store version. *CAUTION*: this will upgrade the Segment Store to the 
 latest version, which is incompatible with older versions. *There is no way to downgrade 
 an accidentally upgraded Segment Store*.  
 
 The optional `--mmap [Boolean]` argument can be used to control the file access mode. Set
 to `true` for memory mapped access and `false` for file access. If not specified, memory 
-mapped access is used on 64-bit systems and file access is used on 32-bit systems. On
+mapped access is used on 64 bit systems and file access is used on 32 bit systems. On
 Windows, regular file access is always enforced and this option is ignored.
 
-The optional `--compactor [String]` argument can be used to pick the compactor type to be used. Valid choices are *classic*, *diff* and *parallel*. While *classic* is slower, it might be more stable, due to lack of optimisations employed by the *diff* compactor which compacts the checkpoints on top of each other and the *parallel* compactor, which additionally divides the repository into multiple parts to process in parallel. If not specified, *parallel* compactor is used.
-
-The optional `--threads [Integer]` argument specifies the number of threads to use for compaction. This is only applicable to the *parallel* compactor. If not specified, this defaults to the number of available processors.
+The optional `--compactor [String]` argument can be used to pick the compactor type. Valid choices are *classic* and *diff*. While the former is slower, it might be more stable, due to the lack of optimisations employed by the *diff* compactor, which compacts the checkpoints on top of each other. If not specified, the *diff* compactor is used.
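+
+For example, the following invocation (with a placeholder store path) selects the slower but more conservative *classic* compactor:
+
+```
+java -jar oak-run.jar compact --compactor classic /path/to/segmentstore
+```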
 
 In order to speed up offline compaction for remote Segment Stores, three new options were introduced for configuring the destination segment store where compacted archives will be written and also to configure a persistent disk cache for speeding up segments reading during compaction. All three options detailed below **apply only for remote Segment Stores**.
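+
+For illustration, a sketch of a remote (Azure) compaction run, where the account, container and cache path are placeholders:
+
+```
+java -jar oak-run.jar compact az:https://myaccount.blob.core.windows.net/oak/repository \
+    --target-path az:https://myaccount.blob.core.windows.net/oak/repository-compacted \
+    --persistent-cache-path /tmp/segment-cache \
+    --persistent-cache-size-gb 50
+```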
 
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
index 157c863..212c7a0 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
@@ -19,15 +19,16 @@
 
 import java.io.File;
 
+import org.apache.jackrabbit.guava.common.base.StandardSystemProperty;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import org.apache.jackrabbit.oak.run.commons.Command;
-import org.apache.jackrabbit.oak.segment.aws.tool.AwsCompact;
 import org.apache.jackrabbit.oak.segment.azure.tool.AzureCompact;
+import org.apache.jackrabbit.oak.segment.azure.tool.AzureCompact.Builder;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsCompact;
 import org.apache.jackrabbit.oak.segment.tool.Compact;
-import org.apache.jackrabbit.guava.common.base.StandardSystemProperty;
 
 class CompactCommand implements Command {
 
@@ -55,16 +56,11 @@
                 .withOptionalArg()
                 .ofType(Boolean.class);
         OptionSpec<String> compactor = parser.accepts("compactor",
-                "Allow the user to control compactor type to be used. Valid choices are \"classic\", \"diff\", \"parallel\". " +
-                        "While \"classic\" is slower, it might be more stable, due to lack of optimisations employed " +
-                        "by the \"diff\" compactor which compacts the checkpoints on top of each other and \"parallel\" compactor, which splits " +
-                        "the repository into smaller parts and compacts them concurrently. If not specified, \"parallel\" compactor is used.")
+                "Allow the user to control compactor type to be used. Valid choices are \"classic\" and \"diff\". " +
+                        "While the former is slower, it might be more stable, due to lack of optimisations employed " +
+                        "by the \"diff\" compactor which compacts the checkpoints on top of each other. If not " +
+                        "specified, \"diff\" compactor is used.")
                 .withRequiredArg().ofType(String.class);
-        OptionSpec<Integer> nThreads = parser.accepts("threads", "Specify the number of threads used" +
-                "for compaction. This is only applicable to the \"parallel\" compactor. Defaults to 1.")
-                .withRequiredArg()
-                .ofType(Integer.class)
-                .defaultsTo(1);
         OptionSpec<String> targetPath = parser.accepts("target-path", "Path/URI to TAR/remote segment store where " +
                 "resulting archives will be written")
                 .withRequiredArg()
@@ -76,9 +72,8 @@
         OptionSpec<Integer> persistentCacheSizeGb = parser.accepts("persistent-cache-size-gb", "Size in GB (defaults to 50 GB) for "
                 + "the persistent disk cache")
                 .withRequiredArg()
-                .ofType(Integer.class)
-                .defaultsTo(50);
-
+                .defaultsTo("50")
+                .ofType(Integer.class);
 
         OptionSet options = parser.parse(args);
 
@@ -90,7 +85,7 @@
             System.exit(-1);
         }
 
-        int code;
+        int code = 0;
 
         if (path.startsWith("az:")) {
             if (targetPath.value(options) == null) {
@@ -105,48 +100,45 @@
                 System.exit(-1);
             }
 
-            AzureCompact.Builder azureBuilder = AzureCompact.builder()
+            Builder azureBuilder = AzureCompact.builder()
                     .withPath(path)
                     .withTargetPath(targetPath.value(options))
                     .withPersistentCachePath(persistentCachePath.value(options))
                     .withPersistentCacheSizeGb(persistentCacheSizeGb.value(options))
                     .withForce(isTrue(forceArg.value(options)))
-                    .withGCLogInterval(Long.getLong("compaction-progress-log", 150000))
-                    .withConcurrency(nThreads.value(options));
+                    .withGCLogInterval(Long.getLong("compaction-progress-log", 150000));
 
             if (options.has(compactor)) {
                 azureBuilder.withCompactorType(CompactorType.fromDescription(compactor.value(options)));
             }
 
-            code = azureBuilder.build().run();
+            code = azureBuilder
+                    .build()
+                    .run();
         } else if (path.startsWith("aws:")) {
-            AwsCompact.Builder awsBuilder = AwsCompact.builder()
+            code = AwsCompact.builder()
                     .withPath(path)
                     .withForce(isTrue(forceArg.value(options)))
                     .withSegmentCacheSize(Integer.getInteger("cache", 256))
                     .withGCLogInterval(Long.getLong("compaction-progress-log", 150000))
-                    .withConcurrency(nThreads.value(options));
-
-            if (options.has(compactor)) {
-                awsBuilder.withCompactorType(CompactorType.fromDescription(compactor.value(options)));
-            }
-
-            code = awsBuilder.build().run();
+                    .build()
+                    .run();
         } else {
-            Compact.Builder tarBuilder = Compact.builder()
+            Compact.Builder tarBuilder = Compact.builder()
                     .withPath(new File(path))
                     .withForce(isTrue(forceArg.value(options)))
                     .withMmap(mmapArg.value(options))
                     .withOs(StandardSystemProperty.OS_NAME.value())
                     .withSegmentCacheSize(Integer.getInteger("cache", 256))
-                    .withGCLogInterval(Long.getLong("compaction-progress-log", 150000))
-                    .withConcurrency(nThreads.value(options));
+                    .withGCLogInterval(Long.getLong("compaction-progress-log", 150000));
 
             if (options.has(compactor)) {
                 tarBuilder.withCompactorType(CompactorType.fromDescription(compactor.value(options)));
             }
 
-            code = tarBuilder.build().run();
+            code = tarBuilder
+                    .build()
+                    .run();
         }
 
         System.exit(code);
diff --git a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
index 1c77b9b..0b7514f 100644
--- a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
+++ b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
@@ -34,7 +34,6 @@
 
 import org.apache.jackrabbit.oak.segment.SegmentCache;
 import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
-import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.JournalReader;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
@@ -73,10 +72,6 @@
 
         private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
 
-        private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
-
-        private int concurrency = 1;
-
         private Builder() {
             // Prevent external instantiation.
         }
@@ -134,27 +129,6 @@
         }
 
         /**
-         * The compactor type to be used by compaction. If not specified it defaults to
-         * "parallel" compactor
-         * @param compactorType the compactor type
-         * @return this builder
-         */
-        public Builder withCompactorType(CompactorType compactorType) {
-            this.compactorType = compactorType;
-            return this;
-        }
-
-        /**
-         * The number of threads to be used for compaction. This only applies to the "parallel" compactor
-         * @param concurrency the number of threads
-         * @return this builder
-         */
-        public Builder withConcurrency(int concurrency) {
-            this.concurrency = concurrency;
-            return this;
-        }
-
-        /**
          * Create an executable version of the {@link Compact} command.
          *
          * @return an instance of {@link Runnable}.
@@ -173,17 +147,11 @@
 
     private final long gcLogInterval;
 
-    private final CompactorType compactorType;
-
-    private final int concurrency;
-
     private AwsCompact(Builder builder) {
         this.path = builder.path;
         this.segmentCacheSize = builder.segmentCacheSize;
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
-        this.compactorType = builder.compactorType;
-        this.concurrency = builder.concurrency;
     }
 
     public int run() throws IOException {
@@ -205,7 +173,7 @@
         System.out.printf("    -> compacting\n");
 
         try (FileStore store = newFileStore(persistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize,
-                gcLogInterval, compactorType, concurrency)) {
+                gcLogInterval)) {
             if (!store.compactFull()) {
                 System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch));
                 return 1;
diff --git a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
index e76d18b..f198ae6 100644
--- a/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
+++ b/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
@@ -34,7 +34,6 @@
 import org.apache.jackrabbit.oak.segment.aws.AwsContext;
 import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
 import org.apache.jackrabbit.oak.segment.aws.Configuration;
-import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
 import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
@@ -70,26 +69,14 @@
     }
 
     public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory,
-                                         boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval)
-            throws IOException, InvalidFileStoreVersionException {
-        return newFileStore(persistence, directory, strictVersionCheck, segmentCacheSize,
-                gcLogInterval, CompactorType.PARALLEL_COMPACTOR, 1);
-    }
-
-    public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory,
-                                         boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval,
-                                         CompactorType compactorType, int gcConcurrency)
-            throws IOException, InvalidFileStoreVersionException {
+            boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval)
+            throws IOException, InvalidFileStoreVersionException, URISyntaxException {
         FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory)
                 .withCustomPersistence(persistence)
                 .withMemoryMapping(false)
                 .withStrictVersionCheck(strictVersionCheck)
                 .withSegmentCacheSize(segmentCacheSize)
-                .withGCOptions(defaultGCOptions()
-                        .setOffline()
-                        .setGCLogInterval(gcLogInterval)
-                        .setCompactorType(compactorType)
-                        .setConcurrency(gcConcurrency));
+                .withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval));
 
         return builder.build();
     }
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
index a8e103c..7ca8188 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
@@ -79,9 +79,7 @@
 
         private int segmentCacheSize = 2048;
 
-        private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
-
-        private int concurrency = 1;
+        private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR;
 
         private String persistentCachePath;
 
@@ -161,7 +159,7 @@
 
         /**
          * The compactor type to be used by compaction. If not specified it defaults to
-         * "parallel" compactor
+         * "diff" compactor
          * @param compactorType the compactor type
          * @return this builder
          */
@@ -171,16 +169,6 @@
         }
 
         /**
-         * The number of threads to be used for compaction. This only applies to the "parallel" compactor
-         * @param concurrency the number of threads
-         * @return this builder
-         */
-        public Builder withConcurrency(int concurrency) {
-            this.concurrency = concurrency;
-            return this;
-        }
-
-        /**
          * The path where segments in the persistent cache will be stored.
          *
          * @param persistentCachePath
@@ -227,8 +215,6 @@
 
     private final CompactorType compactorType;
 
-    private final int concurrency;
-
     private String persistentCachePath;
 
     private Integer persistentCacheSizeGb;
@@ -240,7 +226,6 @@
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
         this.compactorType = builder.compactorType;
-        this.concurrency = builder.concurrency;
         this.persistentCachePath = builder.persistentCachePath;
         this.persistentCacheSizeGb = builder.persistentCacheSizeGb;
     }
@@ -269,7 +254,7 @@
         System.out.printf("    -> compacting\n");
 
         try (FileStore store = newFileStore(splitPersistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize,
-                gcLogInterval, compactorType, concurrency)) {
+                gcLogInterval, compactorType)) {
             if (!store.compactFull()) {
                 System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch));
                 return 1;
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
index 325d648..cfc3d4c 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
@@ -86,26 +86,14 @@
     }
 
     public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory,
-                                         boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval, CompactorType compactorType)
-            throws IOException, InvalidFileStoreVersionException {
-        return newFileStore(persistence, directory, strictVersionCheck,
-                segmentCacheSize, gcLogInterval, compactorType, 1);
-    }
-
-    public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory,
-            boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval, CompactorType compactorType, int gcConcurrency)
-            throws IOException, InvalidFileStoreVersionException {
-        return FileStoreBuilder.fileStoreBuilder(directory)
-                .withCustomPersistence(persistence)
-                .withMemoryMapping(false)
-                .withStrictVersionCheck(strictVersionCheck)
+            boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval, CompactorType compactorType)
+            throws IOException, InvalidFileStoreVersionException, URISyntaxException, StorageException {
+        FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory)
+                .withCustomPersistence(persistence).withMemoryMapping(false).withStrictVersionCheck(strictVersionCheck)
                 .withSegmentCacheSize(segmentCacheSize)
-                .withGCOptions(defaultGCOptions()
-                        .setOffline()
-                        .setGCLogInterval(gcLogInterval)
-                        .setCompactorType(compactorType)
-                        .setConcurrency(gcConcurrency))
-                .build();
+                .withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval).setCompactorType(compactorType));
+
+        return builder.build();
     }
 
     public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType,
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
index 0b6f5a0..51677e8 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CheckpointCompactor.java
@@ -25,7 +25,6 @@
 import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
 import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.segment.CompactorUtils.getStableIdBytes;
 
 import java.io.IOException;
 import java.util.Date;
@@ -33,7 +32,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
@@ -59,13 +57,13 @@
  */
 public class CheckpointCompactor implements Compactor {
     @NotNull
-    protected final GCMonitor gcListener;
+    private final GCMonitor gcListener;
 
     @NotNull
     private final Map<NodeState, NodeState> cpCache = newHashMap();
 
     @NotNull
-    protected final ClassicCompactor compactor;
+    private final ClassicCompactor compactor;
 
     @NotNull
     private final NodeWriter nodeWriter;
@@ -141,8 +139,14 @@
             childBuilder.setChildNode(getName(path), state);
         }
 
-        return nodeWriter.writeNode(builder.getNodeState(),
-                Objects.requireNonNull(getStableIdBytes(uncompacted)));
+        return nodeWriter.writeNode(builder.getNodeState(), getStableIdBytes(uncompacted));
+    }
+
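+    /**
+     * Returns the stable id bytes of {@code node} if it is a {@link SegmentNodeState},
+     * or {@code null} otherwise.
+     */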
+    @Nullable
+    private static Buffer getStableIdBytes(@NotNull NodeState node) {
+        return node instanceof SegmentNodeState
+            ? ((SegmentNodeState) node).getStableIdBytes()
+            : null;
     }
 
     @NotNull
@@ -229,19 +233,6 @@
         }
 
     /**
-     * Delegate compaction to another, usually simpler, implementation.
-     */
-    @Nullable
-    protected SegmentNodeState compactWithDelegate(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull NodeState onto,
-            Canceller canceller
-    ) throws IOException {
-        return compactor.compact(before, after, onto, canceller);
-    }
-
-    /**
      * Compact {@code after} against {@code before} on top of {@code onto} unless
      * {@code after} has been compacted before and is found in the cache. In this
      * case the cached version of the previously compacted {@code before} is returned.
@@ -257,7 +248,7 @@
         gcListener.info("compacting {}.", path);
         NodeState compacted = cpCache.get(after);
         if (compacted == null) {
-            compacted = compactWithDelegate(before, after, onto, canceller);
+            compacted = compactor.compact(before, after, onto, canceller);
             if (compacted == null) {
                 return null;
             } else {
@@ -269,4 +260,5 @@
             return new Result(compacted, before, onto);
         }
     }
+
 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
index 61a2fb6..74c8fb5 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ClassicCompactor.java
@@ -123,10 +123,13 @@
         return new CompactDiff(onto, canceller).diff(before, after);
     }
 
-    protected SegmentNodeState writeNodeState(NodeState nodeState, Buffer stableIdBytes) throws IOException {
-        RecordId nodeId = writer.writeNode(nodeState, stableIdBytes);
-        compactionMonitor.onNode();
-        return new SegmentNodeState(reader, writer, blobStore, nodeId);
+    @Nullable
+    private static Buffer getStableIdBytes(NodeState state) {
+        if (state instanceof SegmentNodeState) {
+            return ((SegmentNodeState) state).getStableIdBytes();
+        } else {
+            return null;
+        }
     }
 
     private class CompactDiff implements NodeStateDiff {
@@ -159,14 +162,15 @@
 
         @Nullable
         SegmentNodeState diff(@NotNull NodeState before, @NotNull NodeState after) throws IOException {
-            boolean success = after.compareAgainstBaseState(before,
-                    new CancelableDiff(this, () -> canceller.check().isCancelled()));
+            boolean success = after.compareAgainstBaseState(before, new CancelableDiff(this, () -> canceller.check().isCancelled()));
             if (exception != null) {
                 throw new IOException(exception);
             } else if (success) {
                 NodeState nodeState = builder.getNodeState();
                 checkState(modCount == 0 || !(nodeState instanceof SegmentNodeState));
-                return writeNodeState(nodeState, CompactorUtils.getStableIdBytes(after));
+                RecordId nodeId = writer.writeNode(nodeState, getStableIdBytes(after));
+                compactionMonitor.onNode();
+                return new SegmentNodeState(reader, writer, blobStore, nodeId);
             } else {
                 return null;
             }
@@ -238,7 +242,7 @@
     }
 
     @NotNull
-    protected PropertyState compact(@NotNull PropertyState property) {
+    private PropertyState compact(@NotNull PropertyState property) {
         compactionMonitor.onProperty();
         String name = property.getName();
         Type<?> type = property.getType();
@@ -256,4 +260,5 @@
             return createProperty(name, property.getValue(type), type);
         }
     }
+
 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CompactorUtils.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CompactorUtils.java
deleted file mode 100644
index ef0bae8..0000000
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CompactorUtils.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.jackrabbit.oak.segment;
-
-import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-final class CompactorUtils {
-    @Nullable
-    static Buffer getStableIdBytes(@NotNull NodeState state) {
-        if (state instanceof SegmentNodeState) {
-            return ((SegmentNodeState) state).getStableIdBytes();
-        } else {
-            return null;
-        }
-    }
-}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
index 8e222b9..94ae636 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
@@ -19,24 +19,21 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool.PoolType;
+import static org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 
-import static java.util.Objects.requireNonNull;
-
+import org.apache.jackrabbit.guava.common.base.Supplier;
+import org.apache.jackrabbit.guava.common.base.Suppliers;
 import org.apache.jackrabbit.oak.segment.WriterCacheManager.Empty;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.function.Supplier;
 
 /**
  * Builder for building {@link DefaultSegmentWriter} instances.
- * The returned instances are thread-safe if {@link #withWriterPool(PoolType)}
- * was specified and <em>not</em> thread-safe if {@link #withoutWriterPool()}
+ * The returned instances are thread safe if {@link #withWriterPool()}
+ * was specified and <em>not</em> thread safe if {@link #withoutWriterPool()}
  * was specified (default).
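+ * <p>
+ * A minimal usage sketch (assuming the static factory
+ * {@code defaultSegmentWriterBuilder(String)} and an existing {@code FileStore} {@code store}):
+ * <pre>
+ * DefaultSegmentWriter writer = defaultSegmentWriterBuilder("sys")
+ *     .withWriterPool()   // thread safe writer backed by a SegmentBufferWriterPool
+ *     .build(store);
+ * </pre>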
  * <p>
  * <em>Default:</em> calling one of the {@code build()} methods without previously
@@ -57,15 +54,15 @@
     private final String name;
 
     @NotNull
-    private Supplier<GCGeneration> generation = () -> GCGeneration.NULL;
+    private Supplier<GCGeneration> generation = Suppliers.ofInstance(GCGeneration.NULL);
 
-    private PoolType poolType = null;
+    private boolean pooled = false;
 
     @NotNull
     private WriterCacheManager cacheManager = new WriterCacheManager.Default();
 
     private DefaultSegmentWriterBuilder(@NotNull String name) {
-        this.name = requireNonNull(name);
+        this.name = checkNotNull(name);
     }
 
     /**
@@ -84,13 +81,13 @@
      * If {@link #withoutWriterPool()} was specified all segments will be written
      * at the generation that {@code generation.get()} returned at the time
      * any of the {@code build()} methods is called.
-     * If {@link #withWriterPool(PoolType)} ()} was specified, segments will be written
+     * If {@link #withWriterPool()} was specified, segments will be written
      * at the generation that {@code generation.get()} returns when a new segment
      * is created by the returned writer.
      */
     @NotNull
     public DefaultSegmentWriterBuilder withGeneration(@NotNull Supplier<GCGeneration> generation) {
-        this.generation = requireNonNull(generation);
+        this.generation = checkNotNull(generation);
         return this;
     }
 
@@ -100,22 +97,17 @@
      */
     @NotNull
     public DefaultSegmentWriterBuilder withGeneration(@NotNull GCGeneration generation) {
-        this.generation = () -> requireNonNull(generation);
+        this.generation = Suppliers.ofInstance(checkNotNull(generation));
         return this;
     }
 
-    @NotNull
-    public DefaultSegmentWriterBuilder withWriterPool() {
-        return withWriterPool(PoolType.GLOBAL);
-    }
-
     /**
      * Create a {@code SegmentWriter} backed by a {@link SegmentBufferWriterPool}.
      * The returned instance is thread safe.
      */
     @NotNull
-    public DefaultSegmentWriterBuilder withWriterPool(PoolType writerType) {
-        this.poolType = writerType;
+    public DefaultSegmentWriterBuilder withWriterPool() {
+        this.pooled = true;
         return this;
     }
 
@@ -125,7 +117,7 @@
      */
     @NotNull
     public DefaultSegmentWriterBuilder withoutWriterPool() {
-        this.poolType = null;
+        this.pooled = false;
         return this;
     }
 
@@ -134,7 +126,7 @@
      */
     @NotNull
     public DefaultSegmentWriterBuilder with(WriterCacheManager cacheManager) {
-        this.cacheManager = requireNonNull(cacheManager);
+        this.cacheManager = checkNotNull(cacheManager);
         return this;
     }
 
@@ -154,12 +146,12 @@
     @NotNull
     public DefaultSegmentWriter build(@NotNull FileStore store) {
         return new DefaultSegmentWriter(
-                requireNonNull(store),
+                checkNotNull(store),
                 store.getReader(),
                 store.getSegmentIdProvider(),
                 store.getBlobStore(),
                 cacheManager,
-                createWriter(store, poolType),
+                createWriter(store, pooled),
                 store.getBinariesInlineThreshold()
         );
     }
@@ -172,7 +164,7 @@
     @NotNull
     public DefaultSegmentWriter build(@NotNull ReadOnlyFileStore store) {
         return new DefaultSegmentWriter(
-                requireNonNull(store),
+                checkNotNull(store),
                 store.getReader(),
                 store.getSegmentIdProvider(),
                 store.getBlobStore(),
@@ -207,32 +199,52 @@
     @NotNull
     public DefaultSegmentWriter build(@NotNull MemoryStore store) {
         return new DefaultSegmentWriter(
-                requireNonNull(store),
+                checkNotNull(store),
                 store.getReader(),
                 store.getSegmentIdProvider(),
                 store.getBlobStore(),
                 cacheManager,
-                createWriter(store, poolType),
+                createWriter(store, pooled),
                 Segment.MEDIUM_LIMIT
         );
     }
 
     @NotNull
-    private WriteOperationHandler createWriter(@NotNull FileStore store, @Nullable PoolType poolType) {
-        return createWriter(store.getSegmentIdProvider(), store.getReader(), poolType);
-    }
-
-    @NotNull
-    private WriteOperationHandler createWriter(@NotNull MemoryStore store, @Nullable PoolType poolType) {
-        return createWriter(store.getSegmentIdProvider(), store.getReader(), poolType);
-    }
-
-    @NotNull
-    private WriteOperationHandler createWriter(@NotNull SegmentIdProvider idProvider, @NotNull SegmentReader reader, @Nullable PoolType poolType) {
-        if (poolType == null) {
-            return new SegmentBufferWriter(idProvider, reader, name, generation.get());
+    private WriteOperationHandler createWriter(@NotNull FileStore store, boolean pooled) {
+        if (pooled) {
+            return new SegmentBufferWriterPool(
+                    store.getSegmentIdProvider(),
+                    store.getReader(),
+                    name,
+                    generation
+            );
         } else {
-            return SegmentBufferWriterPool.factory(idProvider, reader, name, generation).newPool(poolType);
+            return new SegmentBufferWriter(
+                    store.getSegmentIdProvider(),
+                    store.getReader(),
+                    name,
+                    generation.get()
+            );
         }
     }
+
+    @NotNull
+    private WriteOperationHandler createWriter(@NotNull MemoryStore store, boolean pooled) {
+        if (pooled) {
+            return new SegmentBufferWriterPool(
+                    store.getSegmentIdProvider(),
+                    store.getReader(),
+                    name,
+                    generation
+            );
+        } else {
+            return new SegmentBufferWriter(
+                    store.getSegmentIdProvider(),
+                    store.getReader(),
+                    name,
+                    generation.get()
+            );
+        }
+    }
+
 }
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
deleted file mode 100644
index 6c0f20b..0000000
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ParallelCompactor.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.jackrabbit.oak.segment;
-
-import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.plugins.index.ApproximateCounter;
-import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
-import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.segment.CompactorUtils.getStableIdBytes;
-
-/**
- * This compactor implementation leverages the tree structure of the repository for concurrent compaction.
- * It explores the tree breadth-first until the target node count is reached. Every node at this depth will be
- * an entry point for asynchronous compaction. After the exploration phase, the main thread will collect
- * these compaction results and write their parents' node state to disk.
- */
-public class ParallelCompactor extends CheckpointCompactor {
-    /**
-     * Expand repository tree until there are this many nodes for each worker to compact. Tradeoff
-     * between low efficiency of many small tasks and high risk of at least one of the subtrees being
-     * significantly larger than totalSize / numWorkers (unequal work distribution).
-     */
-    private static final int MIN_NODES_PER_WORKER = 1000;
-
-    /**
-     * Stop expansion if tree size grows beyond this many nodes per worker at the latest.
-     */
-    private static final int MAX_NODES_PER_WORKER = 10_000;
-
-    private final int numWorkers;
-
-    private final long totalSizeEstimate;
-
-    /**
-     * Manages workers for asynchronous compaction.
-     */
-    @Nullable
-    private ExecutorService executorService;
-
-    /**
-     * Create a new instance based on the passed arguments.
-     * @param gcListener listener receiving notifications about the garbage collection process
-     * @param reader     segment reader used to read from the segments
-     * @param writer     segment writer used to serialise to segments
-     * @param blobStore  the blob store or {@code null} if none
-     * @param compactionMonitor   notification call back for each compacted nodes, properties, and binaries
-     * @param nThreads   number of threads to use for parallel compaction,
-     *                   negative numbers are interpreted relative to the number of available processors
-     */
-    public ParallelCompactor(
-            @NotNull GCMonitor gcListener,
-            @NotNull SegmentReader reader,
-            @NotNull SegmentWriter writer,
-            @Nullable BlobStore blobStore,
-            @NotNull GCNodeWriteMonitor compactionMonitor,
-            int nThreads) {
-        super(gcListener, reader, writer, blobStore, compactionMonitor);
-
-        int availableProcessors = Runtime.getRuntime().availableProcessors();
-        if (nThreads < 0) {
-            nThreads += availableProcessors + 1;
-        }
-        numWorkers = Math.max(0, nThreads - 1);
-        totalSizeEstimate = compactionMonitor.getEstimatedTotal();
-    }
-
-    /**
-     * Calculates the minimum number of entry points for asynchronous compaction.
-     */
-    private int getMinNodeCount() {
-        return numWorkers * MIN_NODES_PER_WORKER;
-    }
-
-    private int getMaxNodeCount() {
-        return numWorkers * MAX_NODES_PER_WORKER;
-    }
-
-    /**
-     * Represents structure of repository changes. Tree is built by exploration process and subsequently
-     * used to collect and merge asynchronous compaction results.
-     */
-    private class CompactionTree implements NodeStateDiff {
-        @NotNull
-        private final NodeState before;
-        @NotNull
-        private final NodeState after;
-        @NotNull
-        private final NodeState onto;
-        @NotNull
-        private final HashMap<String, CompactionTree> modifiedChildren = new HashMap<>();
-        @NotNull
-        private final List<Property> modifiedProperties = new ArrayList<>();
-        @NotNull
-        private final List<String> removedChildNames = new ArrayList<>();
-        @NotNull
-        private final List<String> removedPropertyNames = new ArrayList<>();
-        /**
-         * Stores result of asynchronous compaction.
-         */
-        @Nullable
-        private Future<SegmentNodeState> compactionFuture;
-
-        CompactionTree(@NotNull NodeState before, @NotNull NodeState after, @NotNull NodeState onto) {
-            this.before = checkNotNull(before);
-            this.after = checkNotNull(after);
-            this.onto = checkNotNull(onto);
-        }
-
-        private class Property {
-            @NotNull
-            private final PropertyState state;
-
-            Property(@NotNull PropertyState state) {
-                this.state = state;
-            }
-
-            @NotNull
-            PropertyState compact() {
-                return compactor.compact(state);
-            }
-        }
-
-        boolean compareStates(Canceller canceller) {
-            return after.compareAgainstBaseState(before,
-                    new CancelableDiff(this, () -> canceller.check().isCancelled()));
-        }
-
-        long getEstimatedSize() {
-            return ApproximateCounter.getCountSync(after);
-        }
-
-        @Override
-        public boolean propertyAdded(PropertyState after) {
-            modifiedProperties.add(new Property(after));
-            return true;
-        }
-
-        @Override
-        public boolean propertyChanged(PropertyState before, PropertyState after) {
-            modifiedProperties.add(new Property(after));
-            return true;
-        }
-
-        @Override
-        public boolean propertyDeleted(PropertyState before) {
-            removedPropertyNames.add(before.getName());
-            return true;
-        }
-
-        @Override
-        public boolean childNodeAdded(String name, NodeState after) {
-            CompactionTree child = new CompactionTree(EMPTY_NODE, after, EMPTY_NODE);
-            modifiedChildren.put(name, child);
-            return true;
-        }
-
-        @Override
-        public boolean childNodeChanged(String name, NodeState before, NodeState after) {
-            CompactionTree child = new CompactionTree(before, after, onto.getChildNode(name));
-            modifiedChildren.put(name, child);
-            return true;
-        }
-
-        @Override
-        public boolean childNodeDeleted(String name, NodeState before) {
-            removedChildNames.add(name);
-            return true;
-        }
-
-        /**
-         * Start asynchronous compaction.
-         */
-        boolean compactAsync(Canceller canceller) {
-            if (compactionFuture != null) {
-                return false;
-            }
-            checkNotNull(executorService);
-            compactionFuture = executorService.submit(() -> compactor.compact(before, after, onto, canceller));
-            return true;
-        }
-
-        /**
-         * Start synchronous compaction on tree or collect result of asynchronous compaction if it has been started.
-         */
-        @Nullable
-        SegmentNodeState compact() throws IOException {
-            if (compactionFuture != null) {
-                try {
-                    return compactionFuture.get();
-                } catch (InterruptedException e) {
-                    return null;
-                } catch (ExecutionException e) {
-                    throw new IOException(e);
-                }
-            }
-
-            MemoryNodeBuilder builder = new MemoryNodeBuilder(onto);
-
-            for (Map.Entry<String, CompactionTree> entry : modifiedChildren.entrySet()) {
-                SegmentNodeState compactedState = entry.getValue().compact();
-                if (compactedState == null) {
-                    return null;
-                }
-                builder.setChildNode(entry.getKey(), compactedState);
-            }
-            for (String childName : removedChildNames) {
-                builder.getChildNode(childName).remove();
-            }
-            for (Property property : modifiedProperties) {
-                builder.setProperty(property.compact());
-            }
-            for (String propertyName : removedPropertyNames) {
-                builder.removeProperty(propertyName);
-            }
-            return compactor.writeNodeState(builder.getNodeState(), getStableIdBytes(after));
-        }
-    }
-
-    /**
-     * Implementation of {@link NodeStateDiff} to build {@link CompactionTree} and start asynchronous compaction on
-     * suitable entry points. Performs what is referred to as the exploration phase in other comments.
-     */
-    private class CompactionHandler {
-        @NotNull
-        private final NodeState base;
-
-        @NotNull
-        private final Canceller canceller;
-
-        CompactionHandler(@NotNull NodeState base, @NotNull Canceller canceller) {
-            this.base = base;
-            this.canceller = canceller;
-        }
-
-        @Nullable
-        SegmentNodeState diff(@NotNull NodeState before, @NotNull NodeState after) throws IOException {
-            checkNotNull(executorService);
-            checkState(!executorService.isShutdown());
-
-            gcListener.info("compacting with {} threads.", numWorkers + 1);
-            gcListener.info("exploring content tree to find subtrees for parallel compaction.");
-            gcListener.info("target node count for expansion is {}, based on {} available workers.",
-                    getMinNodeCount(), numWorkers);
-
-            CompactionTree compactionTree = new CompactionTree(before, after, base);
-            if (!compactionTree.compareStates(canceller)) {
-                return null;
-            }
-
-            List<CompactionTree> topLevel = new ArrayList<>();
-            for (Map.Entry<String, CompactionTree> childEntry : compactionTree.modifiedChildren.entrySet()) {
-                switch (childEntry.getKey()) {
-                    // these tend to be the largest directories, others will not be split up
-                    case "content":
-                    case "oak:index":
-                    case "jcr:system":
-                        topLevel.add(childEntry.getValue());
-                        break;
-                    default:
-                        checkState(childEntry.getValue().compactAsync(canceller));
-                        break;
-                }
-            }
-
-            if (diff(1, topLevel)) {
-                SegmentNodeState compacted = compactionTree.compact();
-                if (compacted != null) {
-                    return compacted;
-                }
-            }
-
-            try {
-                // compaction failed, terminate remaining tasks
-                executorService.shutdown();
-                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
-                    executorService.shutdownNow();
-                }
-            } catch (InterruptedException e) {
-                executorService.shutdownNow();
-            }
-
-            return null;
-        }
-
-        private boolean diff(int depth, List<CompactionTree> nodes) {
-            int targetCount = getMinNodeCount();
-            gcListener.info("Found {} nodes at depth {}, target is {}.", nodes.size(), depth, targetCount);
-
-            if (nodes.size() >= targetCount) {
-                nodes.forEach(node -> node.compactAsync(canceller));
-                return true;
-            } else if (nodes.isEmpty()) {
-                gcListener.info("Amount of changes too small, tree will not be split.");
-                return true;
-            }
-
-            List<CompactionTree> nextDepth = new ArrayList<>();
-            for (CompactionTree node : nodes) {
-                long estimatedSize = node.getEstimatedSize();
-                if (estimatedSize != -1 && estimatedSize <= (totalSizeEstimate / numWorkers)) {
-                    checkState(node.compactAsync(canceller));
-                } else if (nextDepth.size() < getMaxNodeCount()) {
-                    if (!node.compareStates(canceller)) {
-                        return false;
-                    }
-                    nextDepth.addAll(node.modifiedChildren.values());
-                } else {
-                    nextDepth.add(node);
-                }
-            }
-
-            return diff(depth + 1, nextDepth);
-        }
-    }
-
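
The exploration loop above decides, level by level, whether the frontier of modified
subtrees is wide enough to keep all workers busy: schedule the whole frontier once it
reaches the target count, schedule small subtrees directly, and otherwise diff one
level deeper. Reduced to its control flow as a hedged sketch (hypothetical Node type;
cancellation and failure handling omitted):

import java.util.ArrayList;
import java.util.List;

final class ExplorationSketch {
    interface Node {
        boolean smallEnoughForOneWorker();   // estimated size <= totalSize / numWorkers
        void scheduleAsync();                // start compaction of this subtree
        List<Node> expand();                 // diff one level deeper: modified children
    }

    // Returns true when every subtree has been scheduled.
    static boolean explore(List<Node> frontier, int minCount, int maxCount) {
        if (frontier.isEmpty()) {
            return true;                     // change set too small to split further
        }
        if (frontier.size() >= minCount) {
            frontier.forEach(Node::scheduleAsync);
            return true;
        }
        List<Node> next = new ArrayList<>();
        for (Node node : frontier) {
            if (node.smallEnoughForOneWorker()) {
                node.scheduleAsync();        // cheap enough: compact as a single task
            } else if (next.size() < maxCount) {
                next.addAll(node.expand());  // split further
            } else {
                next.add(node);              // frontier large enough; carry the node over
            }
        }
        return explore(next, minCount, maxCount);
    }
}
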
-    @Nullable
-    @Override
-    protected SegmentNodeState compactWithDelegate(
-            @NotNull NodeState before,
-            @NotNull NodeState after,
-            @NotNull NodeState onto,
-            Canceller canceller
-    ) throws IOException {
-        if (numWorkers <= 0) {
-            gcListener.info("using sequential compaction.");
-            return super.compactWithDelegate(before, after, onto, canceller);
-        } else if (executorService == null || executorService.isShutdown()) {
-            executorService = Executors.newFixedThreadPool(numWorkers);
-        }
-        return new CompactionHandler(onto, canceller).diff(before, after);
-    }
-}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
index cd9d22f..3b8817a 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordCache.java
@@ -19,20 +19,16 @@
 
 package org.apache.jackrabbit.oak.segment;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.jackrabbit.guava.common.cache.CacheStats;
+import org.apache.jackrabbit.guava.common.cache.Weigher;
 import org.jetbrains.annotations.NotNull;
 
 import org.apache.jackrabbit.guava.common.base.Supplier;
-import org.apache.jackrabbit.guava.common.cache.CacheBuilder;
-import org.apache.jackrabbit.guava.common.cache.CacheStats;
-import org.apache.jackrabbit.guava.common.cache.RemovalListener;
-import org.apache.jackrabbit.guava.common.cache.Weigher;
-
-import java.util.concurrent.atomic.LongAdder;
 
 /**
  * Partial mapping of keys of type {@code K} to values of type {@link RecordId}. This is
@@ -41,6 +37,11 @@
  * @param <K>
  */
 public abstract class RecordCache<K> implements Cache<K, RecordId> {
+    private long hitCount;
+    private long missCount;
+    private long loadCount;
+    private long evictionCount;
+
     /**
      * @return number of mappings
      */
@@ -57,7 +58,9 @@
      * @return  access statistics for this cache
      */
     @NotNull
-    public abstract CacheStats getStats();
+    public CacheStats getStats() {
+        return new CacheStats(hitCount, missCount, loadCount, 0, 0, evictionCount);
+    }
 
     /**
      * Factory method for creating {@code RecordCache} instances. The returned
@@ -72,7 +75,7 @@
         if (size <= 0) {
             return new Empty<>();
         } else {
-            return new Default<>(size, CacheWeights.noopWeigher());
+            return new Default<>(size, CacheWeights.<T, RecordId> noopWeigher());
         }
     }
 
@@ -88,7 +91,7 @@
         if (size <= 0) {
             return Empty.emptyFactory();
         } else {
-            return Default.defaultFactory(size, requireNonNull(weigher));
+            return Default.defaultFactory(size, checkNotNull(weigher));
         }
     }
 
@@ -103,29 +106,26 @@
         if (size <= 0) {
             return Empty.emptyFactory();
         } else {
-            return Default.defaultFactory(size, CacheWeights.noopWeigher());
+            return Default.defaultFactory(size, CacheWeights.<T, RecordId> noopWeigher());
         }
     }
 
     private static class Empty<T> extends RecordCache<T> {
-        @NotNull
-        private final LongAdder missCount = new LongAdder();
-
         static final <T> Supplier<RecordCache<T>> emptyFactory() {
-            return Empty::new;
+            return new Supplier<RecordCache<T>>() {
+                @Override
+                public RecordCache<T> get() {
+                    return new Empty<>();
+                }
+            };
         }
 
         @Override
-        public @NotNull CacheStats getStats() {
-            return new CacheStats(0, missCount.sum(), 0, 0, 0, 0);
-        }
+        public synchronized void put(@NotNull T key, @NotNull RecordId value) { }
 
         @Override
-        public void put(@NotNull T key, @NotNull RecordId value) { }
-
-        @Override
-        public RecordId get(@NotNull T key) {
-            missCount.increment();
+        public synchronized RecordId get(@NotNull T key) {
+            super.missCount++;
             return null;
         }
 
@@ -141,61 +141,66 @@
     }
 
     private static class Default<K> extends RecordCache<K> {
+
         @NotNull
-        private final org.apache.jackrabbit.guava.common.cache.Cache<K, RecordId> cache;
+        private final Map<K, RecordId> records;
+
         @NotNull
         private final Weigher<K, RecordId> weigher;
-        @NotNull
-        private final LongAdder weight = new LongAdder();
-        @NotNull
-        private final LongAdder loadCount = new LongAdder();
 
-        @Override
-        public @NotNull CacheStats getStats() {
-            CacheStats internalStats = cache.stats();
-            // any addition to the cache counts as load by our definition
-            return new CacheStats(internalStats.hitCount(), internalStats.missCount(),
-                    loadCount.sum(), 0, 0,  internalStats.evictionCount());
-        }
+        private long weight = 0;
 
-        static <K> Supplier<RecordCache<K>> defaultFactory(final int size, @NotNull final Weigher<K, RecordId> weigher) {
-            return () -> new Default<>(size, requireNonNull(weigher));
+        static final <K> Supplier<RecordCache<K>> defaultFactory(final int size, @NotNull final Weigher<K, RecordId> weigher) {
+            return new Supplier<RecordCache<K>>() {
+                @Override
+                public RecordCache<K> get() {
+                    return new Default<>(size, checkNotNull(weigher));
+                }
+            };
         }
 
         Default(final int size, @NotNull final Weigher<K, RecordId> weigher) {
-            this.cache = CacheBuilder.newBuilder()
-                    .maximumSize(size * 4L / 3)
-                    .initialCapacity(size)
-                    .concurrencyLevel(4)
-                    .recordStats()
-                    .removalListener((RemovalListener<K, RecordId>) removal -> {
-                        int removedWeight = weigher.weigh(removal.getKey(), removal.getValue());
-                        weight.add(-removedWeight);
-                    })
-                    .build();
-            this.weigher = weigher;
+            this.weigher = checkNotNull(weigher);
+            records = new LinkedHashMap<K, RecordId>(size * 4 / 3, 0.75f, true) {
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<K, RecordId> eldest) {
+                    boolean remove = super.size() > size;
+                    if (remove) {
+                        Default.super.evictionCount++;
+                        weight -= weigher.weigh(eldest.getKey(),
+                                eldest.getValue());
+                    }
+                    return remove;
+                }
+            };
         }
 
         @Override
-        public void put(@NotNull K key, @NotNull RecordId value) {
-            cache.put(key, value);
-            loadCount.increment();
-            weight.add(weigher.weigh(key, value));
+        public synchronized void put(@NotNull K key, @NotNull RecordId value) {
+            super.loadCount++;
+            records.put(key, value);
+            weight += weigher.weigh(key, value);
         }
 
         @Override
-        public RecordId get(@NotNull K key) {
-            return cache.getIfPresent(key);
+        public synchronized RecordId get(@NotNull K key) {
+            RecordId value = records.get(key);
+            if (value == null) {
+                super.missCount++;
+            } else {
+                super.hitCount++;
+            }
+            return value;
         }
 
         @Override
-        public long size() {
-            return cache.size();
+        public synchronized long size() {
+            return records.size();
         }
 
         @Override
         public long estimateCurrentWeight() {
-            return weight.sum();
+            return weight;
         }
     }
 }
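
The restored Default cache bounds itself with an access-ordered LinkedHashMap whose
removeEldestEntry() hook evicts the least recently used entry once the size cap is
exceeded. The same technique in isolation (LruSketch is a hypothetical name, not Oak
code):

import java.util.LinkedHashMap;
import java.util.Map;

final class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    LruSketch(int maxSize) {
        super(maxSize * 4 / 3, 0.75f, true);   // accessOrder=true: get() refreshes recency
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;               // evict least-recently-used beyond capacity
    }

    public static void main(String[] args) {
        LruSketch<String, Integer> lru = new LruSketch<>(2);
        lru.put("a", 1);
        lru.put("b", 2);
        lru.get("a");                          // touch "a" so "b" becomes eldest
        lru.put("c", 3);                       // evicts "b"
        System.out.println(lru.keySet());      // prints [a, c]
    }
}

Unlike the reverted Guava-cache variant, this map is not thread safe on its own, which
is why the restored put/get/size methods above are synchronized.
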
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
index 6d9bc5f..59c4fad 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
@@ -22,22 +22,17 @@
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
 import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
-import static org.apache.jackrabbit.guava.common.collect.Maps.newConcurrentMap;
 import static org.apache.jackrabbit.guava.common.collect.Maps.newHashMap;
 import static org.apache.jackrabbit.guava.common.collect.Sets.newHashSet;
 import static java.lang.Thread.currentThread;
-import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
 
+import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
 import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
@@ -49,7 +44,30 @@
  * <p>
  * Instances of this class are thread safe.
  */
-public abstract class SegmentBufferWriterPool implements WriteOperationHandler {
+public class SegmentBufferWriterPool implements WriteOperationHandler {
+
+    /**
+     * Monitor protecting the state of this pool. None of {@link #writers},
+     * {@link #borrowed} and {@link #disposed} may be modified without owning
+     * this monitor.
+     */
+    private final Monitor poolMonitor = new Monitor(true);
+
+    /**
+     * Pool of current writers that are not in use
+     */
+    private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+
+    /**
+     * Writers that are currently in use
+     */
+    private final Set<SegmentBufferWriter> borrowed = newHashSet();
+
+    /**
+     * Retired writers that have not yet been flushed
+     */
+    private final Set<SegmentBufferWriter> disposed = newHashSet();
+
     @NotNull
     private final SegmentIdProvider idProvider;
 
@@ -64,287 +82,151 @@
 
     private short writerId = -1;
 
-    private SegmentBufferWriterPool(
+    public SegmentBufferWriterPool(
             @NotNull SegmentIdProvider idProvider,
             @NotNull SegmentReader reader,
             @NotNull String wid,
             @NotNull Supplier<GCGeneration> gcGeneration) {
-        this.idProvider = idProvider;
-        this.reader = reader;
-        this.wid = wid;
-        this.gcGeneration = gcGeneration;
+        this.idProvider = checkNotNull(idProvider);
+        this.reader = checkNotNull(reader);
+        this.wid = checkNotNull(wid);
+        this.gcGeneration = checkNotNull(gcGeneration);
     }
 
-    public enum PoolType {
-        GLOBAL,
-        THREAD_SPECIFIC;
-    }
-
-    public static class SegmentBufferWriterPoolFactory {
-        @NotNull
-        private final SegmentIdProvider idProvider;
-        @NotNull
-        private final SegmentReader reader;
-        @NotNull
-        private final String wid;
-        @NotNull
-        private final Supplier<GCGeneration> gcGeneration;
-
-        private SegmentBufferWriterPoolFactory(
-                @NotNull SegmentIdProvider idProvider,
-                @NotNull SegmentReader reader,
-                @NotNull String wid,
-                @NotNull Supplier<GCGeneration> gcGeneration) {
-            this.idProvider = requireNonNull(idProvider);
-            this.reader = requireNonNull(reader);
-            this.wid = requireNonNull(wid);
-            this.gcGeneration = requireNonNull(gcGeneration);
-        }
-
-        @NotNull
-        public SegmentBufferWriterPool newPool(@NotNull SegmentBufferWriterPool.PoolType poolType) {
-            switch (poolType) {
-                case GLOBAL:
-                    return new GlobalSegmentBufferWriterPool(idProvider, reader, wid, gcGeneration);
-                case THREAD_SPECIFIC:
-                    return new ThreadSpecificSegmentBufferWriterPool(idProvider, reader, wid, gcGeneration);
-                default:
-                    throw new IllegalArgumentException("Unknown writer pool type.");
-            }
-        }
-    }
-
-    public static SegmentBufferWriterPoolFactory factory(
-            @NotNull SegmentIdProvider idProvider,
-            @NotNull SegmentReader reader,
-            @NotNull String wid,
-            @NotNull Supplier<GCGeneration> gcGeneration) {
-        return new SegmentBufferWriterPoolFactory(idProvider, reader, wid, gcGeneration);
-    }
-
-    private static class ThreadSpecificSegmentBufferWriterPool extends SegmentBufferWriterPool {
-        /**
-         * Read write lock protecting the state of this pool. Multiple threads can access their writers in parallel,
-         * acquiring the read lock. The writer lock is needed for the flush operation since it requires none
-         * of the writers to be in use.
-         */
-        private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
-
-        /**
-         * Pool of writers. Every thread is assigned a unique writer per GC generation, therefore only requiring
-         * a concurrent map to synchronize access to them.
-         */
-        private final ConcurrentMap<Object, SegmentBufferWriter> writers = newConcurrentMap();
-
-        public ThreadSpecificSegmentBufferWriterPool(
-                @NotNull SegmentIdProvider idProvider,
-                @NotNull SegmentReader reader,
-                @NotNull String wid,
-                @NotNull Supplier<GCGeneration> gcGeneration) {
-            super(idProvider, reader, wid, gcGeneration);
-        }
-
-        @NotNull
-        @Override
-        public RecordId execute(@NotNull GCGeneration gcGeneration,
-                                @NotNull WriteOperation writeOperation)
-                throws IOException {
-            lock.readLock().lock();
-            SegmentBufferWriter writer = getWriter(currentThread(), gcGeneration);
-            try {
-                return writeOperation.execute(writer);
-            } finally {
-                lock.readLock().unlock();
-            }
-        }
-
-        @Override
-        public void flush(@NotNull SegmentStore store) throws IOException {
-            lock.writeLock().lock();
-            try {
-                for (SegmentBufferWriter writer : writers.values()) {
-                    writer.flush(store);
-                }
-                writers.clear();
-            } finally {
-                lock.writeLock().unlock();
-            }
-        }
-
-        @NotNull
-        private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull GCGeneration gcGeneration) {
-            SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, gcGeneration);
-            return writers.computeIfAbsent(key, f -> newWriter(gcGeneration));
-        }
-    }
-
-    private static class GlobalSegmentBufferWriterPool extends SegmentBufferWriterPool {
-        /**
-         * Monitor protecting the state of this pool. Neither of {@link #writers},
-         * {@link #borrowed} and {@link #disposed} must be modified without owning
-         * this monitor.
-         */
-        private final Monitor poolMonitor = new Monitor(true);
-
-        /**
-         * Pool of current writers that are not in use
-         */
-        private final Map<Object, SegmentBufferWriter> writers = newHashMap();
-
-        /**
-         * Writers that are currently in use
-         */
-        private final Set<SegmentBufferWriter> borrowed = newHashSet();
-
-        /**
-         * Retired writers that have not yet been flushed
-         */
-        private final Set<SegmentBufferWriter> disposed = newHashSet();
-
-        public GlobalSegmentBufferWriterPool(
-                @NotNull SegmentIdProvider idProvider,
-                @NotNull SegmentReader reader,
-                @NotNull String wid,
-                @NotNull Supplier<GCGeneration> gcGeneration) {
-            super(idProvider, reader, wid, gcGeneration);
-        }
-
-        @NotNull
-        @Override
-        public RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation writeOperation)
-                throws IOException {
-            SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(currentThread(), gcGeneration);
-            SegmentBufferWriter writer = borrowWriter(key, gcGeneration);
-            try {
-                return writeOperation.execute(writer);
-            } finally {
-                returnWriter(key, writer);
-            }
-        }
-
-        @Override
-        public void flush(@NotNull SegmentStore store) throws IOException {
-            List<SegmentBufferWriter> toFlush = newArrayList();
-            List<SegmentBufferWriter> toReturn = newArrayList();
-
-            poolMonitor.enter();
-            try {
-                // Collect all writers that are not currently in use and clear
-                // the list so they won't get re-used anymore.
-                toFlush.addAll(writers.values());
-                writers.clear();
-
-                // Collect all borrowed writers, which we need to wait for.
-                // Clear the list so they will get disposed once returned.
-                toReturn.addAll(borrowed);
-                borrowed.clear();
-            } finally {
-                poolMonitor.leave();
-            }
-
-            // Wait for the return of the borrowed writers. This is the
-            // case once all of them appear in the disposed set.
-            if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
-                try {
-                    // Collect all disposed writers and clear the list to mark them
-                    // as flushed.
-                    toFlush.addAll(toReturn);
-                    disposed.removeAll(toReturn);
-                } finally {
-                    poolMonitor.leave();
-                }
-            }
-
-            // Call flush from outside the pool monitor to avoid potential
-            // deadlocks of that method calling SegmentStore.writeSegment
-            for (SegmentBufferWriter writer : toFlush) {
-                writer.flush(store);
-            }
-        }
-
-        /**
-         * Create a {@code Guard} that is satisfied if and only if {@link #disposed}
-         * contains all items in {@code toReturn}
-         */
-        @NotNull
-        private Monitor.Guard allReturned(final List<SegmentBufferWriter> toReturn) {
-            return new Monitor.Guard(poolMonitor) {
-
-                @Override
-                public boolean isSatisfied() {
-                    return disposed.containsAll(toReturn);
-                }
-
-            };
-        }
-
-        /**
-         * Same as {@code monitor.enterWhen(guard)} but copes with that pesky {@code
-         * InterruptedException} by catching it and setting this thread's
-         * interrupted flag.
-         */
-        private static boolean safeEnterWhen(Monitor monitor, Monitor.Guard guard) {
-            try {
-                monitor.enterWhen(guard);
-                return true;
-            } catch (InterruptedException ignore) {
-                currentThread().interrupt();
-                return false;
-            }
-        }
-
-        /**
-         * Return a writer from the pool by its {@code key}. This method may return
-         * a fresh writer at any time. Callers need to return a writer before
-         * borrowing it again. Failing to do so leads to undefined behaviour.
-         */
-        @NotNull
-        private SegmentBufferWriter borrowWriter(@NotNull Object key, @NotNull GCGeneration gcGeneration) {
-            poolMonitor.enter();
-            try {
-                SegmentBufferWriter writer = writers.remove(key);
-                if (writer == null) {
-                    writer = newWriter(gcGeneration);
-                }
-                borrowed.add(writer);
-                return writer;
-            } finally {
-                poolMonitor.leave();
-            }
-        }
-
-        /**
-         * Return a writer to the pool using the {@code key} that was used to borrow
-         * it.
-         */
-        private void returnWriter(Object key, SegmentBufferWriter writer) {
-            poolMonitor.enter();
-            try {
-                if (borrowed.remove(writer)) {
-                    checkState(writers.put(key, writer) == null);
-                } else {
-                    // Defer flush this writer as it was borrowed while flush() was called.
-                    disposed.add(writer);
-                }
-            } finally {
-                poolMonitor.leave();
-            }
-        }
-    }
-
-    @NotNull
     @Override
+    @NotNull
     public GCGeneration getGCGeneration() {
         return gcGeneration.get();
     }
 
     @NotNull
-    protected SegmentBufferWriter newWriter(@NotNull GCGeneration gcGeneration) {
-        return new SegmentBufferWriter(idProvider, reader, getWriterId(), gcGeneration);
+    @Override
+    public RecordId execute(@NotNull GCGeneration gcGeneration,
+                            @NotNull WriteOperation writeOperation)
+    throws IOException {
+        SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(currentThread(), gcGeneration);
+        SegmentBufferWriter writer = borrowWriter(key, gcGeneration);
+        try {
+            return writeOperation.execute(writer);
+        } finally {
+            returnWriter(key, writer);
+        }
     }
 
+    @Override
+    public void flush(@NotNull SegmentStore store) throws IOException {
+        List<SegmentBufferWriter> toFlush = newArrayList();
+        List<SegmentBufferWriter> toReturn = newArrayList();
+
+        poolMonitor.enter();
+        try {
+            // Collect all writers that are not currently in use and clear
+            // the list so they won't get re-used anymore.
+            toFlush.addAll(writers.values());
+            writers.clear();
+
+            // Collect all borrowed writers, which we need to wait for.
+            // Clear the list so they will get disposed once returned.
+            toReturn.addAll(borrowed);
+            borrowed.clear();
+        } finally {
+            poolMonitor.leave();
+        }
+
+        // Wait for the return of the borrowed writers. This is the
+        // case once all of them appear in the disposed set.
+        if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
+            try {
+                // Collect all disposed writers and clear the list to mark them
+                // as flushed.
+                toFlush.addAll(toReturn);
+                disposed.removeAll(toReturn);
+            } finally {
+                poolMonitor.leave();
+            }
+        }
+
+        // Call flush from outside the pool monitor to avoid potential
+        // deadlocks of that method calling SegmentStore.writeSegment
+        for (SegmentBufferWriter writer : toFlush) {
+            writer.flush(store);
+        }
+    }
+
+    /**
+     * Create a {@code Guard} that is satisfied if and only if {@link #disposed}
+     * contains all items in {@code toReturn}
+     */
     @NotNull
-    protected String getWriterId() {
+    private Guard allReturned(final List<SegmentBufferWriter> toReturn) {
+        return new Guard(poolMonitor) {
+
+            @Override
+            public boolean isSatisfied() {
+                return disposed.containsAll(toReturn);
+            }
+
+        };
+    }
+
+    /**
+     * Same as {@code monitor.enterWhen(guard)} but copes with that pesky {@code
+     * InterruptedException} by catching it and setting this thread's
+     * interrupted flag.
+     */
+    private static boolean safeEnterWhen(Monitor monitor, Guard guard) {
+        try {
+            monitor.enterWhen(guard);
+            return true;
+        } catch (InterruptedException ignore) {
+            currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Return a writer from the pool by its {@code key}. This method may return
+     * a fresh writer at any time. Callers need to return a writer before
+     * borrowing it again. Failing to do so leads to undefined behaviour.
+     */
+    private SegmentBufferWriter borrowWriter(@NotNull Object key, @NotNull GCGeneration gcGeneration) {
+        poolMonitor.enter();
+        try {
+            SegmentBufferWriter writer = writers.remove(key);
+            if (writer == null) {
+                writer = new SegmentBufferWriter(
+                        idProvider,
+                        reader,
+                        getWriterId(wid),
+                        gcGeneration
+                );
+            }
+            borrowed.add(writer);
+            return writer;
+        } finally {
+            poolMonitor.leave();
+        }
+    }
+
+    /**
+     * Return a writer to the pool using the {@code key} that was used to borrow
+     * it.
+     */
+    private void returnWriter(Object key, SegmentBufferWriter writer) {
+        poolMonitor.enter();
+        try {
+            if (borrowed.remove(writer)) {
+                checkState(writers.put(key, writer) == null);
+            } else {
+                // Defer flushing this writer, as it was borrowed while flush() was called.
+                disposed.add(writer);
+            }
+        } finally {
+            poolMonitor.leave();
+        }
+    }
+
+    private String getWriterId(String wid) {
         if (++writerId > 9999) {
             writerId = 0;
         }
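
The restored pool coordinates borrow/return/flush with a fair Guava Monitor: flush()
snapshots the borrowed set, then blocks on a Guard until every borrowed writer has
been returned. A reduced sketch of that handshake, using plain Guava's Monitor (Oak
itself uses the shaded copy under org.apache.jackrabbit.guava); PoolSketch is a
hypothetical name:

import java.util.HashSet;
import java.util.Set;
import com.google.common.util.concurrent.Monitor;

final class PoolSketch<W> {
    private final Monitor monitor = new Monitor(true);   // fair, like poolMonitor above
    private final Set<W> borrowed = new HashSet<>();

    void borrow(W writer) {
        monitor.enter();
        try {
            borrowed.add(writer);
        } finally {
            monitor.leave();
        }
    }

    void giveBack(W writer) {
        monitor.enter();
        try {
            borrowed.remove(writer);
        } finally {
            monitor.leave();
        }
    }

    // Blocks until nothing is borrowed: the analogue of allReturned(toReturn).
    void awaitAllReturned() throws InterruptedException {
        Monitor.Guard empty = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return borrowed.isEmpty();
            }
        };
        monitor.enterWhen(empty);
        try {
            // safe point: no writer is in use
        } finally {
            monitor.leave();
        }
    }
}

The fair (true) constructor argument matters here: waiting flush() calls are served in
order and cannot be starved by a steady stream of borrowers.
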
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
index d9250c4..f0180aa 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
@@ -53,14 +53,9 @@
         CLASSIC_COMPACTOR("classic"),
 
         /**
-         * Checkpoint-aware compaction implementation
+         * Checkpoint-aware compaction implementation
          */
-        CHECKPOINT_COMPACTOR("diff"),
-
-        /**
-         * Multithreaded compaction implementation
-         */
-        PARALLEL_COMPACTOR("parallel");
+        CHECKPOINT_COMPACTOR("diff");
 
         private final String description;
 
@@ -74,10 +69,8 @@
                 return CLASSIC_COMPACTOR;
             case "diff":
                 return CHECKPOINT_COMPACTOR;
-            case "parallel":
-                return PARALLEL_COMPACTOR;
             default:
-                throw new IllegalArgumentException("Unrecognized compactor type " + description);
+                throw new IllegalArgumentException("Unrecongnized compactor type " + description);
             }
         }
 
@@ -126,11 +119,6 @@
      */
     public static final int MEMORY_THRESHOLD_DEFAULT = 15;
 
-    /**
-     * Default value for {@link #getConcurrency()}
-     */
-    public static final int DEFAULT_CONCURRENCY = 1;
-
     private boolean paused = PAUSE_DEFAULT;
 
     /**
@@ -161,13 +149,7 @@
      */
     private long gcLogInterval = -1;
 
-    /**
-     * Number of threads to use for compaction. Negative numbers are interpreted
-     * relative to number of available processors.
-     */
-    private int concurrency = DEFAULT_CONCURRENCY;
-
-    private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
+    private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR;
 
     public SegmentGCOptions(boolean paused, int retryCount, int forceTimeout) {
         this.paused = paused;
@@ -293,7 +275,6 @@
                     "offline=" + offline +
                     ", retainedGenerations=" + retainedGenerations +
                     ", compactorType=" + compactorType +
-                    ", concurrency=" + concurrency +
                     "}";
         } else {
             return getClass().getSimpleName() + "{" +
@@ -403,7 +384,7 @@
     }
 
     /**
-     * @return the current compactor type (i.e. classic, checkpoint-aware or parallel)
+     * @return the current compactor type (i.e. classic or checkpoint-aware)
      */
     public CompactorType getCompactorType() {
         return compactorType;
@@ -412,27 +393,9 @@
     /**
      * Sets the compactor type to be used for compaction
      * @param compactorType
-     * @return this instance
      */
     public SegmentGCOptions setCompactorType(CompactorType compactorType) {
         this.compactorType = compactorType;
         return this;
     }
-
-    /**
-     * @return the current level of concurrency
-     */
-    public int getConcurrency() {
-        return concurrency;
-    }
-
-    /**
-     * Sets the concurrency level for compaction
-     * @param concurrency number of threads to use
-     * @return this instance
-     */
-    public SegmentGCOptions setConcurrency(int concurrency) {
-        this.concurrency = concurrency;
-        return this;
-    }
 }
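
With the parallel compactor removed, "classic" and "diff" are again the only
recognized descriptions and CHECKPOINT_COMPACTOR is the default. A usage sketch of the
restored options; the constructor arguments are illustrative values only, not
recommended settings:

import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;

class GcOptionsSketch {
    static SegmentGCOptions configure() {
        // paused=false, retryCount=5, forceTimeout=60 (placeholder values)
        SegmentGCOptions options = new SegmentGCOptions(false, 5, 60);
        // Explicitly pick the checkpoint-aware ("diff") compactor; after this revert
        // there is no setConcurrency() to call anymore.
        return options.setCompactorType(CompactorType.CHECKPOINT_COMPACTOR);
    }
}
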
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
index cdde3c7..fc13fa9 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java
@@ -29,13 +29,12 @@
 
 import org.apache.jackrabbit.guava.common.base.Function;
 
+import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
+import org.apache.jackrabbit.oak.segment.ClassicCompactor;
+import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.Compactor;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.ClassicCompactor;
-import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
-import org.apache.jackrabbit.oak.segment.ParallelCompactor;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
 import org.apache.jackrabbit.oak.segment.file.cancel.Cancellation;
@@ -222,7 +221,6 @@
                 writer.flush();
                 context.getFlusher().flush();
                 context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles);
-                context.getCompactionMonitor().finished();
                 return compactionSucceeded(context, nextGeneration, compacted.getRecordId());
             } else {
                 context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles);
@@ -241,18 +239,15 @@
     private Compactor newCompactor(Context context, SegmentWriter writer) {
         CompactorType compactorType = context.getGCOptions().getCompactorType();
         switch (compactorType) {
-            case PARALLEL_COMPACTOR:
-                return new ParallelCompactor(context.getGCListener(), context.getSegmentReader(), writer,
-                        context.getBlobStore(), context.getCompactionMonitor(),
-                        context.getGCOptions().getConcurrency());
-            case CHECKPOINT_COMPACTOR:
-                return new CheckpointCompactor(context.getGCListener(), context.getSegmentReader(), writer,
-                        context.getBlobStore(), context.getCompactionMonitor());
-            case CLASSIC_COMPACTOR:
-                return new ClassicCompactor(context.getSegmentReader(), writer, context.getBlobStore(),
-                        context.getCompactionMonitor());
-            default:
-                throw new IllegalArgumentException("Unknown compactor type: " + compactorType);
-            }
+        case CHECKPOINT_COMPACTOR:
+            return new CheckpointCompactor(context.getGCListener(), context.getSegmentReader(), writer,
+                    context.getBlobStore(), context.getCompactionMonitor());
+        case CLASSIC_COMPACTOR:
+            return new ClassicCompactor(context.getSegmentReader(), writer, context.getBlobStore(),
+                    context.getCompactionMonitor());
+        default:
+            throw new IllegalArgumentException("Unknown compactor type: " + compactorType);
         }
     }
+
+}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
index 18375e2..fa9fd05 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
@@ -39,14 +39,13 @@
 import org.apache.jackrabbit.guava.common.io.Closer;
 import org.apache.jackrabbit.guava.common.util.concurrent.UncheckedExecutionException;
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
@@ -198,7 +197,7 @@
                 defaultSegmentWriterBuilder("c")
                     .with(builder.getCacheManager().withAccessTracking("COMPACT", statsProvider))
                     .withGeneration(generation)
-                    .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
+                    .withoutWriterPool()
                     .build(this)
         );
 
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
index 9cefe70..9889a80 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java
@@ -625,11 +625,21 @@
         }
 
         void evictOldGeneration(final int newGeneration) {
-            evictCaches(generation -> generation < newGeneration);
+            evictCaches(new Predicate<Integer>() {
+                @Override
+                public boolean apply(Integer generation) {
+                    return generation < newGeneration;
+                }
+            });
         }
 
         void evictGeneration(final int newGeneration) {
-            evictCaches(generation -> generation == newGeneration);
+            evictCaches(new Predicate<Integer>() {
+                @Override
+                public boolean apply(Integer generation) {
+                    return generation == newGeneration;
+                }
+            });
         }
     }
 }
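
Both eviction hooks above funnel into evictCaches with a generation predicate; the
revert merely trades lambdas for the pre-Java-8 anonymous-class form. The shape of
that predicate-driven eviction, as a standalone sketch (plain Guava Predicate here,
EvictorSketch and its map are hypothetical):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import com.google.common.base.Predicate;

final class EvictorSketch<V> {
    private final Map<Integer, V> byGeneration = new HashMap<>();

    void put(int generation, V value) {
        byGeneration.put(generation, value);
    }

    // The analogue of evictCaches(Predicate): drop every entry whose generation matches.
    void evict(Predicate<Integer> purge) {
        for (Iterator<Map.Entry<Integer, V>> it = byGeneration.entrySet().iterator(); it.hasNext(); ) {
            if (purge.apply(it.next().getKey())) {
                it.remove();
            }
        }
    }

    void evictOldGeneration(final int newGeneration) {
        evict(new Predicate<Integer>() {       // anonymous-class style, matching the revert
            @Override
            public boolean apply(Integer generation) {
                return generation < newGeneration;
            }
        });
    }
}
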
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
index c324a7c..c41f8a6 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GCNodeWriteMonitor.java
@@ -18,17 +18,12 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.jetbrains.annotations.NotNull;
 
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-
 /**
  * Monitors the compaction cycle and keeps a compacted nodes counter, in order
- * to provide a best-effort progress log based on extrapolating the previous
+ * to provide a best-effort progress log based on extrapolating the previous
  * size and node count and current size to deduce current node count.
  */
 public class GCNodeWriteMonitor {
@@ -55,19 +50,19 @@
     /**
      * Number of compacted nodes
      */
-    private final AtomicLong nodes = new AtomicLong();
+    private long nodes;
 
     /**
      * Number of compacted properties
      */
-    private final LongAdder properties = new LongAdder();
+    private long properties;
 
     /**
      * Number of compacted binaries
      */
-    private final LongAdder binaries = new LongAdder();
+    private long binaries;
 
-    private volatile boolean running = false;
+    private boolean running = false;
 
     public GCNodeWriteMonitor(long gcProgressLog, @NotNull GCMonitor gcMonitor) {
         this.gcProgressLog = gcProgressLog;
@@ -82,8 +77,7 @@
      * @param currentSize
      *            current repository size
      */
-    public void init(long prevSize, long prevCompactedNodes, long currentSize) {
-        checkState(!running);
+    public synchronized void init(long prevSize, long prevCompactedNodes, long currentSize) {
         if (prevCompactedNodes > 0) {
             estimated = (long) (((double) currentSize / prevSize) * prevCompactedNodes);
             gcMonitor.info(
@@ -93,67 +87,69 @@
         } else {
             gcMonitor.info("unable to estimate number of nodes for compaction, missing gc history.");
         }
-        nodes.set(0);
-        properties.reset();
-        binaries.reset();
+        nodes = 0;
         start = System.currentTimeMillis();
         running = true;
     }
 
-    public void onNode() {
-        long writtenNodes = nodes.incrementAndGet();
-        if (gcProgressLog > 0 && writtenNodes % gcProgressLog == 0) {
+    public synchronized void onNode() {
+        nodes++;
+        if (gcProgressLog > 0 && nodes % gcProgressLog == 0) {
             gcMonitor.info("compacted {} nodes, {} properties, {} binaries in {} ms. {}",
-                    writtenNodes, properties, binaries, System.currentTimeMillis() - start, getPercentageDone());
+                nodes, properties, binaries, System.currentTimeMillis() - start, getPercentageDone());
         }
     }
 
-    public void onProperty() {
-        properties.increment();
+    public synchronized void onProperty() {
+        properties++;
     }
 
-    public void onBinary() {
-        binaries.increment();
+    public synchronized void onBinary() {
+        binaries++;
     }
 
-    public void finished() {
+    public synchronized void finished() {
         running = false;
     }
 
     /**
      * Compacted nodes in current cycle
      */
-    public long getCompactedNodes() {
-        return nodes.get();
+    public synchronized long getCompactedNodes() {
+        return nodes;
     }
 
     /**
      * Estimated nodes to compact in current cycle. Can be {@code -1} if the
      * estimation could not be performed.
      */
-    public long getEstimatedTotal() {
+    public synchronized long getEstimatedTotal() {
         return estimated;
     }
 
     @NotNull
     private String getPercentageDone() {
-        int percentage = getEstimatedPercentage();
-        return (percentage >= 0) ? percentage + "% complete." : "";
+        return estimated > 0
+            ? getEstimatedPercentage() + "% complete."
+            : "";
     }
 
     /**
      * Estimated completion percentage. Can be {@code -1} if the estimation
      * could not be performed.
      */
-    public int getEstimatedPercentage() {
-        if (!running) {
-            return 100;
+    public synchronized int getEstimatedPercentage() {
+        if (estimated > 0) {
+            if (!running) {
+                return 100;
+            } else {
+                return Math.min((int) (100 * ((double) nodes / estimated)), 99);
+            }
         }
-        long numNodes = estimated;
-        return (numNodes <= 0) ? -1 : Math.min((int) (100 * ((double) nodes.get() / numNodes)), 99);
+        return -1;
     }
 
-    public boolean isCompactionRunning() {
+    public synchronized boolean isCompactionRunning() {
         return running;
     }
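
The restored estimate extrapolates linearly from the previous cycle, estimated =
(currentSize / prevSize) * prevCompactedNodes, and progress is capped at 99% while a
cycle is still running. A worked example with illustrative numbers:

class EstimateSketch {
    public static void main(String[] args) {
        long prevSize = 40L << 30;            // previous repository size: 40 GiB
        long currentSize = 50L << 30;         // current repository size: 50 GiB
        long prevCompactedNodes = 1_000_000;  // nodes written by the previous run
        long estimated = (long) (((double) currentSize / prevSize) * prevCompactedNodes);
        long written = 500_000;               // nodes written so far in this run
        int percent = Math.min((int) (100 * ((double) written / estimated)), 99);
        System.out.println(estimated + " estimated, " + percent + "% complete");
        // prints: 1250000 estimated, 40% complete
    }
}
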
 
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
index bea1e9e..00038e4 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PriorityCache.java
@@ -19,29 +19,23 @@
 
 package org.apache.jackrabbit.oak.segment.file;
 
-
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkArgument;
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
-
 import static java.lang.Integer.bitCount;
 import static java.lang.Integer.numberOfTrailingZeros;
 import static java.lang.Long.numberOfLeadingZeros;
 import static java.lang.Math.max;
 import static java.util.Arrays.fill;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.jackrabbit.guava.common.base.Predicate;
-import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.guava.common.cache.CacheStats;
 import org.apache.jackrabbit.guava.common.cache.Weigher;
 import org.apache.jackrabbit.oak.segment.CacheWeights;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import org.apache.jackrabbit.guava.common.base.Predicate;
+import org.apache.jackrabbit.guava.common.base.Supplier;
+
 /**
  * {@code PriorityCache} implements a partial mapping from keys of type {@code K} to values
 * of type {@code V}. Mappings are associated with a cost, which states how expensive it is
@@ -50,7 +44,7 @@
  * this cache is successfully looked up its cost is incremented by one, unless it has reached
  * its maximum cost of {@link Byte#MAX_VALUE} already.
  * <p>
- * Additionally, this cache tracks a generation for mappings. Mappings of later generations
+ * Additionally, this cache tracks a generation for mappings. Mappings of later generations
  * always take precedence over mappings of earlier generations. That is, putting a mapping of
  * a later generation into the cache can cause any mapping of an earlier generation to be evicted
  * regardless of its cost.
@@ -65,24 +59,19 @@
 public class PriorityCache<K, V> {
     private final int rehash;
     private final Entry<?,?>[] entries;
-    private final AtomicInteger[] costs;
-    private final AtomicInteger[] evictions;
+    private final int[] costs = new int[256];
+    private final int[] evictions = new int[256];
 
-    private final LongAdder hitCount = new LongAdder();
-    private final LongAdder missCount = new LongAdder();
-    private final LongAdder loadCount = new LongAdder();
-    private final LongAdder loadExceptionCount = new LongAdder();
-    private final LongAdder evictionCount = new LongAdder();
-    private final LongAdder size = new LongAdder();
-
-    private static class Segment extends ReentrantLock {}
-
-    @NotNull
-    private final Segment[] segments;
+    private long hitCount;
+    private long missCount;
+    private long loadCount;
+    private long loadExceptionCount;
+    private long evictionCount;
+    private long size;
 
     @NotNull
     private final Weigher<K, V> weigher;
-    private final AtomicLong weight = new AtomicLong();
+    private long weight = 0;
 
     /**
      * Static factory for creating new {@code PriorityCache} instances.
@@ -92,7 +81,12 @@
     public static <K, V> Supplier<PriorityCache<K, V>> factory(final int size, @NotNull final Weigher<K, V> weigher) {
         checkArgument(bitCount(size) == 1);
         checkNotNull(weigher);
-        return () -> new PriorityCache<>(size, weigher);
+        return new Supplier<PriorityCache<K, V>>() {
+            @Override
+            public PriorityCache<K, V> get() {
+                return new PriorityCache<>(size, weigher);
+            }
+        };
     }
 
     /**
@@ -102,7 +96,12 @@
      */
     public static <K, V> Supplier<PriorityCache<K, V>> factory(final int size) {
         checkArgument(bitCount(size) == 1);
-        return () -> new PriorityCache<>(size);
+        return new Supplier<PriorityCache<K, V>>() {
+            @Override
+            public PriorityCache<K, V> get() {
+                return new PriorityCache<>(size);
+            }
+        };
     }
 
     private static class Entry<K, V> {
@@ -134,7 +133,7 @@
      * @return the next power of two starting from {@code size}.
      */
     public static long nextPowerOfTwo(int size) {
-        return 1L << (64L - numberOfLeadingZeros(max(1, size) - 1L));
+        return 1L << (64L - numberOfLeadingZeros((long)max(1, size) - 1L));
     }
 
     /**
@@ -145,7 +144,7 @@
      *                  smaller than {@code 32 - numberOfTrailingZeros(size)}.
      */
     PriorityCache(int size, int rehash) {
-        this(size, rehash, CacheWeights.noopWeigher());
+        this(size, rehash, CacheWeights.<K, V> noopWeigher());
     }
 
     /**
@@ -157,21 +156,6 @@
      * @param weigher   Needed to provide an estimation of the cache weight in memory
      */
     public PriorityCache(int size, int rehash, @NotNull Weigher<K, V> weigher) {
-        this(size, rehash, weigher, 1024);
-    }
-
-    /**
-     * Create a new instance of the given {@code size}. {@code rehash} specifies the number
-     * of rehashes to resolve a clash.
-     * @param size        Size of the cache. Must be a power of {@code 2}.
-     * @param rehash      Number of rehashes. Must be greater or equal to {@code 0} and
-     *                    smaller than {@code 32 - numberOfTrailingZeros(size)}.
-     * @param weigher     Needed to provide an estimation of the cache weight in memory
-     * @param numSegments Number of separately locked segments. The implementation assumes an equal
-     *                    number of entries in each segment, requiring numSegments to divide size.
-     *                    Powers of 2 are a safe choice, see @param size.
-     */
-    public PriorityCache(int size, int rehash, @NotNull Weigher<K, V> weigher, int numSegments) {
         checkArgument(bitCount(size) == 1);
         checkArgument(rehash >= 0);
         checkArgument(rehash < 32 - numberOfTrailingZeros(size));
@@ -179,22 +163,6 @@
         entries = new Entry<?,?>[size];
         fill(entries, Entry.NULL);
         this.weigher = checkNotNull(weigher);
-
-        numSegments = Math.min(numSegments, size);
-        checkArgument((size % numSegments) == 0,
-                "Cache size is not a multiple of its segment count.");
-
-        segments = new Segment[numSegments];
-        for (int s = 0; s < numSegments; s++) {
-            segments[s] = new Segment();
-        }
-
-        costs = new AtomicInteger[256];
-        evictions = new AtomicInteger[256];
-        for (int i = 0; i < 256; i++) {
-            costs[i] = new AtomicInteger();
-            evictions[i] = new AtomicInteger();
-        }
     }
 
     /**
@@ -214,16 +182,11 @@
         return (hashCode >> iteration) & (entries.length - 1);
     }
 
-    private Segment getSegment(int index) {
-        int entriesPerSegment = entries.length / segments.length;
-        return segments[index / entriesPerSegment];
-    }
-
     /**
      * @return  the number of mappings in this cache.
      */
     public long size() {
-        return size.sum();
+        return size;
     }
 
     /**
@@ -234,82 +197,62 @@
      * @param initialCost    the initial cost associated with this mapping
      * @return  {@code true} if the mapping has been added, {@code false} otherwise.
      */
-    public boolean put(@NotNull K key, @NotNull V value, int generation, byte initialCost) {
+    public synchronized boolean put(@NotNull K key, @NotNull V value, int generation, byte initialCost) {
         int hashCode = key.hashCode();
         byte cheapest = initialCost;
         int index = -1;
         boolean eviction = false;
-
-        Segment lockedSegment = null;
-
-        try {
-            for (int k = 0; k <= rehash; k++) {
-                int i = project(hashCode, k);
-                Segment segment = getSegment(i);
-                if (segment != lockedSegment) {
-                    if (lockedSegment != null) {
-                        lockedSegment.unlock();
-                    }
-                    lockedSegment = segment;
-                    lockedSegment.lock();
+        for (int k = 0; k <= rehash; k++) {
+            int i = project(hashCode, k);
+            Entry<?, ?> entry = entries[i];
+            if (entry == Entry.NULL) {
+                // Empty slot -> use this index
+                index = i;
+                eviction = false;
+                break;
+            } else if (entry.generation <= generation && key.equals(entry.key)) {
+                // Key exists and generation is greater or equal -> use this index and boost the cost
+                index = i;
+                initialCost = entry.cost;
+                if (initialCost < Byte.MAX_VALUE) {
+                    initialCost++;
                 }
-
-                Entry<?, ?> entry = entries[i];
-                if (entry == Entry.NULL) {
-                    // Empty slot -> use this index
-                    index = i;
-                    eviction = false;
-                    break;
-                } else if (entry.generation <= generation && key.equals(entry.key)) {
-                    // Key exists and generation is greater or equal -> use this index and boost the cost
-                    index = i;
-                    initialCost = entry.cost;
-                    if (initialCost < Byte.MAX_VALUE) {
-                        initialCost++;
-                    }
-                    eviction = false;
-                    break;
-                } else if (entry.generation < generation) {
-                    // Old generation -> use this index
-                    index = i;
-                    eviction = false;
-                    break;
-                } else if (entry.cost < cheapest) {
-                    // Candidate slot, keep on searching for even cheaper slots
-                    cheapest = entry.cost;
-                    index = i;
-                    eviction = true;
-                }
+                eviction = false;
+                break;
+            } else if (entry.generation < generation) {
+                // Old generation -> use this index
+                index = i;
+                eviction = false;
+                break;
+            } else if (entry.cost < cheapest) {
+                // Candidate slot, keep on searching for even cheaper slots
+                cheapest = entry.cost;
+                index = i;
+                eviction = true;
             }
+        }
 
-            if (index >= 0) {
-                Entry<?, ?> oldEntry = entries[index];
-                Entry<?, ?> newEntry = new Entry<>(key, value, generation, initialCost);
-                entries[index] = newEntry;
-                loadCount.increment();
-                costs[initialCost - Byte.MIN_VALUE].incrementAndGet();
-
-                if (oldEntry != Entry.NULL) {
-                    costs[oldEntry.cost - Byte.MIN_VALUE].decrementAndGet();
-                    if (eviction) {
-                        evictions[oldEntry.cost - Byte.MIN_VALUE].incrementAndGet();
-                        evictionCount.increment();
-                    }
-                    weight.addAndGet(-weighEntry(oldEntry));
-                } else {
-                    size.increment();
+        if (index >= 0) {
+            Entry<?, ?> old = entries[index];
+            Entry<?, ?> newE = new Entry<>(key, value, generation, initialCost);
+            entries[index] = newE;
+            loadCount++;
+            costs[initialCost - Byte.MIN_VALUE]++;
+            if (old != Entry.NULL) {
+                costs[old.cost - Byte.MIN_VALUE]--;
+                if (eviction) {
+                    evictions[old.cost - Byte.MIN_VALUE]++;
+                    evictionCount++;
                 }
-
-                weight.addAndGet(weighEntry(newEntry));
-                return true;
+                weight -= weighEntry(old);
+            } else {
+                size++;
             }
-
-            loadExceptionCount.increment();
+            weight += weighEntry(newE);
+            return true;
+        } else {
+            loadExceptionCount++;
             return false;
-        } finally {
-            if (lockedSegment != null) {
-                lockedSegment.unlock();
-            }
         }
     }
 
@@ -322,29 +265,22 @@
      */
     @SuppressWarnings("unchecked")
     @Nullable
-    public V get(@NotNull K key, int generation) {
+    public synchronized V get(@NotNull K key, int generation) {
         int hashCode = key.hashCode();
         for (int k = 0; k <= rehash; k++) {
             int i = project(hashCode, k);
-            Segment segment = getSegment(i);
-            segment.lock();
-
-            try {
-                Entry<?, ?> entry = entries[i];
-                if (generation == entry.generation && key.equals(entry.key)) {
-                    if (entry.cost < Byte.MAX_VALUE) {
-                        costs[entry.cost - Byte.MIN_VALUE].decrementAndGet();
-                        entry.cost++;
-                        costs[entry.cost - Byte.MIN_VALUE].incrementAndGet();
-                    }
-                    hitCount.increment();
-                    return (V) entry.value;
+            Entry<?, ?> entry = entries[i];
+            if (generation == entry.generation && key.equals(entry.key)) {
+                if (entry.cost < Byte.MAX_VALUE) {
+                    costs[entry.cost - Byte.MIN_VALUE]--;
+                    entry.cost++;
+                    costs[entry.cost - Byte.MIN_VALUE]++;
                 }
-            } finally {
-                segment.unlock();
+                hitCount++;
+                return (V) entry.value;
             }
         }
-        missCount.increment();
+        missCount++;
         return null;
     }
 
@@ -353,45 +289,35 @@
      * passed {@code purge} predicate.
      * @param purge
      */
-    public void purgeGenerations(@NotNull Predicate<Integer> purge) {
-        int numSegments = segments.length;
-        int entriesPerSegment = entries.length / numSegments;
-        for (int s = 0; s < numSegments; s++) {
-            segments[s].lock();
-            try {
-                for (int i = 0; i < entriesPerSegment; i++) {
-                    int j = i + s * entriesPerSegment;
-                    Entry<?, ?> entry = entries[j];
-                    if (entry != Entry.NULL && purge.apply(entry.generation)) {
-                        entries[j] = Entry.NULL;
-                        size.decrement();
-                        weight.addAndGet(-weighEntry(entry));
-                    }
-                }
-            } finally {
-                segments[s].unlock();
+    public synchronized void purgeGenerations(@NotNull Predicate<Integer> purge) {
+        for (int i = 0; i < entries.length; i++) {
+            Entry<?, ?> entry = entries[i];
+            if (entry != Entry.NULL && purge.apply(entry.generation)) {
+                entries[i] = Entry.NULL;
+                size--;
+                weight -= weighEntry(entry);
             }
         }
     }
 
+    @SuppressWarnings("unchecked")
     private int weighEntry(Entry<?, ?> entry) {
         return weigher.weigh((K) entry.key, (V) entry.value);
     }
 
     @Override
-    public String toString() {
+    public synchronized String toString() {
         return "PriorityCache" +
-                "{ costs=" + toString(costs) +
-                ", evictions=" + toString(evictions) + " }";
+            "{ costs=" + toString(costs) +
+            ", evictions=" + toString(evictions) + " }";
     }
 
-    private static String toString(AtomicInteger[] ints) {
+    private static String toString(int[] ints) {
         StringBuilder b = new StringBuilder("[");
         String sep = "";
         for (int i = 0; i < ints.length; i++) {
-            int value = ints[i].get();
-            if (value > 0) {
-                b.append(sep).append(i).append("->").append(value);
+            if (ints[i] > 0) {
+                b.append(sep).append(i).append("->").append(ints[i]);
                 sep = ",";
             }
         }
@@ -403,12 +329,11 @@
      */
     @NotNull
     public CacheStats getStats() {
-        return new CacheStats(hitCount.sum(), missCount.sum(), loadCount.sum(),
-                loadExceptionCount.sum(), 0, evictionCount.sum());
+        return new CacheStats(hitCount, missCount, loadCount, loadExceptionCount, 0, evictionCount);
     }
 
     public long estimateCurrentWeight() {
-        return weight.get();
+        return weight;
     }
 
 }
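
The PriorityCache hunks above replace the lock-striped implementation (per-segment locks, LongAdder/AtomicInteger counters) with coarse-grained synchronized methods over plain int/long fields: every put, get, purge, and stats read now serializes on the cache monitor, so the statistics counters no longer need atomic types. A minimal sketch of that pattern, with illustrative names rather than the actual Oak fields:

    // Coarse-grained locking: plain counters are safe because every
    // access happens while holding the object monitor.
    class SynchronizedCounters {
        private long hitCount;
        private long missCount;

        synchronized void recordHit() { hitCount++; }
        synchronized void recordMiss() { missCount++; }

        // Reads must also synchronize to observe the latest values.
        synchronized long hits() { return hitCount; }
        synchronized long misses() { return missCount; }
    }
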
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
index ddda467..50b0ee9 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
@@ -27,19 +27,19 @@
 import org.apache.jackrabbit.guava.common.collect.Maps;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
+import org.apache.jackrabbit.oak.segment.Revisions;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.Revisions;
-import org.apache.jackrabbit.oak.segment.SegmentStore;
-import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.SegmentTracker;
 import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
 import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
-import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
-import org.apache.jackrabbit.oak.stats.NoopStats;
+import org.apache.jackrabbit.oak.segment.SegmentReader;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.SegmentTracker;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.stats.NoopStats;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
index a72360d..8c42af7 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/tool/Compact.java
@@ -78,9 +78,7 @@
 
         private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
 
-        private CompactorType compactorType = CompactorType.PARALLEL_COMPACTOR;
-
-        private int concurrency = 1;
+        private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR;
 
         private Builder() {
             // Prevent external instantiation.
@@ -166,7 +164,7 @@
 
         /**
          * The compactor type to be used by compaction. If not specified it defaults to
-         * "parallel" compactor
+         * "diff" compactor
          * @param compactorType the compactor type
          * @return this builder
          */
@@ -176,16 +174,6 @@
         }
 
         /**
-         * The number of threads to be used for compaction. This only applies to the "parallel" compactor
-         * @param concurrency the number of threads
-         * @return this builder
-         */
-        public Builder withConcurrency(int concurrency) {
-            this.concurrency = concurrency;
-            return this;
-        }
-
-        /**
          * Create an executable version of the {@link Compact} command.
          *
          * @return an instance of {@link Runnable}.
@@ -279,8 +267,6 @@
 
     private final CompactorType compactorType;
 
-    private final int concurrency;
-
     private Compact(Builder builder) {
         this.path = builder.path;
         this.journal = new File(builder.path, "journal.log");
@@ -289,7 +275,6 @@
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
         this.compactorType = builder.compactorType;
-        this.concurrency = builder.concurrency;
     }
 
     public int run() {
@@ -345,8 +330,7 @@
             .withGCOptions(defaultGCOptions()
                 .setOffline()
                 .setGCLogInterval(gcLogInterval)
-                .setCompactorType(compactorType)
-                .setConcurrency(concurrency));
+                .setCompactorType(compactorType));
         if (fileAccessMode.memoryMapped != null) {
             builder.withMemoryMapping(fileAccessMode.memoryMapped);
         }
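
With the concurrency option removed, configuring offline compaction through this tool reduces to picking the compactor type. A usage sketch under stated assumptions: only withCompactorType(...), build(), and run() are visible in this diff, so the static builder() factory and the withPath(...) setter (implied by the builder's path field) are assumptions, not confirmed API:

    // Hypothetical invocation; builder() and withPath(...) are assumed
    // from the private Builder and its 'path' field shown above.
    int exitCode = Compact.builder()
        .withPath(new File("/path/to/segmentstore"))
        .withCompactorType(CompactorType.CHECKPOINT_COMPACTOR) // restored default
        .build()
        .run();
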
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
deleted file mode 100644
index 8ae3f84..0000000
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorExternalBlobTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.jackrabbit.oak.segment;
-
-import static java.util.concurrent.TimeUnit.DAYS;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.addTestContent;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameRecord;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameStableId;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.checkGeneration;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.createBlob;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.getCheckpoint;
-import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
-import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
-import org.apache.jackrabbit.oak.api.CommitFailedException;
-import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
-import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
-import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
-import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.apache.jackrabbit.oak.segment.test.TemporaryBlobStore;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.jetbrains.annotations.NotNull;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.RuleChain;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-
-public abstract class AbstractCompactorExternalBlobTest {
-
-    private TemporaryFolder folder = new TemporaryFolder(new File("target"));
-
-    private TemporaryBlobStore temporaryBlobStore = new TemporaryBlobStore(folder);
-
-    private FileStore fileStore;
-
-    private SegmentNodeStore nodeStore;
-
-    private Compactor compactor;
-
-    private GCGeneration compactedGeneration;
-
-    @Rule
-    public RuleChain rules = RuleChain.outerRule(folder)
-        .around(temporaryBlobStore);
-
-    public void setup(boolean withBlobStore) throws IOException, InvalidFileStoreVersionException {
-        BlobStore blobStore = temporaryBlobStore.blobStore();
-        FileStoreBuilder fileStoreBuilder = fileStoreBuilder(folder.getRoot());
-
-        if (withBlobStore) {
-            fileStoreBuilder = fileStoreBuilder.withBlobStore(blobStore);
-        }
-
-        fileStore = fileStoreBuilder.build();
-        nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
-        compactedGeneration = newGCGeneration(1,1, true);
-        compactor = createCompactor(fileStore, compactedGeneration);
-    }
-
-    protected abstract Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation);
-
-    @After
-    public void tearDown() {
-        fileStore.close();
-    }
-
-    @Test
-    public void testCompact() throws Exception {
-        setup(true);
-
-        // add two blobs which will be persisted in the blob store
-        addTestContent("cp1", nodeStore, SegmentTestConstants.MEDIUM_LIMIT);
-        String cp1 = nodeStore.checkpoint(DAYS.toMillis(1));
-        addTestContent("cp2", nodeStore, SegmentTestConstants.MEDIUM_LIMIT);
-        String cp2 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        // update the two blobs from the blob store
-        updateTestContent("cp1", nodeStore);
-        String cp3 = nodeStore.checkpoint(DAYS.toMillis(1));
-        updateTestContent("cp2", nodeStore);
-        String cp4 = nodeStore.checkpoint(DAYS.toMillis(1));
-        fileStore.close();
-
-        // no blob store configured
-        setup(false);
-
-        // this time the updated blob will be stored in the file store
-        updateTestContent("cp2", nodeStore);
-        String cp5 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        SegmentNodeState uncompacted1 = fileStore.getHead();
-        SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, uncompacted1, EMPTY_NODE, Canceller.newCanceller());
-
-        assertNotNull(compacted1);
-        assertNotSame(uncompacted1, compacted1);
-        checkGeneration(compacted1, compactedGeneration);
-
-        assertSameStableId(uncompacted1, compacted1);
-        assertSameStableId(getCheckpoint(uncompacted1, cp1), getCheckpoint(compacted1, cp1));
-        assertSameStableId(getCheckpoint(uncompacted1, cp2), getCheckpoint(compacted1, cp2));
-        assertSameStableId(getCheckpoint(uncompacted1, cp3), getCheckpoint(compacted1, cp3));
-        assertSameStableId(getCheckpoint(uncompacted1, cp4), getCheckpoint(compacted1, cp4));
-        assertSameStableId(getCheckpoint(uncompacted1, cp5), getCheckpoint(compacted1, cp5));
-        assertSameRecord(getCheckpoint(compacted1, cp5), compacted1.getChildNode("root"));
-    }
-
-    private static void updateTestContent(@NotNull String parent, @NotNull NodeStore nodeStore)
-            throws CommitFailedException, IOException {
-        NodeBuilder rootBuilder = nodeStore.getRoot().builder();
-        NodeBuilder parentBuilder = rootBuilder.child(parent);
-        parentBuilder.child("b").setProperty("bin", createBlob(nodeStore, SegmentTestConstants.MEDIUM_LIMIT));
-        nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-    }
-
-}
\ No newline at end of file
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
deleted file mode 100644
index 7758948..0000000
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/AbstractCompactorTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.jackrabbit.oak.segment;
-
-import static java.util.concurrent.TimeUnit.DAYS;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.addTestContent;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameRecord;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.assertSameStableId;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.checkGeneration;
-import static org.apache.jackrabbit.oak.segment.CompactorTestUtils.getCheckpoint;
-import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
-import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
-import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
-import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.jetbrains.annotations.NotNull;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public abstract class AbstractCompactorTest {
-    @Rule
-    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
-
-    private FileStore fileStore;
-
-    private SegmentNodeStore nodeStore;
-
-    private Compactor compactor;
-
-    private GCGeneration compactedGeneration;
-
-    @Before
-    public void setup() throws IOException, InvalidFileStoreVersionException {
-        fileStore = fileStoreBuilder(folder.getRoot()).build();
-        nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
-        compactedGeneration = newGCGeneration(1,1, true);
-        compactor = createCompactor(fileStore, compactedGeneration);
-    }
-
-    protected abstract Compactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation);
-
-    @After
-    public void tearDown() {
-        fileStore.close();
-    }
-
-    @Test
-    public void testCompact() throws Exception {
-        addTestContent("cp1", nodeStore, 42);
-        String cp1 = nodeStore.checkpoint(DAYS.toMillis(1));
-        addTestContent("cp2", nodeStore, 42);
-        String cp2 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        SegmentNodeState uncompacted1 = fileStore.getHead();
-        SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, uncompacted1, EMPTY_NODE, Canceller.newCanceller());
-        assertNotNull(compacted1);
-        assertNotSame(uncompacted1, compacted1);
-        checkGeneration(compacted1, compactedGeneration);
-
-        assertSameStableId(uncompacted1, compacted1);
-        assertSameStableId(getCheckpoint(uncompacted1, cp1), getCheckpoint(compacted1, cp1));
-        assertSameStableId(getCheckpoint(uncompacted1, cp2), getCheckpoint(compacted1, cp2));
-        assertSameRecord(getCheckpoint(compacted1, cp2), compacted1.getChildNode("root"));
-
-        // Simulate a 2nd compaction cycle
-        addTestContent("cp3", nodeStore, 42);
-        String cp3 = nodeStore.checkpoint(DAYS.toMillis(1));
-        addTestContent("cp4", nodeStore, 42);
-        String cp4 = nodeStore.checkpoint(DAYS.toMillis(1));
-
-        SegmentNodeState uncompacted2 = fileStore.getHead();
-        SegmentNodeState compacted2 = compactor.compact(uncompacted1, uncompacted2, compacted1, Canceller.newCanceller());
-        assertNotNull(compacted2);
-        assertNotSame(uncompacted2, compacted2);
-        checkGeneration(compacted2, compactedGeneration);
-
-        assertTrue(fileStore.getRevisions().setHead(uncompacted2.getRecordId(), compacted2.getRecordId()));
-
-        assertEquals(uncompacted2, compacted2);
-        assertSameStableId(uncompacted2, compacted2);
-        assertSameStableId(getCheckpoint(uncompacted2, cp1), getCheckpoint(compacted2, cp1));
-        assertSameStableId(getCheckpoint(uncompacted2, cp2), getCheckpoint(compacted2, cp2));
-        assertSameStableId(getCheckpoint(uncompacted2, cp3), getCheckpoint(compacted2, cp3));
-        assertSameStableId(getCheckpoint(uncompacted2, cp4), getCheckpoint(compacted2, cp4));
-        assertSameRecord(getCheckpoint(compacted1, cp1), getCheckpoint(compacted2, cp1));
-        assertSameRecord(getCheckpoint(compacted1, cp2), getCheckpoint(compacted2, cp2));
-        assertSameRecord(getCheckpoint(compacted2, cp4), compacted2.getChildNode("root"));
-    }
-}
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
index baca4d5..cfe547a 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorExternalBlobTest.java
@@ -18,26 +18,125 @@
 
 package org.apache.jackrabbit.oak.segment;
 
+import static java.util.concurrent.TimeUnit.DAYS;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.addTestContent;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameRecord;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameStableId;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.checkGeneration;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createBlob;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createCompactor;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.getCheckpoint;
+import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
+import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
+import org.apache.jackrabbit.oak.segment.test.TemporaryBlobStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
 
-import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+import java.io.File;
+import java.io.IOException;
 
-public class CheckpointCompactorExternalBlobTest extends AbstractCompactorExternalBlobTest {
-    @Override
-    protected CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
-        SegmentWriter writer = defaultSegmentWriterBuilder("c")
-                .withGeneration(generation)
-                .build(fileStore);
+public class CheckpointCompactorExternalBlobTest {
 
-        return new CheckpointCompactor(
-                GCMonitor.EMPTY,
-                fileStore.getReader(),
-                writer,
-                fileStore.getBlobStore(),
-                GCNodeWriteMonitor.EMPTY);
+    private TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private TemporaryBlobStore temporaryBlobStore = new TemporaryBlobStore(folder);

+
+    private FileStore fileStore;
+
+    private SegmentNodeStore nodeStore;
+
+    private CheckpointCompactor compactor;
+
+    private GCGeneration compactedGeneration;
+
+    @Rule
+    public RuleChain rules = RuleChain.outerRule(folder)
+        .around(temporaryBlobStore);
+
+    public void setup(boolean withBlobStore) throws IOException, InvalidFileStoreVersionException {
+        BlobStore blobStore = temporaryBlobStore.blobStore();
+        FileStoreBuilder fileStoreBuilder = fileStoreBuilder(folder.getRoot());
+
+        if (withBlobStore) {
+            fileStoreBuilder = fileStoreBuilder.withBlobStore(blobStore);
+        }
+
+        fileStore = fileStoreBuilder.build();
+        nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+        compactedGeneration = newGCGeneration(1, 1, true);
+        compactor = createCompactor(fileStore, compactedGeneration);
     }
-}
+
+    @After
+    public void tearDown() {
+        fileStore.close();
+    }
+
+    @Test
+    public void testCompact() throws Exception {
+        setup(true);
+
+        // add two blobs which will be persisted in the blob store
+        addTestContent("cp1", nodeStore, SegmentTestConstants.MEDIUM_LIMIT);
+        String cp1 = nodeStore.checkpoint(DAYS.toMillis(1));
+        addTestContent("cp2", nodeStore, SegmentTestConstants.MEDIUM_LIMIT);
+        String cp2 = nodeStore.checkpoint(DAYS.toMillis(1));
+
+        // update the two blobs from the blob store
+        updateTestContent("cp1", nodeStore);
+        String cp3 = nodeStore.checkpoint(DAYS.toMillis(1));
+        updateTestContent("cp2", nodeStore);
+        String cp4 = nodeStore.checkpoint(DAYS.toMillis(1));
+        fileStore.close();
+
+        // no blob store configured
+        setup(false);
+
+        // this time the updated blob will be stored in the file store
+        updateTestContent("cp2", nodeStore);
+        String cp5 = nodeStore.checkpoint(DAYS.toMillis(1));
+
+        SegmentNodeState uncompacted1 = fileStore.getHead();
+        SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, uncompacted1, EMPTY_NODE, Canceller.newCanceller());
+
+        assertNotNull(compacted1);
+        assertFalse(uncompacted1 == compacted1);
+        checkGeneration(compacted1, compactedGeneration);
+
+        assertSameStableId(uncompacted1, compacted1);
+        assertSameStableId(getCheckpoint(uncompacted1, cp1), getCheckpoint(compacted1, cp1));
+        assertSameStableId(getCheckpoint(uncompacted1, cp2), getCheckpoint(compacted1, cp2));
+        assertSameStableId(getCheckpoint(uncompacted1, cp3), getCheckpoint(compacted1, cp3));
+        assertSameStableId(getCheckpoint(uncompacted1, cp4), getCheckpoint(compacted1, cp4));
+        assertSameStableId(getCheckpoint(uncompacted1, cp5), getCheckpoint(compacted1, cp5));
+        assertSameRecord(getCheckpoint(compacted1, cp5), compacted1.getChildNode("root"));
+    }
+
+    private static void updateTestContent(@NotNull String parent, @NotNull NodeStore nodeStore)
+            throws CommitFailedException, IOException {
+        NodeBuilder rootBuilder = nodeStore.getRoot().builder();
+        NodeBuilder parentBuilder = rootBuilder.child(parent);
+        parentBuilder.child("b").setProperty("bin", createBlob(nodeStore, SegmentTestConstants.MEDIUM_LIMIT));
+        nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+}
\ No newline at end of file
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
index 95ea97f..e6b64c6 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTest.java
@@ -18,26 +18,99 @@
 
 package org.apache.jackrabbit.oak.segment;
 
+import static java.util.concurrent.TimeUnit.DAYS;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.addTestContent;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameRecord;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.assertSameStableId;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.checkGeneration;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.createCompactor;
+import static org.apache.jackrabbit.oak.segment.CheckpointCompactorTestUtils.getCheckpoint;
+import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
+import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
-import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
-import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
+public class CheckpointCompactorTest {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
 
-public class CheckpointCompactorTest extends AbstractCompactorTest {
-    @Override
-    protected CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
-        SegmentWriter writer = defaultSegmentWriterBuilder("c")
-                .withGeneration(generation)
-                .build(fileStore);
+    private FileStore fileStore;
 
-        return new CheckpointCompactor(
-                GCMonitor.EMPTY,
-                fileStore.getReader(),
-                writer,
-                fileStore.getBlobStore(),
-                GCNodeWriteMonitor.EMPTY);
+    private SegmentNodeStore nodeStore;
+
+    private CheckpointCompactor compactor;
+
+    private GCGeneration compactedGeneration;
+
+    @Before
+    public void setup() throws IOException, InvalidFileStoreVersionException {
+        fileStore = fileStoreBuilder(folder.getRoot()).build();
+        nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+        compactedGeneration = newGCGeneration(1, 1, true);
+        compactor = createCompactor(fileStore, compactedGeneration);
+    }
+
+    @After
+    public void tearDown() {
+        fileStore.close();
+    }
+
+    @Test
+    public void testCompact() throws Exception {
+        addTestContent("cp1", nodeStore, 42);
+        String cp1 = nodeStore.checkpoint(DAYS.toMillis(1));
+        addTestContent("cp2", nodeStore, 42);
+        String cp2 = nodeStore.checkpoint(DAYS.toMillis(1));
+
+        SegmentNodeState uncompacted1 = fileStore.getHead();
+        SegmentNodeState compacted1 = compactor.compact(EMPTY_NODE, uncompacted1, EMPTY_NODE, Canceller.newCanceller());
+        assertNotNull(compacted1);
+        assertFalse(uncompacted1 == compacted1);
+        checkGeneration(compacted1, compactedGeneration);
+
+        assertSameStableId(uncompacted1, compacted1);
+        assertSameStableId(getCheckpoint(uncompacted1, cp1), getCheckpoint(compacted1, cp1));
+        assertSameStableId(getCheckpoint(uncompacted1, cp2), getCheckpoint(compacted1, cp2));
+        assertSameRecord(getCheckpoint(compacted1, cp2), compacted1.getChildNode("root"));
+
+        // Simulate a 2nd compaction cycle
+        addTestContent("cp3", nodeStore, 42);
+        String cp3 = nodeStore.checkpoint(DAYS.toMillis(1));
+        addTestContent("cp4", nodeStore, 42);
+        String cp4 = nodeStore.checkpoint(DAYS.toMillis(1));
+
+        SegmentNodeState uncompacted2 = fileStore.getHead();
+        SegmentNodeState compacted2 = compactor.compact(uncompacted1, uncompacted2, compacted1, Canceller.newCanceller());
+        assertNotNull(compacted2);
+        assertFalse(uncompacted2 == compacted2);
+        checkGeneration(compacted2, compactedGeneration);
+
+        assertTrue(fileStore.getRevisions().setHead(uncompacted2.getRecordId(), compacted2.getRecordId()));
+
+        assertEquals(uncompacted2, compacted2);
+        assertSameStableId(uncompacted2, compacted2);
+        assertSameStableId(getCheckpoint(uncompacted2, cp1), getCheckpoint(compacted2, cp1));
+        assertSameStableId(getCheckpoint(uncompacted2, cp2), getCheckpoint(compacted2, cp2));
+        assertSameStableId(getCheckpoint(uncompacted2, cp3), getCheckpoint(compacted2, cp3));
+        assertSameStableId(getCheckpoint(uncompacted2, cp4), getCheckpoint(compacted2, cp4));
+        assertSameRecord(getCheckpoint(compacted1, cp1), getCheckpoint(compacted2, cp1));
+        assertSameRecord(getCheckpoint(compacted1, cp2), getCheckpoint(compacted2, cp2));
+        assertSameRecord(getCheckpoint(compacted2, cp4), compacted2.getChildNode("root"));
     }
 }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTestUtils.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTestUtils.java
similarity index 88%
rename from oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTestUtils.java
rename to oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTestUtils.java
index 70d0613..75abf39 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactorTestUtils.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CheckpointCompactorTestUtils.java
@@ -43,9 +43,11 @@
 import java.util.List;
 import java.util.Random;
 
-public class CompactorTestUtils {
+public class CheckpointCompactorTestUtils {
 
-    private CompactorTestUtils() {}
+    private CheckpointCompactorTestUtils() {
+
+    }
 
     public static void checkGeneration(NodeState node, GCGeneration gcGeneration) {
         assertTrue(node instanceof SegmentNodeState);
@@ -83,6 +85,20 @@
                 ((SegmentNodeState) node2).getRecordId());
     }
 
+    @NotNull
+    public static CheckpointCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
+        SegmentWriter writer = defaultSegmentWriterBuilder("c")
+                .withGeneration(generation)
+                .build(fileStore);
+
+        return new CheckpointCompactor(
+                GCMonitor.EMPTY,
+                fileStore.getReader(),
+                writer,
+                fileStore.getBlobStore(),
+                GCNodeWriteMonitor.EMPTY);
+    }
+
     public static void addTestContent(@NotNull String parent, @NotNull NodeStore nodeStore, int binPropertySize)
             throws CommitFailedException, IOException {
         NodeBuilder rootBuilder = nodeStore.getRoot().builder();
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
index bb4ba63..4e43366 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
@@ -598,11 +598,14 @@
         try {
             SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
 
-            final Callable<Void> cancel = () -> {
-                // Give the compaction thread a head start
-                sleepUninterruptibly(1000, MILLISECONDS);
-                fileStore.cancelGC();
-                return null;
+            final Callable<Void> cancel = new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    // Give the compaction thread a head start
+                    sleepUninterruptibly(1000, MILLISECONDS);
+                    fileStore.cancelGC();
+                    return null;
+                }
             };
 
             for (int k = 0; k < 100; k++) {
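
The hunk above is purely syntactic: the lambda is expanded back into an anonymous Callable, with identical behaviour on Java 8+. For reference, a self-contained sketch of the two equivalent forms:

    import java.util.concurrent.Callable;

    class CallableForms {
        // Anonymous-class form, as restored by the hunk above:
        static final Callable<Void> explicit = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                return null;
            }
        };

        // Lambda form, as removed; both compile to the same interface:
        static final Callable<Void> concise = () -> null;
    }
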
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
index e383aa7..ae6856a 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
@@ -24,6 +24,8 @@
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 
+import org.apache.jackrabbit.guava.common.base.Supplier;
+
 import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -34,8 +36,6 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.util.function.Supplier;
-
 public class NodeRecordTest {
 
     private static class Generation implements Supplier<GCGeneration> {
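
The import swap above is not cosmetic: the Generation helper implements Supplier, and java.util.function.Supplier and the repackaged Guava Supplier are distinct interfaces, so the class compiles only against whichever one is imported. A small illustrative sketch of a constant supplier written against the Guava interface now required here:

    import org.apache.jackrabbit.guava.common.base.Supplier;

    class ConstantSupplier {
        // Illustrative only: yields the same value on every call.
        static <T> Supplier<T> of(final T value) {
            return new Supplier<T>() {
                @Override
                public T get() {
                    return value;
                }
            };
        }
    }
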
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
deleted file mode 100644
index ae8b817..0000000
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.jackrabbit.oak.segment;
-
-import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
-import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
-import org.jetbrains.annotations.NotNull;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
-
-@RunWith(Parameterized.class)
-public class ParallelCompactorExternalBlobTest extends AbstractCompactorExternalBlobTest {
-
-    private final int concurrency;
-
-    @Parameterized.Parameters
-    public static List<Integer> concurrencyLevels() {
-        return Arrays.asList(1, 2, 4, 8, 16);
-    }
-
-    public ParallelCompactorExternalBlobTest(int concurrency) {
-        this.concurrency = concurrency;
-    }
-
-    @Override
-    protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
-        SegmentWriter writer = defaultSegmentWriterBuilder("c")
-                .withGeneration(generation)
-                .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
-                .build(fileStore);
-
-        return new ParallelCompactor(
-                GCMonitor.EMPTY,
-                fileStore.getReader(),
-                writer,
-                fileStore.getBlobStore(),
-                GCNodeWriteMonitor.EMPTY,
-                concurrency);
-    }
-}
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
deleted file mode 100644
index 27bebb1..0000000
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.jackrabbit.oak.segment;
-
-import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.GCNodeWriteMonitor;
-import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
-import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
-import org.jetbrains.annotations.NotNull;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
-
-@RunWith(Parameterized.class)
-public class ParallelCompactorTest extends AbstractCompactorTest {
-
-    private final int concurrency;
-
-    @Parameterized.Parameters
-    public static List<Integer> concurrencyLevels() {
-        return Arrays.asList(1, 2, 4, 8, 16);
-    }
-
-    public ParallelCompactorTest(int concurrency) {
-        this.concurrency = concurrency;
-    }
-
-    @Override
-    protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, @NotNull GCGeneration generation) {
-        SegmentWriter writer = defaultSegmentWriterBuilder("c")
-                .withGeneration(generation)
-                .withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
-                .build(fileStore);
-
-        return new ParallelCompactor(
-                GCMonitor.EMPTY,
-                fileStore.getReader(),
-                writer,
-                fileStore.getBlobStore(),
-                GCNodeWriteMonitor.EMPTY,
-                concurrency);
-    }
-}
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
index e177200..98db981 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheStatsTest.java
@@ -21,14 +21,15 @@
 import static org.apache.jackrabbit.oak.segment.RecordCache.newRecordCache;
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+import java.util.Random;
+
 import org.apache.jackrabbit.guava.common.cache.CacheStats;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.Random;
-import java.util.function.Supplier;
+import org.apache.jackrabbit.guava.common.base.Supplier;
 
 public class RecordCacheStatsTest {
     private static final String NAME = "cache stats";
@@ -39,7 +40,16 @@
 
     private final RecordCache<Integer> cache = newRecordCache(KEYS);
     private final RecordCacheStats cacheStats =
-            new RecordCacheStats(NAME, cache::getStats, cache::size, cache::estimateCurrentWeight);
+            new RecordCacheStats(NAME,
+                new Supplier<CacheStats>() {
+                    @Override public CacheStats get() { return cache.getStats(); }
+                },
+                new Supplier<Long>() {
+                    @Override public Long get() { return cache.size(); }
+                },
+                new Supplier<Long>() {
+                    @Override public Long get() { return cache.estimateCurrentWeight(); }
+                });
 
     private int hits;
 
@@ -55,7 +65,7 @@
             cache.put(k, newRecordId());
         }
 
-        for (int k = 0; k < KEYS; k++) {
+        for (int k = 0; k < 100; k++) {
             if (cache.get(4 * k) != null) {
                 hits++;
             }
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
index fcc9c31..e697f70 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordCacheTest.java
@@ -26,17 +26,9 @@
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.junit.Test;
@@ -68,52 +60,10 @@
     }
 
     @Test
-    public void concurrentPutAndGet() throws ExecutionException, InterruptedException {
-        final int SIZE = 16384;
-
-        RecordCache<String> cache = newRecordCache(SIZE);
-        HashMap<String, RecordId> values = new HashMap<>(SIZE);
-        List<Integer> indices = new ArrayList<>(SIZE);
-
-        for (int k = 0; k < SIZE; k ++) {
-            String key = "key-" + k;
-            RecordId value = newRecordId(idProvider, rnd);
-            values.put(key, value);
-            indices.add(k);
-        }
-
-        Collections.shuffle(indices);
-        ExecutorService executor = Executors.newFixedThreadPool(16);
-        List<Future<String>> putFutures = new ArrayList<>(SIZE);
-        List<Future<Void>> getFutures = new ArrayList<>(SIZE);
-
-        for (int k = 0; k < SIZE; k ++) {
-            int idx = k;
-            putFutures.add(executor.submit(() -> {
-                String key = "key-" + idx;
-                cache.put(key, values.get(key));
-                return key;
-            }));
-        }
-
-        for (Future<String> future : putFutures) {
-            getFutures.add(executor.submit(() -> {
-                String key = future.get();
-                assertEquals(values.get(key), cache.get(key));
-                return null;
-            }));
-        }
-
-        for (Future<Void> future : getFutures) {
-            future.get();
-        }
-    }
-
-    @Test
     public void invalidate() {
-        RecordCache<String> cache = newRecordCache(100);
+        RecordCache<String> cache = newRecordCache(10);
         Map<String, RecordId> keys = newLinkedHashMap();
-        for (int k = 0; k < 100; k ++) {
+        for (int k = 0; k < 10; k ++) {
             String key = "key-" + k;
             RecordId value = newRecordId(idProvider, rnd);
             keys.put(key, value);
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
index c7417e1..fe45586 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
@@ -29,8 +29,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -41,12 +40,10 @@
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
 public class SegmentBufferWriterPoolTest {
     private final MemoryStore store = new MemoryStore();
 
@@ -54,21 +51,17 @@
 
     private GCGeneration gcGeneration = GCGeneration.NULL;
 
-    private final SegmentBufferWriterPool pool;
+    private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
+            store.getSegmentIdProvider(),
+            store.getReader(),
+            "",
+            () -> gcGeneration
+    );
 
     private final ExecutorService[] executors = new ExecutorService[] {
         newSingleThreadExecutor(), newSingleThreadExecutor(), newSingleThreadExecutor()};
 
-    @Parameterized.Parameters
-    public static List<SegmentBufferWriterPool.PoolType> poolTypes() {
-        return Arrays.asList(SegmentBufferWriterPool.PoolType.values());
-    }
-
-    public SegmentBufferWriterPoolTest(SegmentBufferWriterPool.PoolType poolType) throws IOException {
-        pool = SegmentBufferWriterPool.factory(
-                store.getSegmentIdProvider(), store.getReader(), "", () -> gcGeneration)
-                .newPool(poolType);
-    }
+    public SegmentBufferWriterPoolTest() throws IOException { }
 
     @After
     public void tearDown() {
@@ -78,7 +71,12 @@
     }
 
     private Future<RecordId> execute(GCGeneration gcGeneration, final WriteOperation op, int executor) {
-        return executors[executor].submit(() -> pool.execute(gcGeneration, op));
+        return executors[executor].submit(new Callable<RecordId>() {
+            @Override
+            public RecordId call() throws Exception {
+                return pool.execute(gcGeneration, op);
+            }
+        });
     }
 
     private WriteOperation createOp(final String key, final ConcurrentMap<String, SegmentBufferWriter> map) {
@@ -208,20 +206,27 @@
     @Test
     public void testFlushBlocks() throws ExecutionException, InterruptedException {
         GCGeneration gcGeneration = pool.getGCGeneration();
-        Future<RecordId> res = execute(gcGeneration, writer -> {
-            try {
-                // This should deadlock as flush waits for this write
-                // operation to finish, which in this case contains the
-                // call to flush itself.
-                executors[1].submit(() -> {
-                    pool.flush(store);
-                    return null;
-                }).get(100, MILLISECONDS);
-                return null;    // No deadlock -> null indicates test failure
-            } catch (InterruptedException | ExecutionException ignore) {
-                return null;    // No deadlock -> null indicates test failure
-            } catch (TimeoutException ignore) {
-                return rootId;  // Deadlock -> rootId indicates test pass
+        Future<RecordId> res = execute(gcGeneration, new WriteOperation() {
+            @Nullable
+            @Override
+            public RecordId execute(@NotNull SegmentBufferWriter writer) {
+                try {
+                    // This should deadlock as flush waits for this write
+                    // operation to finish, which in this case contains the
+                    // call to flush itself.
+                    executors[1].submit(new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            pool.flush(store);
+                            return null;
+                        }
+                    }).get(100, MILLISECONDS);
+                    return null;    // No deadlock -> null indicates test failure
+                } catch (InterruptedException | ExecutionException ignore) {
+                    return null;    // No deadlock -> null indicates test failure
+                } catch (TimeoutException ignore) {
+                    return rootId;  // Deadlock -> rootId indicates test pass
+                }
             }
         }, 0);
 
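
The restored testFlushBlocks relies on a timeout-as-oracle pattern: a nested flush is submitted, and the test passes only when Future.get times out, i.e. when the expected deadlock actually occurs. A standalone sketch of that pattern with illustrative names:

    import java.util.concurrent.*;

    class ExpectBlocking {
        // Returns true when 'task' fails to finish within 'millis',
        // which the caller interprets as the expected blocking.
        static boolean blocksFor(ExecutorService executor,
                                 Callable<Void> task, long millis)
                throws InterruptedException {
            Future<Void> future = executor.submit(task);
            try {
                future.get(millis, TimeUnit.MILLISECONDS);
                return false;            // completed: did not block
            } catch (ExecutionException e) {
                return false;            // failed: did not block
            } catch (TimeoutException expected) {
                future.cancel(true);     // timed out: treat as blocked
                return true;
            }
        }
    }
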
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ConcurrentPriorityCacheTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ConcurrentPriorityCacheTest.java
deleted file mode 100644
index cab79e0..0000000
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/ConcurrentPriorityCacheTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.jackrabbit.oak.segment.file;
-
-import org.apache.jackrabbit.oak.segment.CacheWeights;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import static java.lang.Integer.valueOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assume.assumeTrue;
-
-@RunWith(Parameterized.class)
-public class ConcurrentPriorityCacheTest {
-
-    private final int concurrency;
-
-    @Parameterized.Parameters
-    public static List<Integer> concurrencyLevels() {
-        return Arrays.asList(1, 2, 4, 8, 16, 32);
-    }
-
-    public ConcurrentPriorityCacheTest(int concurrency) {
-        this.concurrency = concurrency;
-    }
-
-    @Test
-    public void concurrentReadWrite() throws ExecutionException, InterruptedException {
-        final int SIZE = 16384;
-        PriorityCache<String, Integer> cache = new PriorityCache<>(SIZE, 10);
-
-        ExecutorService executor1 = Executors.newFixedThreadPool(concurrency);
-        ExecutorService executor2 = Executors.newFixedThreadPool(concurrency);
-
-        List<Future<Boolean>> putFutures = new ArrayList<>(SIZE);
-        List<Future<Void>> getFutures = new ArrayList<>(SIZE);
-
-        for (int k = 0; k < SIZE; k++) {
-            int idx = k;
-            putFutures.add(executor1.submit(
-                    () -> cache.put("key-" + idx, idx, 0, (byte) 0)));
-        }
-
-        for (int k = 0; k < SIZE; k++) {
-            int idx = k;
-            getFutures.add(executor2.submit(() -> {
-                if (putFutures.get(idx).get()) {
-                    assertEquals(valueOf(idx), cache.get("key-" + idx, 0));
-                    assertNull(cache.get("key-" + idx, 1));
-                } else {
-                    assertNull(cache.get("key-" + idx, 0));
-                }
-                return null;
-            }));
-        }
-
-        for (Future<Void> future : getFutures) {
-            future.get();
-        }
-    }
-
-    @Test
-    public void concurrentUpdateKey() throws ExecutionException, InterruptedException {
-        PriorityCache<String, Byte> cache = new PriorityCache<>(1, 5);
-
-        List<Byte> costs = new ArrayList<>(Byte.MAX_VALUE + 1);
-        for (int k = 0; k <= Byte.MAX_VALUE; k++) {
-            costs.add((byte) k);
-        }
-        Collections.shuffle(costs);
-
-        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<Future<Boolean>> futures = new ArrayList<>(Byte.MAX_VALUE + 1);
-
-        for (int k = 0; k <= Byte.MAX_VALUE; k++) {
-            byte cost = costs.get(k);
-            futures.add(executor.submit(() -> cache.put("key-" + cost, cost, 0, cost)));
-        }
-
-        for (Future<Boolean> future : futures) {
-            future.get();
-        }
-
-        assertEquals(Byte.valueOf(Byte.MAX_VALUE), cache.get("key-" + Byte.MAX_VALUE, 0));
-    }
-
-    @Test
-    public void concurrentUpdateWithNewGeneration() throws ExecutionException, InterruptedException {
-        final int NUM_GENERATIONS = 256;
-        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 5);
-
-        List<Integer> generations = new ArrayList<>(NUM_GENERATIONS);
-        for (int k = 0; k < NUM_GENERATIONS; k++) {
-            generations.add(k);
-        }
-        Collections.shuffle(generations);
-
-        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<Future<Boolean>> futures = new ArrayList<>(NUM_GENERATIONS);
-
-        for (int k = 0; k < NUM_GENERATIONS; k++) {
-            int gen = generations.get(k);
-            futures.add(executor.submit(() -> cache.put("key", gen, gen, (byte) 0)));
-        }
-
-        for (Future<Boolean> future : futures) {
-            future.get();
-        }
-
-        assertEquals(Integer.valueOf(NUM_GENERATIONS - 1), cache.get("key", NUM_GENERATIONS - 1));
-    }
-
-    @Test
-    public void concurrentGenerationPurge() throws ExecutionException, InterruptedException {
-        PriorityCache<String, Integer> cache = new PriorityCache<>(65536);
-
-        for (int gen = 4; gen >= 0; gen--) {
-            // Iterate backward so that earlier generations are not replaced with later ones
-            for (int k = 0; k < 100; k++) {
-                assumeTrue("All test keys are in the cache",
-                        cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0));
-            }
-        }
-
-        assertEquals(500, cache.size());
-
-        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<Future<Void>> futures = new ArrayList<>(concurrency * 4);
-
-        for (int i = 0; i < concurrency; i++) {
-            futures.add(executor.submit(() -> {
-                cache.purgeGenerations(generation -> generation == 1);
-                return null;
-            }));
-            futures.add(executor.submit(() -> {
-                cache.purgeGenerations(generation -> generation == 4);
-                return null;
-            }));
-            futures.add(executor.submit(() -> {
-                cache.purgeGenerations(generation -> generation <= 1);
-                return null;
-            }));
-        }
-
-        for (Future<Void> future : futures) {
-            future.get();
-        }
-
-        assertEquals(200, cache.size());
-    }
-
-    @Test
-    public void concurrentEvictionCount() throws ExecutionException, InterruptedException {
-        Random rnd = new Random();
-        PriorityCache<String, Integer> cache = new PriorityCache<>(128, 2, CacheWeights.noopWeigher());
-
-        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<Future<Boolean>> futures = new ArrayList<>(256);
-
-        for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; b++) {
-            int value = b;
-            futures.add(executor.submit(() ->
-                    cache.put("k-" + value + "-" + rnd.nextInt(1000), value, 0, (byte) value)));
-        }
-
-        int count = 0;
-        for (Future<Boolean> future : futures) {
-            if (future.get()) {
-                count++;
-            }
-        }
-
-        assertEquals(count, cache.size() + cache.getStats().evictionCount());
-    }
-
-    @Test
-    public void concurrentLoadExceptionCount() throws ExecutionException, InterruptedException {
-        Random rnd = new Random();
-        PriorityCache<String, Integer> cache = new PriorityCache<>(16);
-
-        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
-        List<Future<Boolean>> futures = new ArrayList<>(1000);
-
-        for (int i = 0; i < 1000; i++) {
-            int value = i;
-            futures.add(executor.submit(() ->
-                    cache.put("k-" + value + "-" + rnd.nextInt(1000), value, 0, (byte) 0)));
-        }
-
-        int success = 0;
-        int failure = 0;
-
-        for (Future<Boolean> future : futures) {
-            if (future.get()) {
-                success++;
-            } else {
-                failure++;
-            }
-        }
-
-        assertEquals(0, cache.getStats().evictionCount());
-        assertEquals(success, cache.size());
-        assertEquals(failure, cache.getStats().loadExceptionCount());
-    }
-
-}
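
For orientation on what the deleted tests covered, the single-threaded contract of PriorityCache they build on can be summarised in a short sketch. Constructor and method signatures are taken from the test code above; the class name, the printed expectations, and the assumption that this compiles in the same package (where PriorityCache is visible) are illustrative.

package org.apache.jackrabbit.oak.segment.file;

public class PriorityCacheUsageSketch {
    public static void main(String[] args) {
        // two-arg constructor as used in the tests above
        PriorityCache<String, Integer> cache = new PriorityCache<String, Integer>(128, 0);
        if (cache.put("key-1", 1, 0, (byte) 0)) {      // put() may reject an entry
            // a hit requires both the key and the generation to match
            System.out.println(cache.get("key-1", 0)); // 1
            System.out.println(cache.get("key-1", 1)); // null: wrong generation
        }
        // drop everything stored under generation 0
        cache.purgeGenerations(generation -> generation == 0);
        System.out.println(cache.size());              // 0
    }
}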
diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
index 380b595..3d2be39 100644
--- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
+++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/PriorityCacheTest.java
@@ -21,17 +21,18 @@
 
 import static java.lang.Integer.valueOf;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
+import java.util.Random;
+
 import org.apache.jackrabbit.guava.common.cache.Weigher;
 import org.apache.jackrabbit.oak.segment.CacheWeights;
 import org.junit.Test;
 
-import java.util.Random;
-import java.util.function.Predicate;
+import org.apache.jackrabbit.guava.common.base.Predicate;
 
 public class PriorityCacheTest {
 
@@ -80,7 +81,7 @@
 
     @Test
     public void readWrite() {
-        PriorityCache<String, Integer> cache = new PriorityCache<>(128, 0);
+        PriorityCache<String, Integer> cache = new PriorityCache<String, Integer>(128, 0);
         for (int k = 0; k < 128; k++) {
             if (cache.put("key-" + k, k, 0, (byte) 0)) {
                 assertEquals(Integer.valueOf(k), cache.get("key-" + k, 0));
@@ -100,7 +101,7 @@
 
     @Test
     public void updateKey() {
-        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 0);
+        PriorityCache<String, Integer> cache = new PriorityCache<String, Integer>(1, 0);
 
         assertTrue(cache.put("one", 1, 0, (byte) 0));
 
@@ -117,7 +118,7 @@
 
     @Test
     public void updateWithNewGeneration() {
-        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 0);
+        PriorityCache<String, Integer> cache = new PriorityCache<String, Integer>(1, 0);
         assertTrue(cache.put("one", 1, 0, (byte) 0));
 
         // Cache is full but we can still put a key of a higher generation
@@ -133,18 +134,24 @@
 
     @Test
     public void generationPurge() {
-        PriorityCache<String, Integer> cache = new PriorityCache<>(65536);
+        PriorityCache<String, Integer> cache = new PriorityCache<String, Integer>(65536);
 
         for (int gen = 4; gen >= 0; gen--) {
             // Iterate backward so that earlier generations are not replaced with later ones
             for (int k = 0; k < 100; k++) {
-                assumeTrue("All test keys are in the cache",
-                        cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0));
+                if (!cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0)) {
+                    assumeTrue("All test keys are in the cache", false);
+                }
             }
         }
 
         assertEquals(500, cache.size());
-        cache.purgeGenerations(generation -> generation <= 2);
+        cache.purgeGenerations(new Predicate<Integer>() {
+            @Override
+            public boolean apply(Integer generation) {
+                return generation <= 2;
+            }
+        });
         assertEquals(200, cache.size());
     }
 
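The import change in this hunk swaps java.util.function.Predicate for the shaded Guava Predicate, and the lambda for an anonymous class. Assuming the shaded interface mirrors Guava's com.google.common.base.Predicate, whose single abstract method is apply(), the two forms are interchangeable, so the anonymous class is a stylistic choice of the revert rather than a requirement. A hedged illustration:

import org.apache.jackrabbit.guava.common.base.Predicate;

public class PredicateForms {
    public static void main(String[] args) {
        // the form the revert restores
        Predicate<Integer> anonymous = new Predicate<Integer>() {
            @Override
            public boolean apply(Integer generation) {
                return generation <= 2;
            }
        };
        // a lambda targets the same single abstract method
        Predicate<Integer> lambda = generation -> generation <= 2;
        System.out.println(anonymous.apply(2)); // true
        System.out.println(lambda.apply(3));    // false
    }
}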
diff --git a/oak-shaded-guava/.DS_Store b/oak-shaded-guava/.DS_Store
deleted file mode 100644
index 91a7914..0000000
--- a/oak-shaded-guava/.DS_Store
+++ /dev/null
Binary files differ
diff --git a/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounter.java b/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounter.java
deleted file mode 100644
index a1006ee..0000000
--- a/oak-store-spi/src/main/java/org/apache/jackrabbit/oak/plugins/index/ApproximateCounter.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.index;
-
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
-
-/**
- * An approximate counter algorithm.
- */
-public class ApproximateCounter {
-    
-    public static final String COUNT_PROPERTY_PREFIX = ":count_";
-    public static final int COUNT_RESOLUTION = 100;
-    public static final int COUNT_MAX = 10000000;
-
-    private static final Random RANDOM = new Random();
-
-    private ApproximateCounter() {
-    }
-
-    /**
-     * Calculate the approximate offset from a given offset. The offset is the
-     * number of added or removed entries. The result is 0 in most cases,
-     * but sometimes it is a (positive or negative) multiple of the resolution,
-     * such that, on average, the sum of the returned values matches the sum of
-     * the passed offsets.
-     * 
-     * @param offset the high-resolution input offset
-     * @param resolution the resolution
-     * @return the low-resolution offset (most of the time 0)
-     */
-    public static long calculateOffset(long offset, int resolution) {
-        if (offset == 0 || resolution <= 1) {
-            return offset;
-        }
-        int add = resolution;
-        if (offset < 0) {
-            offset = -offset;
-            add = -add;
-        }
-        long result = 0;
-        for (long i = 0; i < offset; i++) {
-            if (RANDOM.nextInt(resolution) == 0) {
-                result += add;
-            }
-        }
-        return result;
-    }
-
-    /**
-     * This method ensures that the new approximate count (the old count plus
-     * the calculated offset) does not go below 0.
-     * 
-     * Also, for large counts and resolutions larger than 10, it reduces the
-     * resolution by a factor of 10 (further reducing the number of updates
-     * needed by a factor of 10).
-     * 
-     * @param oldCount the old count
-     * @param calculatedOffset the calculated offset (must not be 0)
-     * @param resolution the new (lower) resolution
-     * @return the new offset
-     */
-    public static long adjustOffset(long oldCount, long calculatedOffset,
-            int resolution) {
-        if (oldCount + calculatedOffset < 0) {
-            return -oldCount;
-        }
-        if (resolution <= 10 || oldCount < resolution * 10) {
-            return calculatedOffset;
-        }
-        return RANDOM.nextInt(10) == 0 ? calculatedOffset * 10 : 0;
-    }
-    
-    /**
-     * Set the seed of the random number generator (used for testing).
-     * 
-     * @param seed the new seed
-     */
-    static void setSeed(int seed) {
-        RANDOM.setSeed(seed);
-    }
-    
-    /**
-     * Adjust a counter in the given node. This method supports concurrent
-     * changes. It uses multiple properties, and is less accurate, but can be
-     * used in a multi-threaded environment, as it uses unique property names.
-     * 
-     * @param builder the node builder
-     * @param offset the offset
-     */
-    public static void adjustCountSync(NodeBuilder builder, long offset) {
-        if (offset == 0) {
-            return;
-        }
-        boolean added = offset > 0;
-        for (long i = 0; i < Math.abs(offset); i++) {
-            adjustCountSync(builder, added);
-        }
-    }
-    
-    private static void adjustCountSync(NodeBuilder builder, boolean added) {
-        if (RANDOM.nextInt(COUNT_RESOLUTION) != 0) {
-            return;
-        }
-        int max = getMaxCount(builder, added);
-        if (max >= COUNT_MAX) {
-            return;
-        }
-        // TODO is this the right approach? divide by count_resolution
-        int x = Math.max(COUNT_RESOLUTION, max * 2) / COUNT_RESOLUTION;
-        if (RANDOM.nextInt(x) > 0) {
-            return;
-        }
-        long value = x * COUNT_RESOLUTION;
-        String propertyName = COUNT_PROPERTY_PREFIX + UUID.randomUUID();
-        builder.setProperty(propertyName, added ? value : -value);
-    }
-    
-    private static int getMaxCount(NodeBuilder node, boolean added) {
-        long max = 0;
-        for (PropertyState p : node.getProperties()) {
-            if (!p.getName().startsWith(COUNT_PROPERTY_PREFIX)) {
-                continue;
-            }
-            long x = p.getValue(Type.LONG);
-            if (added == x > 0) { // the sign of x must match the added/removed flag
-                max = Math.max(max, Math.abs(x));
-            }
-        }
-        max = Math.min(Integer.MAX_VALUE, max);
-        return (int) max;
-    }
-    
-    /**
-     * Get the count estimation.
-     *
-     * @param node the node
-     * @return the estimation (-1 if no estimation is available)
-     */
-    public static long getCountSync(NodeState node) {
-        boolean hasCountProperty = false;
-        long added = 0;
-        long removed = 0;
-        for (PropertyState p : node.getProperties()) {
-            if (!p.getName().startsWith(COUNT_PROPERTY_PREFIX)) {
-                continue;
-            }
-            hasCountProperty = true;
-            long x = p.getValue(Type.LONG);
-            if (x > 0) {
-                added += x;
-            } else {
-                removed -= x;
-            }
-        }
-        if (!hasCountProperty) {
-            return -1;
-        }
-        return Math.max(added / 2, added - removed);
-    }
-
-}
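
The javadoc of the deleted calculateOffset promises that the returned low-resolution offsets are 0 most of the time but average out to the passed offsets. A small stand-alone simulation makes that concrete; the method body is copied verbatim from the class above, while the class name, seed, and parameters are illustrative.

import java.util.Random;

public class ApproximateOffsetDemo {
    // fixed seed for a reproducible demo, mirroring the package-private
    // setSeed hook the deleted class exposes for tests
    private static final Random RANDOM = new Random(42);

    // copied from the deleted ApproximateCounter.calculateOffset
    static long calculateOffset(long offset, int resolution) {
        if (offset == 0 || resolution <= 1) {
            return offset;
        }
        int add = resolution;
        if (offset < 0) {
            offset = -offset;
            add = -add;
        }
        long result = 0;
        for (long i = 0; i < offset; i++) {
            // each unit survives with probability 1/resolution, scaled back up
            if (RANDOM.nextInt(resolution) == 0) {
                result += add;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int runs = 10_000;
        long sum = 0;
        for (int i = 0; i < runs; i++) {
            // each call returns 0 most of the time, otherwise a multiple of 100
            sum += calculateOffset(7, 100);
        }
        // the average converges on the input offset of 7
        System.out.printf("average offset: %.2f%n", (double) sum / runs);
    }
}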