blob: c0520157d945faf5e0953cdb59216c216983b60a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.utils;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import org.junit.Assert;
import org.junit.Test;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.btree.*;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.reverseOrder;
import static org.apache.cassandra.utils.btree.BTree.iterable;
import static org.junit.Assert.assertTrue;
public class LongBTreeTest
{
private static final boolean DEBUG = false;
private static int perThreadTrees = 100;
private static int minTreeSize = 4;
private static int maxTreeSize = 10000;
private static int threads = DEBUG ? 1 : Runtime.getRuntime().availableProcessors() * 8;
private static final MetricRegistry metrics = new MetricRegistry();
private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
private static final ExecutorService MODIFY = Executors.newFixedThreadPool(threads, new NamedThreadFactory("MODIFY"));
private static final ExecutorService COMPARE = DEBUG ? MODIFY : Executors.newFixedThreadPool(threads, new NamedThreadFactory("COMPARE"));
private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
static
{
System.setProperty("cassandra.btree.fanfactor", "4");
}
/************************** TEST ACCESS ********************************************/
@Test
public void testSearchIterator() throws InterruptedException
{
final int perTreeSelections = 100;
testRandomSelection(perThreadTrees, perTreeSelections,
(test) -> {
IndexedSearchIterator<Integer, Integer> iter1 = test.testAsSet.iterator();
IndexedSearchIterator<Integer, Integer> iter2 = test.testAsList.iterator();
return (key) ->
{
Integer found1 = iter1.hasNext() ? iter1.next(key) : null;
Integer found2 = iter2.hasNext() ? iter2.next(key) : null;
Assert.assertSame(found1, found2);
if (found1 != null)
Assert.assertEquals(iter1.indexOfCurrent(), iter2.indexOfCurrent());
int index = Collections.binarySearch(test.canonicalList, key, test.comparator);
if (index < 0)
{
Assert.assertNull(found1);
}
else
{
Assert.assertEquals(key, found1);
Assert.assertEquals(index, iter1.indexOfCurrent());
}
// check that by advancing the same key again we get null, but only do it on one of the two iterators
// to ensure they both advance differently
if (ThreadLocalRandom.current().nextBoolean())
Assert.assertNull(iter1.next(key));
else
Assert.assertNull(iter2.next(key));
};
});
}
@Test
public void testInequalityLookups() throws InterruptedException
{
final int perTreeSelections = 2;
testRandomSelectionOfSet(perThreadTrees, perTreeSelections,
(test, canonical) -> {
if (!canonical.isEmpty() || !test.isEmpty())
{
Assert.assertEquals(canonical.isEmpty(), test.isEmpty());
Assert.assertEquals(canonical.first(), test.first());
Assert.assertEquals(canonical.last(), test.last());
}
return (key) ->
{
Assert.assertEquals(test.ceiling(key), canonical.ceiling(key));
Assert.assertEquals(test.higher(key), canonical.higher(key));
Assert.assertEquals(test.floor(key), canonical.floor(key));
Assert.assertEquals(test.lower(key), canonical.lower(key));
};
});
}
@Test
public void testListIndexes() throws InterruptedException
{
testRandomSelectionOfList(perThreadTrees, 4,
(test, canonical, cmp) ->
(key) ->
{
int javaIndex = Collections.binarySearch(canonical, key, cmp);
int btreeIndex = test.indexOf(key);
Assert.assertEquals(javaIndex, btreeIndex);
if (javaIndex >= 0)
Assert.assertEquals(canonical.get(javaIndex), test.get(btreeIndex));
}
);
}
@Test
public void testToArray() throws InterruptedException
{
testRandomSelection(perThreadTrees, 4,
(selection) ->
{
Integer[] array = new Integer[selection.canonicalList.size() + 1];
selection.testAsList.toArray(array, 1);
Assert.assertEquals(null, array[0]);
for (int j = 0; j < selection.canonicalList.size(); j++)
Assert.assertEquals(selection.canonicalList.get(j), array[j + 1]);
});
}
private static final class CountingFunction implements Function<Integer, Integer>
{
final Function<Integer, Integer> wrapped;
int count = 0;
protected CountingFunction(Function<Integer, Integer> wrapped)
{
this.wrapped = wrapped;
}
public Integer apply(Integer integer)
{
count++;
return wrapped.apply(integer);
}
}
@Test
public void testTransformAndFilter() throws InterruptedException
{
testRandomSelection(perThreadTrees, 4, false, false, false,
(selection) ->
{
Map<Integer, Integer> update = new LinkedHashMap<>();
for (Integer i : selection.testKeys)
update.put(i, new Integer(i));
CountingFunction function;
Object[] original = selection.testAsSet.tree();
Object[] transformed;
// test replacing none, leaving all present
function = new CountingFunction((x) -> x);
transformed = BTree.transformAndFilter(original, function);
Assert.assertEquals(BTree.size(original), function.count);
Assert.assertSame(original, transformed);
// test replacing some, leaving all present
function = new CountingFunction((x) -> update.containsKey(x) ? update.get(x) : x);
transformed = BTree.transformAndFilter(original, function);
Assert.assertEquals(BTree.size(original), function.count);
assertSame(transform(selection.canonicalList, function.wrapped), iterable(transformed));
// test replacing some, removing some
function = new CountingFunction(update::get);
transformed = BTree.transformAndFilter(original, function);
Assert.assertEquals(BTree.size(original), function.count);
assertSame(filter(transform(selection.canonicalList, function.wrapped), notNull()), iterable(transformed));
// test replacing none, removing some
function = new CountingFunction((x) -> update.containsKey(x) ? null : x);
transformed = BTree.transformAndFilter(selection.testAsList.tree(), function);
Assert.assertEquals(BTree.size(original), function.count);
assertSame(filter(transform(selection.canonicalList, function.wrapped), notNull()), iterable(transformed));
});
}
private static void assertSame(Iterable<Integer> i1, Iterable<Integer> i2)
{
assertSame(i1.iterator(), i2.iterator());
}
private static void assertSame(Iterator<Integer> i1, Iterator<Integer> i2)
{
while (i1.hasNext() && i2.hasNext())
Assert.assertSame(i1.next(), i2.next());
Assert.assertEquals(i1.hasNext(), i2.hasNext());
}
private void testRandomSelectionOfList(int perThreadTrees, int perTreeSelections, BTreeListTestFactory testRun) throws InterruptedException
{
testRandomSelection(perThreadTrees, perTreeSelections,
(BTreeTestFactory) (selection) -> testRun.get(selection.testAsList, selection.canonicalList, selection.comparator));
}
private void testRandomSelectionOfSet(int perThreadTrees, int perTreeSelections, BTreeSetTestFactory testRun) throws InterruptedException
{
testRandomSelection(perThreadTrees, perTreeSelections,
(BTreeTestFactory) (selection) -> testRun.get(selection.testAsSet, selection.canonicalSet));
}
static interface BTreeSetTestFactory
{
TestEachKey get(BTreeSet<Integer> test, NavigableSet<Integer> canonical);
}
static interface BTreeListTestFactory
{
TestEachKey get(BTreeSet<Integer> test, List<Integer> canonical, Comparator<Integer> comparator);
}
static interface BTreeTestFactory
{
TestEachKey get(RandomSelection test);
}
static interface TestEachKey
{
void testOne(Integer value);
}
private void testRandomSelection(int perThreadTrees, int perTreeSelections, BTreeTestFactory testRun) throws InterruptedException
{
testRandomSelection(perThreadTrees, perTreeSelections, (selection) -> {
TestEachKey testEachKey = testRun.get(selection);
for (Integer key : selection.testKeys)
testEachKey.testOne(key);
});
}
private void testRandomSelection(int perThreadTrees, int perTreeSelections, Consumer<RandomSelection> testRun) throws InterruptedException
{
testRandomSelection(perThreadTrees, perTreeSelections, true, true, true, testRun);
}
private void testRandomSelection(int perThreadTrees, int perTreeSelections, boolean narrow, boolean mixInNotPresentItems, boolean permitReversal, Consumer<RandomSelection> testRun) throws InterruptedException
{
int threads = Runtime.getRuntime().availableProcessors();
final CountDownLatch latch = new CountDownLatch(threads);
final AtomicLong errors = new AtomicLong();
final AtomicLong count = new AtomicLong();
final long totalCount = threads * perThreadTrees * perTreeSelections;
for (int t = 0 ; t < threads ; t++)
{
Runnable runnable = new Runnable()
{
public void run()
{
try
{
for (int i = 0 ; i < perThreadTrees ; i++)
{
RandomTree tree = randomTree(minTreeSize, maxTreeSize);
for (int j = 0 ; j < perTreeSelections ; j++)
{
testRun.accept(tree.select(narrow, mixInNotPresentItems, permitReversal));
count.incrementAndGet();
}
}
}
catch (Throwable t)
{
errors.incrementAndGet();
t.printStackTrace();
}
latch.countDown();
}
};
MODIFY.execute(runnable);
}
while (latch.getCount() > 0)
{
for (int i = 0 ; i < 10L ; i++)
{
latch.await(1L, TimeUnit.SECONDS);
Assert.assertEquals(0, errors.get());
}
log("%.1f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : "");
}
}
private static class RandomSelection
{
final List<Integer> testKeys;
final NavigableSet<Integer> canonicalSet;
final List<Integer> canonicalList;
final BTreeSet<Integer> testAsSet;
final BTreeSet<Integer> testAsList;
final Comparator<Integer> comparator;
private RandomSelection(List<Integer> testKeys, NavigableSet<Integer> canonicalSet, BTreeSet<Integer> testAsSet,
List<Integer> canonicalList, BTreeSet<Integer> testAsList, Comparator<Integer> comparator)
{
this.testKeys = testKeys;
this.canonicalList = canonicalList;
this.canonicalSet = canonicalSet;
this.testAsSet = testAsSet;
this.testAsList = testAsList;
this.comparator = comparator;
}
}
private static class RandomTree
{
final NavigableSet<Integer> canonical;
final BTreeSet<Integer> test;
private RandomTree(NavigableSet<Integer> canonical, BTreeSet<Integer> test)
{
this.canonical = canonical;
this.test = test;
}
RandomSelection select(boolean narrow, boolean mixInNotPresentItems, boolean permitReversal)
{
ThreadLocalRandom random = ThreadLocalRandom.current();
NavigableSet<Integer> canonicalSet = this.canonical;
BTreeSet<Integer> testAsSet = this.test;
List<Integer> canonicalList = new ArrayList<>(canonicalSet);
BTreeSet<Integer> testAsList = this.test;
Assert.assertEquals(canonicalSet.size(), testAsSet.size());
Assert.assertEquals(canonicalList.size(), testAsList.size());
// sometimes select keys first, so we cover full range
List<Integer> allKeys = randomKeys(canonical, mixInNotPresentItems);
List<Integer> keys = allKeys;
int narrowCount = random.nextInt(3);
while (narrow && canonicalList.size() > 10 && keys.size() > 10 && narrowCount-- > 0)
{
boolean useLb = random.nextBoolean();
boolean useUb = random.nextBoolean();
if (!(useLb | useUb))
continue;
// select a range smaller than the total span when we have more narrowing iterations left
int indexRange = keys.size() / (narrowCount + 1);
boolean lbInclusive = true;
Integer lbKey = canonicalList.get(0);
int lbKeyIndex = 0, lbIndex = 0;
boolean ubInclusive = true;
Integer ubKey = canonicalList.get(canonicalList.size() - 1);
int ubKeyIndex = keys.size(), ubIndex = canonicalList.size();
if (useLb)
{
lbKeyIndex = random.nextInt(0, indexRange - 1);
Integer candidate = keys.get(lbKeyIndex);
if (useLb = (candidate > lbKey && candidate <= ubKey))
{
lbInclusive = random.nextBoolean();
lbKey = keys.get(lbKeyIndex);
lbIndex = Collections.binarySearch(canonicalList, lbKey);
if (lbIndex >= 0 && !lbInclusive) lbIndex++;
else if (lbIndex < 0) lbIndex = -1 -lbIndex;
}
}
if (useUb)
{
ubKeyIndex = random.nextInt(Math.max(lbKeyIndex, keys.size() - indexRange), keys.size() - 1);
Integer candidate = keys.get(ubKeyIndex);
if (useUb = (candidate < ubKey && candidate >= lbKey))
{
ubInclusive = random.nextBoolean();
ubKey = keys.get(ubKeyIndex);
ubIndex = Collections.binarySearch(canonicalList, ubKey);
if (ubIndex >= 0 && ubInclusive) { ubIndex++; }
else if (ubIndex < 0) ubIndex = -1 -ubIndex;
}
}
if (ubIndex < lbIndex) { ubIndex = lbIndex; ubKey = lbKey; ubInclusive = false; }
canonicalSet = !useLb ? canonicalSet.headSet(ubKey, ubInclusive)
: !useUb ? canonicalSet.tailSet(lbKey, lbInclusive)
: canonicalSet.subSet(lbKey, lbInclusive, ubKey, ubInclusive);
testAsSet = !useLb ? testAsSet.headSet(ubKey, ubInclusive)
: !useUb ? testAsSet.tailSet(lbKey, lbInclusive)
: testAsSet.subSet(lbKey, lbInclusive, ubKey, ubInclusive);
keys = keys.subList(lbKeyIndex, ubKeyIndex);
canonicalList = canonicalList.subList(lbIndex, ubIndex);
testAsList = testAsList.subList(lbIndex, ubIndex);
Assert.assertEquals(canonicalSet.size(), testAsSet.size());
Assert.assertEquals(canonicalList.size(), testAsList.size());
}
// possibly restore full set of keys, to test case where we are provided existing keys that are out of bounds
if (keys != allKeys && random.nextBoolean())
keys = allKeys;
Comparator<Integer> comparator = naturalOrder();
if (permitReversal && random.nextBoolean())
{
if (allKeys != keys)
keys = new ArrayList<>(keys);
if (canonicalSet != canonical)
canonicalList = new ArrayList<>(canonicalList);
Collections.reverse(keys);
Collections.reverse(canonicalList);
testAsList = testAsList.descendingSet();
canonicalSet = canonicalSet.descendingSet();
testAsSet = testAsSet.descendingSet();
comparator = reverseOrder();
}
Assert.assertEquals(canonicalSet.size(), testAsSet.size());
Assert.assertEquals(canonicalList.size(), testAsList.size());
if (!canonicalSet.isEmpty())
{
Assert.assertEquals(canonicalSet.first(), canonicalList.get(0));
Assert.assertEquals(canonicalSet.last(), canonicalList.get(canonicalList.size() - 1));
Assert.assertEquals(canonicalSet.first(), testAsSet.first());
Assert.assertEquals(canonicalSet.last(), testAsSet.last());
Assert.assertEquals(canonicalSet.first(), testAsList.get(0));
Assert.assertEquals(canonicalSet.last(), testAsList.get(testAsList.size() - 1));
}
return new RandomSelection(keys, canonicalSet, testAsSet, canonicalList, testAsList, comparator);
}
}
private static RandomTree randomTree(int minSize, int maxSize)
{
// perform most of our tree constructions via update, as this is more efficient; since every run uses this
// we test builder disproportionately more often than if it had its own test anyway
return ThreadLocalRandom.current().nextFloat() < 0.95 ? randomTreeByUpdate(minSize, maxSize)
: randomTreeByBuilder(minSize, maxSize);
}
private static RandomTree randomTreeByUpdate(int minSize, int maxSize)
{
assert minSize > 3;
TreeSet<Integer> canonical = new TreeSet<>();
ThreadLocalRandom random = ThreadLocalRandom.current();
int targetSize = random.nextInt(minSize, maxSize);
int maxModificationSize = random.nextInt(2, targetSize);
Object[] accmumulate = BTree.empty();
int curSize = 0;
while (curSize < targetSize)
{
int nextSize = maxModificationSize == 1 ? 1 : random.nextInt(1, maxModificationSize);
TreeSet<Integer> build = new TreeSet<>();
for (int i = 0 ; i < nextSize ; i++)
{
Integer next = random.nextInt();
build.add(next);
canonical.add(next);
}
accmumulate = BTree.update(accmumulate, naturalOrder(), build, UpdateFunction.<Integer>noOp());
curSize += nextSize;
maxModificationSize = Math.min(maxModificationSize, targetSize - curSize);
}
return new RandomTree(canonical, BTreeSet.<Integer>wrap(accmumulate, naturalOrder()));
}
private static RandomTree randomTreeByBuilder(int minSize, int maxSize)
{
assert minSize > 3;
ThreadLocalRandom random = ThreadLocalRandom.current();
BTree.Builder<Integer> builder = BTree.builder(naturalOrder());
int targetSize = random.nextInt(minSize, maxSize);
int maxModificationSize = (int) Math.sqrt(targetSize);
TreeSet<Integer> canonical = new TreeSet<>();
int curSize = 0;
TreeSet<Integer> ordered = new TreeSet<>();
List<Integer> shuffled = new ArrayList<>();
while (curSize < targetSize)
{
int nextSize = maxModificationSize <= 1 ? 1 : random.nextInt(1, maxModificationSize);
// leave a random selection of previous values
(random.nextBoolean() ? ordered.headSet(random.nextInt()) : ordered.tailSet(random.nextInt())).clear();
shuffled = new ArrayList<>(shuffled.subList(0, shuffled.size() < 2 ? 0 : random.nextInt(shuffled.size() / 2)));
for (int i = 0 ; i < nextSize ; i++)
{
Integer next = random.nextInt();
ordered.add(next);
shuffled.add(next);
canonical.add(next);
}
switch (random.nextInt(5))
{
case 0:
builder.addAll(ordered);
break;
case 1:
builder.addAll(BTreeSet.of(ordered));
break;
case 2:
for (Integer i : ordered)
builder.add(i);
case 3:
builder.addAll(shuffled);
break;
case 4:
for (Integer i : shuffled)
builder.add(i);
}
curSize += nextSize;
maxModificationSize = Math.min(maxModificationSize, targetSize - curSize);
}
BTreeSet<Integer> btree = BTreeSet.<Integer>wrap(builder.build(), naturalOrder());
Assert.assertEquals(canonical.size(), btree.size());
return new RandomTree(canonical, btree);
}
// select a random subset of the keys, with an optional random population of keys inbetween those that are present
// return a value with the search position
private static List<Integer> randomKeys(Iterable<Integer> canonical, boolean mixInNotPresentItems)
{
ThreadLocalRandom rnd = ThreadLocalRandom.current();
boolean useFake = mixInNotPresentItems && rnd.nextBoolean();
final float fakeRatio = rnd.nextFloat();
List<Integer> results = new ArrayList<>();
Long fakeLb = (long) Integer.MIN_VALUE, fakeUb = null;
Integer max = null;
for (Integer v : canonical)
{
if ( !useFake
|| (fakeUb == null ? v - 1 : fakeUb) <= fakeLb + 1
|| rnd.nextFloat() < fakeRatio)
{
// if we cannot safely construct a fake value, or our randomizer says not to, we emit the next real value
results.add(v);
fakeLb = v.longValue();
fakeUb = null;
}
else
{
// otherwise we emit a fake value in the range immediately proceeding the last real value, and not
// exceeding the real value that would have proceeded (ignoring any other suppressed real values since)
if (fakeUb == null)
fakeUb = v.longValue() - 1;
long mid = (fakeLb + fakeUb) / 2;
assert mid < fakeUb;
results.add((int) mid);
fakeLb = mid;
}
max = v;
}
if (useFake && max != null && max < Integer.MAX_VALUE)
results.add(max + 1);
final float useChance = rnd.nextFloat();
return Lists.newArrayList(filter(results, (x) -> rnd.nextFloat() < useChance));
}
/************************** TEST MUTATION ********************************************/
@Test
public void testOversizedMiddleInsert()
{
TreeSet<Integer> canon = new TreeSet<>();
for (int i = 0 ; i < 10000000 ; i++)
canon.add(i);
Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), UpdateFunction.noOp());
btree = BTree.update(btree, naturalOrder(), canon, UpdateFunction.<Integer>noOp());
canon.add(Integer.MIN_VALUE);
canon.add(Integer.MAX_VALUE);
assertTrue(BTree.isWellFormed(btree, naturalOrder()));
testEqual("Oversize", BTree.iterator(btree), canon.iterator());
}
@Test
public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
{
testInsertions(50, 1, 1, true);
}
@Test
public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
{
testInsertions(50, 1, 5, true);
}
@Test
public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
{
testInsertions(perThreadTrees / 10, 500, 10, 1, true);
}
@Test
public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
{
testInsertions(500, 10, 10, true);
}
@Test
public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
{
testInsertions(perThreadTrees / 10, Math.max(maxTreeSize, 5000), 3, 100, true);
}
@Test
public void testRandomRangeAndBatches() throws ExecutionException, InterruptedException
{
ThreadLocalRandom random = ThreadLocalRandom.current();
int treeSize = random.nextInt(maxTreeSize / 10, maxTreeSize * 10);
for (int i = 0 ; i < perThreadTrees / 10 ; i++)
testInsertions(threads * 10, treeSize, random.nextInt(1, 100) / 10f, treeSize / 100, true);
}
@Test
public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
{
testInsertions(50, 10, 10, false);
}
private static void testInsertions(int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
{
int tests = perThreadTrees * threads;
testInsertions(tests, perTestCount, testKeyRatio, modificationBatchSize, quickEquality);
}
private static void testInsertions(int tests, int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
{
int batchesPerTest = perTestCount / modificationBatchSize;
int testKeyRange = (int) (perTestCount * testKeyRatio);
long totalCount = (long) perTestCount * tests;
log("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
tests, perTestCount, 1 / testKeyRatio, modificationBatchSize);
// if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
{
final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
for (int i = 0 ; i < chunkSize ; i++)
{
int maxRunLength = modificationBatchSize == 1 ? 1 : ThreadLocalRandom.current().nextInt(1, modificationBatchSize);
outer.add(doOneTestInsertions(testKeyRange, maxRunLength, modificationBatchSize, batchesPerTest, quickEquality));
}
final List<ListenableFuture<?>> inner = new ArrayList<>();
long complete = 0;
int reportInterval = Math.max(1000, (int) (totalCount / 10000));
long lastReportAt = 0;
for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
{
inner.addAll(f.get());
complete += perTestCount;
if (complete - lastReportAt >= reportInterval)
{
long done = (chunk * perTestCount) + complete;
float ratio = done / (float) totalCount;
log("Completed %.1f%% (%d of %d operations)", ratio * 100, done, totalCount);
lastReportAt = complete;
}
}
Futures.allAsList(inner).get();
}
Snapshot snap = BTREE_TIMER.getSnapshot();
log("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile());
snap = TREE_TIMER.getSnapshot();
log("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile());
log("Done");
}
private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
{
ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
{
@Override
public List<ListenableFuture<?>> call()
{
final List<ListenableFuture<?>> r = new ArrayList<>();
NavigableMap<Integer, Integer> canon = new TreeMap<>();
Object[] btree = BTree.empty();
final TreeMap<Integer, Integer> buffer = new TreeMap<>();
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0 ; i < iterations ; i++)
{
buffer.clear();
int mods = rnd.nextInt(1, averageModsPerIteration * 2);
while (mods > 0)
{
int v = rnd.nextInt(upperBound);
int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
for (int j = 0 ; j < c ; j++)
{
buffer.put(v, v);
v++;
}
mods -= c;
}
Timer.Context ctxt;
ctxt = TREE_TIMER.time();
canon.putAll(buffer);
ctxt.stop();
ctxt = BTREE_TIMER.time();
Object[] next = null;
while (next == null)
next = BTree.update(btree, naturalOrder(), buffer.keySet(), SPORADIC_ABORT);
btree = next;
ctxt.stop();
if (!BTree.isWellFormed(btree, naturalOrder()))
{
log("ERROR: Not well formed");
throw new AssertionError("Not well formed!");
}
if (quickEquality)
testEqual("", BTree.iterator(btree), canon.keySet().iterator());
else
r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
}
return r;
}
});
if (DEBUG)
f.run();
else
MODIFY.execute(f);
return f;
}
@Test
public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
{
Object[] cur = BTree.empty();
TreeSet<Integer> canon = new TreeSet<>();
// we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
for (int i = 0 ; i < 128 ; i++)
{
String id = String.format("[0..%d)", canon.size());
log("Testing " + id);
Futures.allAsList(testAllSlices(id, cur, canon)).get();
Object[] next = null;
while (next == null)
next = BTree.update(cur, naturalOrder(), Arrays.asList(i), SPORADIC_ABORT);
cur = next;
canon.add(i);
}
}
private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
{
List<ListenableFuture<?>> waitFor = new ArrayList<>();
testAllSlices(id + " ASC", new BTreeSet<>(btree, naturalOrder()), canon, true, waitFor);
testAllSlices(id + " DSC", new BTreeSet<Integer>(btree, naturalOrder()).descendingSet(), canon.descendingSet(), false, waitFor);
return waitFor;
}
private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
{
testOneSlice(id, btree, canon, results);
for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
{
// test head/tail sets
testOneSlice(String.format("%s->[..%d)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
testOneSlice(String.format("%s->(..%d)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
testOneSlice(String.format("%s->(%d..]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
testOneSlice(String.format("%s->(%d..]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
for (Integer ub : range(canon.size(), lb, ascending))
{
// test subsets
testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
}
}
}
private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
{
ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
{
@Override
public void run()
{
test(id + " Count", test.size(), canon.size());
testEqual(id, test.iterator(), canon.iterator());
testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
}
}, null);
results.add(f);
if (DEBUG)
f.run();
else
COMPARE.execute(f);
}
private static void test(String id, int test, int expect)
{
if (test != expect)
{
log("%s: Expected %d, Got %d", id, expect, test);
}
}
private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
{
boolean equal = true;
while (btree.hasNext() && canon.hasNext())
{
Object i = btree.next();
Object j = canon.next();
if (!Objects.equals(i, j))
{
log("%s: Expected %d, Got %d", id, j, i);
equal = false;
}
}
while (btree.hasNext())
{
log("%s: Expected <Nil>, Got %d", id, btree.next());
equal = false;
}
while (canon.hasNext())
{
log("%s: Expected %d, Got Nil", id, canon.next());
equal = false;
}
if (!equal)
throw new AssertionError("Not equal");
}
// should only be called on sets that range from 0->N or N->0
private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
{
return new Iterable<Integer>()
{
int cur;
int delta;
int end;
{
if (ascending)
{
end = size + 1;
cur = from == Integer.MIN_VALUE ? -1 : from;
delta = 1;
}
else
{
end = -2;
cur = from == Integer.MIN_VALUE ? size : from;
delta = -1;
}
}
@Override
public Iterator<Integer> iterator()
{
return new Iterator<Integer>()
{
@Override
public boolean hasNext()
{
return cur != end;
}
@Override
public Integer next()
{
Integer r = cur;
cur += delta;
return r;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}
private static final class RandomAbort<V> implements UpdateFunction<V, V>
{
final Random rnd;
final float chance;
private RandomAbort(Random rnd, float chance)
{
this.rnd = rnd;
this.chance = chance;
}
public V apply(V replacing, V update)
{
return update;
}
public boolean abortEarly()
{
return rnd.nextFloat() < chance;
}
public void allocated(long heapSize)
{
}
public V apply(V v)
{
return v;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, InvocationTargetException, IllegalAccessException
{
for (String arg : args)
{
if (arg.startsWith("fan="))
System.setProperty("cassandra.btree.fanfactor", arg.substring(4));
else if (arg.startsWith("min="))
minTreeSize = Integer.parseInt(arg.substring(4));
else if (arg.startsWith("max="))
maxTreeSize = Integer.parseInt(arg.substring(4));
else if (arg.startsWith("count="))
perThreadTrees = Integer.parseInt(arg.substring(6));
else
exit();
}
List<Method> methods = new ArrayList<>();
for (Method m : LongBTreeTest.class.getDeclaredMethods())
{
if (m.getParameters().length > 0)
continue;
for (Annotation annotation : m.getAnnotations())
if (annotation.annotationType() == Test.class)
methods.add(m);
}
LongBTreeTest test = new LongBTreeTest();
Collections.sort(methods, (a, b) -> a.getName().compareTo(b.getName()));
log(Lists.transform(methods, (m) -> m.getName()).toString());
for (Method m : methods)
{
log(m.getName());
m.invoke(test);
}
log("success");
}
private static void exit()
{
log("usage: fan=<int> min=<int> max=<int> count=<int>");
log("fan: btree fanout");
log("min: minimum btree size (must be >= 4)");
log("max: maximum btree size (must be >= 4)");
log("count: number of trees to assign each core, for each test");
}
private static void log(String formatstr, Object ... args)
{
args = Arrays.copyOf(args, args.length + 1);
System.arraycopy(args, 0, args, 1, args.length - 1);
args[0] = System.currentTimeMillis();
System.out.printf("%tT: " + formatstr + "\n", args);
}
}