blob: cab79e0edf8e93cf68eb80282945b4c0dbd48557 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.segment.file;
import org.apache.jackrabbit.oak.segment.CacheWeights;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static java.lang.Integer.valueOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assume.assumeTrue;
/**
 * Exercises {@code PriorityCache} from several threads at once.
 *
 * <p>The class is parameterized: JUnit instantiates it once for every value returned by
 * {@link #concurrencyLevels()}, and that value is used as the size of each fixed thread
 * pool a test creates. Every pool is shut down in a {@code finally} block so worker
 * threads do not accumulate across the many parameterized runs, and so that pending
 * tasks are dropped when a test fails early.
 */
@RunWith(Parameterized.class)
public class ConcurrentPriorityCacheTest {

    /** Number of threads in each executor created by a test. */
    private final int concurrency;

    /**
     * @return the thread-pool sizes this test class is run with
     */
    @Parameterized.Parameters
    public static List<Integer> concurrencyLevels() {
        return Arrays.asList(1, 2, 4, 8, 16, 32);
    }

    /**
     * @param concurrency number of threads per executor, supplied by the parameterized runner
     */
    public ConcurrentPriorityCacheTest(int concurrency) {
        this.concurrency = concurrency;
    }

    /**
     * Blocks until every future in {@code futures} has completed.
     *
     * @throws ExecutionException   if one of the underlying tasks failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static <T> void awaitAll(List<Future<T>> futures)
            throws ExecutionException, InterruptedException {
        for (Future<T> future : futures) {
            future.get();
        }
    }

    /**
     * Submits {@code SIZE} puts on one pool and, on a second pool, one reader per put.
     * Each reader waits for "its" put and then checks that a successful put is visible
     * under generation 0 (and not under generation 1), while a rejected put leaves no
     * entry behind.
     */
    @Test
    public void concurrentReadWrite() throws ExecutionException, InterruptedException {
        final int SIZE = 16384;
        PriorityCache<String, Integer> cache = new PriorityCache<>(SIZE, 10);

        // Writers and readers use separate pools: readers block on the writers' futures,
        // so sharing a single pool could starve the writers.
        ExecutorService executor1 = Executors.newFixedThreadPool(concurrency);
        ExecutorService executor2 = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<Boolean>> putFutures = new ArrayList<>(SIZE);
            List<Future<Void>> getFutures = new ArrayList<>(SIZE);

            for (int k = 0; k < SIZE; k++) {
                int idx = k;
                putFutures.add(executor1.submit(
                        () -> cache.put("key-" + idx, idx, 0, (byte) 0)));
            }

            for (int k = 0; k < SIZE; k++) {
                int idx = k;
                getFutures.add(executor2.submit(() -> {
                    if (putFutures.get(idx).get()) {
                        assertEquals(valueOf(idx), cache.get("key-" + idx, 0));
                        assertNull(cache.get("key-" + idx, 1));
                    } else {
                        assertNull(cache.get("key-" + idx, 0));
                    }
                    return null;
                }));
            }

            awaitAll(getFutures);
        } finally {
            // Interrupts readers still blocked on a put and discards queued tasks.
            executor2.shutdownNow();
            executor1.shutdownNow();
        }
    }

    /**
     * Puts 128 distinct keys into a cache created with size 1, each with a different
     * cost in {@code [0, Byte.MAX_VALUE]} and in shuffled order, then expects the entry
     * that was put with cost {@code Byte.MAX_VALUE} to be the one that is retrievable.
     */
    @Test
    public void concurrentUpdateKey() throws ExecutionException, InterruptedException {
        PriorityCache<String, Byte> cache = new PriorityCache<>(1, 5);

        List<Byte> costs = new ArrayList<>(Byte.MAX_VALUE + 1);
        for (int k = 0; k <= Byte.MAX_VALUE; k++) {
            costs.add((byte) k);
        }
        Collections.shuffle(costs);

        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<Boolean>> futures = new ArrayList<>(Byte.MAX_VALUE + 1);
            for (int k = 0; k <= Byte.MAX_VALUE; k++) {
                byte cost = costs.get(k);
                futures.add(executor.submit(() -> cache.put("key-" + cost, cost, 0, cost)));
            }
            awaitAll(futures);
        } finally {
            executor.shutdownNow();
        }

        assertEquals(Byte.valueOf(Byte.MAX_VALUE), cache.get("key-" + Byte.MAX_VALUE, 0));
    }

    /**
     * Puts the same key once per generation in {@code [0, NUM_GENERATIONS)}, in shuffled
     * order and with the generation number as the value, then expects the value stored
     * for the highest generation to be retrievable under that generation.
     */
    @Test
    public void concurrentUpdateWithNewGeneration() throws ExecutionException, InterruptedException {
        final int NUM_GENERATIONS = 256;
        PriorityCache<String, Integer> cache = new PriorityCache<>(1, 5);

        List<Integer> generations = new ArrayList<>(NUM_GENERATIONS);
        for (int k = 0; k < NUM_GENERATIONS; k++) {
            generations.add(k);
        }
        Collections.shuffle(generations);

        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<Boolean>> futures = new ArrayList<>(NUM_GENERATIONS);
            for (int k = 0; k < NUM_GENERATIONS; k++) {
                int gen = generations.get(k);
                futures.add(executor.submit(() -> cache.put("key", gen, gen, (byte) 0)));
            }
            awaitAll(futures);
        } finally {
            executor.shutdownNow();
        }

        assertEquals(Integer.valueOf(NUM_GENERATIONS - 1), cache.get("key", NUM_GENERATIONS - 1));
    }

    /**
     * Fills the cache with 100 keys for each of the generations 0 to 4, then purges
     * generations 0, 1 and 4 through overlapping predicates from several threads and
     * expects exactly the 200 entries of generations 2 and 3 to remain.
     */
    @Test
    public void concurrentGenerationPurge() throws ExecutionException, InterruptedException {
        PriorityCache<String, Integer> cache = new PriorityCache<>(65536);

        for (int gen = 4; gen >= 0; gen--) {
            // Iterating backwards prevents entries of earlier generations from being
            // replaced by entries of later generations.
            for (int k = 0; k < 100; k++) {
                assumeTrue("All test keys are in the cache",
                        cache.put("key-" + gen + "-" + k, 0, gen, (byte) 0));
            }
        }
        assertEquals(500, cache.size());

        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<Void>> futures = new ArrayList<>(concurrency * 4);
            for (int i = 0; i < concurrency; i++) {
                futures.add(executor.submit(() -> {
                    cache.purgeGenerations(generation -> generation == 1);
                    return null;
                }));
                futures.add(executor.submit(() -> {
                    cache.purgeGenerations(generation -> generation == 4);
                    return null;
                }));
                futures.add(executor.submit(() -> {
                    cache.purgeGenerations(generation -> generation <= 1);
                    return null;
                }));
            }
            awaitAll(futures);
        } finally {
            executor.shutdownNow();
        }

        assertEquals(200, cache.size());
    }

    /**
     * Puts 256 entries with differing costs into a cache created with size 128 and
     * checks that the number of successful puts equals the current cache size plus the
     * eviction count reported by the cache statistics.
     */
    @Test
    public void concurrentEvictionCount() throws ExecutionException, InterruptedException {
        Random rnd = new Random();
        PriorityCache<String, Integer> cache = new PriorityCache<>(128, 2, CacheWeights.noopWeigher());

        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        int count = 0;
        try {
            List<Future<Boolean>> futures = new ArrayList<>(256);
            for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; b++) {
                int value = b;
                futures.add(executor.submit(() ->
                        cache.put("k-" + value + "-" + rnd.nextInt(1000), value, 0, (byte) value)));
            }

            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    count++;
                }
            }
        } finally {
            executor.shutdownNow();
        }

        assertEquals(count, cache.size() + cache.getStats().evictionCount());
    }

    /**
     * Puts 1000 entries of equal cost into a cache created with size 16 and checks that
     * nothing was evicted, that the cache size equals the number of successful puts and
     * that the load exception count equals the number of rejected puts.
     */
    @Test
    public void concurrentLoadExceptionCount() throws ExecutionException, InterruptedException {
        Random rnd = new Random();
        PriorityCache<String, Integer> cache = new PriorityCache<>(16);

        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        int success = 0;
        int failure = 0;
        try {
            List<Future<Boolean>> futures = new ArrayList<>(1000);
            for (int i = 0; i < 1000; i++) {
                int value = i;
                futures.add(executor.submit(() ->
                        cache.put("k-" + value + "-" + rnd.nextInt(1000), value, 0, (byte) 0)));
            }

            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    success++;
                } else {
                    failure++;
                }
            }
        } finally {
            executor.shutdownNow();
        }

        assertEquals(0, cache.getStats().evictionCount());
        assertEquals(success, cache.size());
        assertEquals(failure, cache.getStats().loadExceptionCount());
    }
}