blob: c7417e14691c718f8debc25d73c3ca06bea5c62f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.jackrabbit.oak.segment;
import static org.apache.jackrabbit.guava.common.collect.Maps.newConcurrentMap;
import static org.apache.jackrabbit.guava.common.collect.Sets.intersection;
import static org.apache.jackrabbit.guava.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.guava.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
public class SegmentBufferWriterPoolTest {
private final MemoryStore store = new MemoryStore();
private final RecordId rootId = store.getRevisions().getHead();
private GCGeneration gcGeneration = GCGeneration.NULL;
private final SegmentBufferWriterPool pool;
private final ExecutorService[] executors = new ExecutorService[] {
newSingleThreadExecutor(), newSingleThreadExecutor(), newSingleThreadExecutor()};
public static List<SegmentBufferWriterPool.PoolType> poolTypes() {
return Arrays.asList(SegmentBufferWriterPool.PoolType.values());
public SegmentBufferWriterPoolTest(SegmentBufferWriterPool.PoolType poolType) throws IOException {
pool = SegmentBufferWriterPool.factory(
store.getSegmentIdProvider(), store.getReader(), "", () -> gcGeneration)
public void tearDown() {
for (ExecutorService executor : executors) {
private Future<RecordId> execute(GCGeneration gcGeneration, final WriteOperation op, int executor) {
return executors[executor].submit(() -> pool.execute(gcGeneration, op));
private WriteOperation createOp(final String key, final ConcurrentMap<String, SegmentBufferWriter> map) {
return new WriteOperation() {
@NotNull @Override
public RecordId execute(@NotNull SegmentBufferWriter writer) {
map.put(key, writer);
return rootId;
public void testThreadAffinity() throws IOException, ExecutionException, InterruptedException {
GCGeneration gen = pool.getGCGeneration();
ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res1.get());
assertEquals(rootId, res2.get());
assertEquals(rootId, res3.get());
assertEquals(3, map1.size());
ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res4.get());
assertEquals(rootId, res5.get());
assertEquals(rootId, res6.get());
assertEquals(3, map2.size());
assertEquals(map1, map2);
public void testFlush() throws ExecutionException, InterruptedException, IOException {
GCGeneration gen = pool.getGCGeneration();
ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res1.get());
assertEquals(rootId, res2.get());
assertEquals(rootId, res3.get());
assertEquals(3, map1.size());
ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res4.get());
assertEquals(rootId, res5.get());
assertEquals(rootId, res6.get());
assertEquals(3, map2.size());
assertTrue(intersection(newHashSet(map1.values()), newHashSet(map2.values())).isEmpty());
public void testCompaction() throws ExecutionException, InterruptedException, IOException {
GCGeneration gen = pool.getGCGeneration();
ConcurrentMap<String, SegmentBufferWriter> map1 = newConcurrentMap();
Future<RecordId> res1 = execute(gen, createOp("a", map1), 0);
Future<RecordId> res2 = execute(gen, createOp("b", map1), 1);
Future<RecordId> res3 = execute(gen, createOp("c", map1), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res1.get());
assertEquals(rootId, res2.get());
assertEquals(rootId, res3.get());
assertEquals(3, map1.size());
// Simulate compaction by increasing the global gc generation
gcGeneration = gcGeneration.nextFull();
// Write using previous generation
ConcurrentMap<String, SegmentBufferWriter> map2 = newConcurrentMap();
Future<RecordId> res4 = execute(gen, createOp("a", map2), 0);
Future<RecordId> res5 = execute(gen, createOp("b", map2), 1);
Future<RecordId> res6 = execute(gen, createOp("c", map2), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res4.get());
assertEquals(rootId, res5.get());
assertEquals(rootId, res6.get());
assertEquals(3, map2.size());
assertEquals(map1, map2);
// Write using current generation
ConcurrentMap<String, SegmentBufferWriter> map3 = newConcurrentMap();
Future<RecordId> res7 = execute(gen.nextFull(), createOp("a", map3), 0);
Future<RecordId> res8 = execute(gen.nextFull(), createOp("b", map3), 1);
Future<RecordId> res9 = execute(gen.nextFull(), createOp("c", map3), 2);
// Give the tasks some time to complete
sleepUninterruptibly(10, MILLISECONDS);
assertEquals(rootId, res7.get());
assertEquals(rootId, res8.get());
assertEquals(rootId, res9.get());
assertEquals(3, map3.size());
assertTrue(intersection(newHashSet(map1.values()), newHashSet(map3.values())).isEmpty());
public void testFlushBlocks() throws ExecutionException, InterruptedException {
GCGeneration gcGeneration = pool.getGCGeneration();
Future<RecordId> res = execute(gcGeneration, writer -> {
try {
// This should deadlock as flush waits for this write
// operation to finish, which in this case contains the
// call to flush itself.
executors[1].submit(() -> {
return null;
}).get(100, MILLISECONDS);
return null; // No deadlock -> null indicates test failure
} catch (InterruptedException | ExecutionException ignore) {
return null; // No deadlock -> null indicates test failure
} catch (TimeoutException ignore) {
return rootId; // Deadlock -> rootId indicates test pass
}, 0);
assertEquals(rootId, res.get());