blob: 34ec29ac5952bc27e975376b5833f92c871f6e4e [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.test.microbench.btree;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntToLongFunction;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.MutableDeletionInfo;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.partitions.AtomicBTreePartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.BulkIterator;
import org.apache.cassandra.utils.memory.ByteBufferCloner;
import org.apache.cassandra.utils.memory.Cloner;
import org.apache.cassandra.utils.memory.HeapPool;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import static java.lang.Long.min;
import static java.lang.Math.max;
@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
public class AtomicBTreePartitionUpdateBench
private static final OpOrder NO_ORDER = new OpOrder();
private static final MutableDeletionInfo NO_DELETION_INFO = new MutableDeletionInfo(DeletionTime.LIVE);
private static final HeapPool POOL = new HeapPool(Long.MAX_VALUE, 1.0f, () -> ImmediateFuture.success(Boolean.TRUE));
private static final ByteBuffer zero = Int32Type.instance.decompose(0);
private static final DecoratedKey decoratedKey = new BufferDecoratedKey(new ByteOrderedPartitioner().getToken(zero), zero);
public enum Distribution { RANDOM, SEQUENTIAL }
final AtomicInteger uniqueThreadInitialisation = new AtomicInteger();
int clusteringCount;
// a value of -1 indicates to send all inserts to a single row,
// using the clusterings to build CellPath and write our rows into a Map
@Param({"-1", "1", "8"})
int columnCount;
@Param({"0", "0.5"})
float insertRowOverlap;
@Param({"1", "32", "256"})
int insertRowCount;
@Param({"8", "256"})
int valueSize;
int rolloverAfterInserts;
Distribution distribution;
Distribution timestamps;
boolean uniquePerTrial;
// hacky way to pass in the number of threads we're executing concurrently in JMH
int threadCount;
public static class GlobalState
RegularAndStaticColumns partitionColumns;
TableMetadata metadata;
ColumnMetadata[] columns;
Clustering[] clusterings;
CellPath[] complexPaths;
ByteBuffer value;
public void setup(AtomicBTreePartitionUpdateBench bench)
ColumnMetadata[] partitionKeyColumns = bench.partitionKeyColumns();
ColumnMetadata[] clusteringColumns = bench.clusteringColumns();
ColumnMetadata[] regularColumns = bench.regularColumns();
partitionColumns = RegularAndStaticColumns.builder().addAll(Arrays.asList(regularColumns)).build();
columns = regularColumns;
metadata = bench.metadata(partitionKeyColumns, clusteringColumns, columns);
clusterings = bench.clusterings();
complexPaths = bench.complexPaths(regularColumns);
value = ByteBuffer.allocate(bench.valueSize);
// stateful; cannot be shared between threads
private static class UpdateGenerator
final Random random;
final TableMetadata metadata;
final RegularAndStaticColumns partitionColumns;
final boolean isComplex;
final ColumnMetadata[] columns;
final Clustering[] clusterings;
final CellPath[] complexPaths;
final float insertRowOverlap;
final Distribution distribution;
final IntToLongFunction timestamps;
final ByteBuffer value;
final IntVisitor insertRowCount;
final Row[] insertBuffer;
final ColumnData[] columnBuffer;
final Cell[] complexBuffer;
int offset;
UpdateGenerator(GlobalState global, AtomicBTreePartitionUpdateBench bench, long seed)
this.random = new Random(seed);
this.metadata = global.metadata;
this.partitionColumns = global.partitionColumns;
this.columns = global.columns.clone();
this.isComplex = this.columns[0].isComplex();
this.clusterings = global.clusterings.clone();
this.complexPaths = global.complexPaths.clone();
this.value = global.value;
this.insertRowCount = new IntVisitor(bench.insertRowCount);
this.insertRowOverlap = bench.insertRowOverlap;
this.distribution = bench.distribution;
this.timestamps = bench.timestamps == Distribution.RANDOM ? i -> random.nextLong() : i -> i;
this.insertBuffer = new Row[bench.insertRowCount * 2];
this.columnBuffer = new ColumnData[columns.length];
this.complexBuffer = new Cell[complexPaths.length];
void reset()
offset = 0;
final Function<Clustering, Row> simpleRow = this::simpleRow;
final Function<CellPath, Cell> complexCell = this::complexCell;
PartitionUpdate next()
int rowCount;
if (!isComplex)
rowCount = selectSortAndTransform(insertBuffer, clusterings, metadata.comparator, simpleRow);
rowCount = 1;
insertBuffer[0] = complexRow();
try (BulkIterator<Row> iter = BulkIterator.of(insertBuffer))
Object[] tree =, rowCount, UpdateFunction.noOp());
return PartitionUpdate.unsafeConstruct(metadata, decoratedKey, AbstractBTreePartition.unsafeConstructHolder(partitionColumns, tree, DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS), NO_DELETION_INFO, false);
private <I, O> int selectSortAndTransform(O[] out, I[] in, Comparator<? super I> comparator, Function<I, O> transform)
int prevRowCount = offset == 0 ? 0 : insertRowCount.cur();
int rowCount =;
switch (distribution)
for (int i = 0 ; i < rowCount ; ++i)
out[i] = transform.apply(in[i + offset]);
offset += rowCount * (1f - insertRowOverlap);
case RANDOM:
int rowOverlap = (int) (insertRowOverlap * min(rowCount, prevRowCount));
shuffle(random, in, 0, rowOverlap, 0, prevRowCount);
shuffle(random, in, rowOverlap, rowCount - rowOverlap, prevRowCount, in.length - prevRowCount);
Arrays.sort(in, 0, rowCount, comparator);
for (int i = 0 ; i < rowCount ; ++i)
out[i] = transform.apply(in[i]);
return rowCount;
Row simpleRow(Clustering clustering)
for (int i = 0 ; i < columns.length ; ++i)
columnBuffer[i] = cell(columns[i], null);
return bufferToRow(clustering);
Row complexRow()
int mapCount = selectSortAndTransform(complexBuffer, complexPaths, columns[0].cellPathComparator(), complexCell);
try (BulkIterator<ColumnData> iter = BulkIterator.of(complexBuffer))
columnBuffer[0] = ComplexColumnData.unsafeConstruct(columns[0],, mapCount, UpdateFunction.noOp()), DeletionTime.LIVE);
return bufferToRow(clusterings[0]);
Row bufferToRow(Clustering clustering)
try (BulkIterator<ColumnData> iter = BulkIterator.of(columnBuffer))
return BTreeRow.create(clustering, LivenessInfo.EMPTY, Row.Deletion.LIVE,, columns.length, UpdateFunction.noOp()));
Cell simpleCell(ColumnMetadata column)
return cell(column, null);
Cell complexCell(CellPath path)
return cell(columns[0], path);
Cell cell(ColumnMetadata column, CellPath path)
return new BufferCell(column, timestamps.applyAsLong(offset), Cell.NO_TTL, Cell.NO_DELETION_TIME, value, path);
private static class Batch
final AtomicBTreePartition update;
final PartitionUpdate[] insert;
// low 20 bits contain the next insert we're performing this generation
// next 20 bits are inserts we've performed this generation
// next 24 bits are generation (i.e. number of times we've run this update)
final AtomicLong state = new AtomicLong();
final AtomicLong activeThreads = new AtomicLong();
final MemtableAllocator allocator;
final Cloner cloner;
final int waitForActiveThreads;
/** Signals to replace the reference in {@code update} after this many invocations of {@code allocator.allocate} */
private int invalidateOn;
public Batch(int threads, TableMetadata metadata, UpdateGenerator generator, int rolloverAfterInserts)
waitForActiveThreads = threads;
allocator = new HeapPool.Allocator(POOL)
public Cloner cloner(OpOrder.Group opGroup)
return new ByteBufferCloner()
public ByteBuffer allocate(int size)
if (invalidateOn > 0 && --invalidateOn == 0)
AbstractBTreePartition.Holder holder = update.unsafeGetHolder();
if (!BTree.isEmpty(holder.tree))
holder.columns, Arrays.copyOf(holder.tree, holder.tree.length), holder.deletionInfo, holder.staticRow, holder.stats));
return ByteBuffer.allocate(size);
update = new AtomicBTreePartition(TableMetadataRef.forOfflineTools(metadata), decoratedKey, allocator);
cloner = allocator.cloner(NO_ORDER.getCurrent());
insert = IntStream.range(0, rolloverAfterInserts).mapToObj(i ->[]::new);
boolean performOne(int ifGeneration, Consumer<Batch> invokeBefore)
int index;
ifGeneration &= 0xffffff;
while (true)
long cur = state.get();
int curGeneration = ((int) (cur >>> 40)) & 0xffffff;
if (curGeneration == ifGeneration)
index = ((int) cur) & 0xfffff;
if (index == this.insert.length)
return false;
if (state.compareAndSet(cur, cur + 1)) break;
else continue;
if (ifGeneration < curGeneration)
return false;
// should never really happen
if (activeThreads.get() < waitForActiveThreads)
// try to prevent threads scattering to the four winds and updating different partitions
// we don't wait forever, as we can't synchronise with JMH to ensure all threads stop together
long start = System.nanoTime();
while (activeThreads.get() < waitForActiveThreads && TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < 50)
update.addAllWithSizeDelta(insert[index], cloner, NO_ORDER.getCurrent(), UpdateTransaction.NO_OP);
return true;
if (state.addAndGet(0x100000L) == ((((long)ifGeneration) << 40) | (((long)insert.length) << 20) | insert.length))
// reset the state and rollover the generation
state.set((ifGeneration + 1L) << 40);
public static class Batches
Batch[] batches;
void setup(int threadsForBatch, int threadsForTest, AtomicBTreePartitionUpdateBench bench, GlobalState global)
UpdateGenerator generator = new UpdateGenerator(global, bench, bench.uniqueThreadInitialisation.incrementAndGet());
int batchCount = Math.min(256, max(1, (int) (Runtime.getRuntime().maxMemory() / (2 * ((threadsForTest + threadsForBatch - 1) / threadsForBatch) * bench.sizeInBytesOfOneBatch()))));
batches = IntStream.range(0, batchCount).mapToObj(i -> new Batch(threadsForBatch, global.metadata, generator, bench.rolloverAfterInserts))
Batch get(int i)
return batches[i % batches.length];
public static class SingleThreadUpdates extends Batches
int i;
int gen;
public void setup(AtomicBTreePartitionUpdateBench bench, GlobalState global)
super.setup(1, bench.threadCount, bench, global);
void performOne(Consumer<Batch> invokeFirst)
while (true)
if (get(i).performOne(gen, invokeFirst))
if (++i == batches.length) { i = 0; ++gen; }
public static class ConcurrentUpdates extends Batches
final AtomicLong state = new AtomicLong();
public void setup(AtomicBTreePartitionUpdateBench bench, GlobalState global)
super.setup(bench.threadCount, bench.threadCount, bench, global);
void performOne(Consumer<Batch> invokeFirst)
while (true)
long cur = state.get();
int generation = (int) (cur >>> 32);
int index = (int) cur;
if (get(index).performOne(generation, invokeFirst))
state.compareAndSet(cur, (cur & 0xffffffffL) == batches.length ? ((cur & 0xffffffff00000000L) + 0x100000000L) : cur + 1);
public void success(SingleThreadUpdates updates)
updates.performOne(batch -> {});
public void oneFailure(SingleThreadUpdates updates)
updates.performOne(batch -> batch.invalidateOn = ThreadLocalRandom.current().nextInt(insertRowCount * max(1, columnCount)));
public void concurrent(ConcurrentUpdates updates)
updates.performOne(batch -> {});
private int sizeInBytesOfOneBatch()
// 50 bytes ~= size of a Cell
return 50 * max(columnCount, 1) * (insertRowCount + insertRowCount/2) * rolloverAfterInserts;
private ColumnMetadata[] regularColumns()
if (columnCount >= 0)
return regularColumns(BytesType.instance, columnCount);
AbstractType[] types = new AbstractType[clusteringCount];
Arrays.fill(types, BytesType.instance);
return regularColumns(MapType.getInstance(CompositeType.getInstance(types), BytesType.instance, true), 1);
private ColumnMetadata[] partitionKeyColumns()
return columns(Int32Type.instance, ColumnMetadata.Kind.PARTITION_KEY, 1, "pk");
private ColumnMetadata[] clusteringColumns()
return columns(Int32Type.instance, ColumnMetadata.Kind.CLUSTERING, clusteringCount, "c");
private TableMetadata metadata(ColumnMetadata[] partitionKeyColumns, ColumnMetadata[] clusteringColumns, ColumnMetadata[] regularColumns)
List<ColumnMetadata> columns = ImmutableList.<ColumnMetadata>builder()
return TableMetadata.builder("", "")
private int uniqueRowCount()
return (int) (insertRowCount * (1 + (rolloverAfterInserts * (1f - insertRowOverlap))));
private Clustering[] clusterings()
Clustering<ByteBuffer> prefix = Clustering.make(IntStream.range(0, clusteringCount - 1).mapToObj(i -> zero).toArray(ByteBuffer[]::new));
int rowCount = columnCount >= 0 ? uniqueRowCount() : 1;
return clusterings(rowCount, prefix);
private CellPath[] complexPaths(ColumnMetadata[] columns)
if (columnCount >= 0)
return new CellPath[0];
Clustering<ByteBuffer> prefix = Clustering.make(IntStream.range(0, clusteringCount - 1).mapToObj(i -> zero).toArray(ByteBuffer[]::new));
return complexPaths((CompositeType) ((MapType)columns[0].type).getKeysType(), uniqueRowCount(), prefix);
private static ColumnMetadata[] regularColumns(AbstractType<?> type, int count)
return columns(type, ColumnMetadata.Kind.REGULAR, count, "v");
private static ColumnMetadata[] columns(AbstractType<?> type, ColumnMetadata.Kind kind, int count, String prefix)
return IntStream.range(0, count)
.mapToObj(i -> new ColumnMetadata("", "", new ColumnIdentifier(prefix + i, true), type, kind != ColumnMetadata.Kind.REGULAR ? i : ColumnMetadata.NO_POSITION, kind))
private static Clustering[] clusterings(int rowCount, Clustering<ByteBuffer> prefix)
return IntStream.range(0, rowCount)
.mapToObj(i -> {
ByteBuffer[] values = Arrays.copyOf(prefix.getBufferArray(), prefix.size() + 1);
values[prefix.size()] = Int32Type.instance.decompose(i);
return Clustering.make(values);
private static CellPath[] complexPaths(CompositeType type, int pathCount, Clustering<ByteBuffer> prefix)
return IntStream.range(0, pathCount)
.mapToObj(i -> {
Object[] values = Arrays.copyOf(prefix.getRawValues(), prefix.size() + 1);
values[prefix.size()] = Int32Type.instance.decompose(i);
return CellPath.create(type.decompose(values));
private static <T> void shuffleAndSort(Random random, T[] data, int size, Comparator<T> comparator)
shuffle(random, data, size);
Arrays.sort(data, 0, size, comparator);
private static void shuffle(Random random, Object[] data, int size)
shuffle(random, data, 0, size, 0, data.length);
private static void shuffle(Random random, Object[] data, int trgOffset, int trgSize, int srcOffset, int srcSize)
for (int i = 0 ; i < trgSize ; ++i)
int swap = srcOffset + srcSize == 0 ? 0 : random.nextInt(srcSize);
Object tmp = data[swap];
data[swap] = data[i + trgOffset];
data[i + trgOffset] = tmp;