/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.partition;
import static org.junit.Assert.*;
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Slice.Bound;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.rows.Row.Deletion;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.SearchIterator;
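/**
 * Validates the read paths of {@link ImmutableBTreePartition} (and {@link AbstractBTreePartition}
 * in general) against a simple reference model: randomly generated rows and range tombstone markers
 * are kept in a {@link TreeSet} ordered by the table comparator, loaded into a partition, and every
 * access method (lastRow, getRow, the filtered and unfiltered iterators, slicing, reversal, the
 * search iterator and the sliceable iterator) must return exactly what the sorted set predicts.
 */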
public class PartitionImplementationTest
{
private static final String KEYSPACE = "PartitionImplementationTest";
private static final String CF = "Standard";
private static final int ENTRIES = 250;
private static final int TESTS = 1000;
private static final int KEY_RANGE = ENTRIES * 5;
private static final int TIMESTAMP = KEY_RANGE + 1;
private static CFMetaData cfm;
private Random rand = new Random(2);
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
cfm = CFMetaData.Builder.create(KEYSPACE, CF)
.addPartitionKey("pk", AsciiType.instance)
.addClusteringColumn("ck", AsciiType.instance)
.addRegularColumn("col", AsciiType.instance)
.addStaticColumn("static_col", AsciiType.instance)
.build();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
cfm);
}
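/**
 * Generates ENTRIES rows with distinct clustering keys drawn at random from [0, KEY_RANGE).
 * The list is deliberately left unsorted; ordering is imposed later by the TreeSet in testIter.
 */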
private List<Row> generateRows()
{
List<Row> content = new ArrayList<>();
Set<Integer> keysUsed = new HashSet<>();
for (int i = 0; i < ENTRIES; ++i)
{
int rk;
do
{
rk = rand.nextInt(KEY_RANGE);
}
while (!keysUsed.add(rk));
content.add(makeRow(clustering(rk), "Col" + rk));
}
return content; // not sorted
}
Row makeRow(Clustering clustering, String colValue)
{
ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("col", true));
Row.Builder row = BTreeRow.unsortedBuilder(TIMESTAMP);
row.newRow(clustering);
row.addCell(BufferCell.live(cfm, defCol, TIMESTAMP, ByteBufferUtil.bytes(colValue)));
return row.build();
}
Row makeStaticRow()
{
ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("static_col", true));
Row.Builder row = BTreeRow.unsortedBuilder(TIMESTAMP);
row.newRow(Clustering.STATIC_CLUSTERING);
row.addCell(BufferCell.live(cfm, defCol, TIMESTAMP, ByteBufferUtil.bytes("static value")));
return row.build();
}
private List<Unfiltered> generateMarkersOnly()
{
return addMarkers(new ArrayList<>());
}
private List<Unfiltered> generateUnfiltereds()
{
List<Unfiltered> content = new ArrayList<>(generateRows());
return addMarkers(content);
}
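/**
 * Adds ENTRIES / 10 random range tombstones to the given content, normalized into the well-formed
 * marker sequence a partition is expected to produce: at most one deletion is "current" at any
 * point, open/close markers falling on the same position are joined, closes shadowed by a newer
 * still-open deletion are dropped, and switches between deletions become boundary markers.
 * For example, [10,30]@t1 overlapping [20,40]@t2 (t2 newer) is expected to normalize to
 * open(10)@t1, boundary(20) t1 to t2, close(40)@t2, the @t1 close at 30 being dropped.
 */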
List<Unfiltered> addMarkers(List<Unfiltered> content)
{
List<RangeTombstoneMarker> markers = new ArrayList<>();
Set<Integer> delTimes = new HashSet<>();
for (int i = 0; i < ENTRIES / 10; ++i)
{
int delTime;
do
{
delTime = rand.nextInt(KEY_RANGE);
}
while (!delTimes.add(delTime));
int start = rand.nextInt(KEY_RANGE);
DeletionTime dt = new DeletionTime(delTime, delTime);
RangeTombstoneMarker open = RangeTombstoneBoundMarker.inclusiveOpen(false, clustering(start).getRawValues(), dt);
int end = start + rand.nextInt((KEY_RANGE - start) / 4 + 1);
RangeTombstoneMarker close = RangeTombstoneBoundMarker.inclusiveClose(false, clustering(end).getRawValues(), dt);
markers.add(open);
markers.add(close);
}
markers.sort(cfm.comparator);
RangeTombstoneMarker toAdd = null;
Set<DeletionTime> open = new HashSet<>();
DeletionTime current = DeletionTime.LIVE;
for (RangeTombstoneMarker marker : markers)
{
if (marker.isOpen(false))
{
DeletionTime delTime = marker.openDeletionTime(false);
open.add(delTime);
if (delTime.supersedes(current))
{
if (toAdd != null)
{
if (cfm.comparator.compare(toAdd, marker) != 0)
content.add(toAdd);
else
{
// same position as the pending marker; join instead of adding both
current = toAdd.isClose(false) ? toAdd.closeDeletionTime(false) : DeletionTime.LIVE;
}
}
if (current != DeletionTime.LIVE)
marker = RangeTombstoneBoundaryMarker.makeBoundary(false, marker.openBound(false).invert(), marker.openBound(false), current, delTime);
toAdd = marker;
current = delTime;
}
}
else
{
assert marker.isClose(false);
DeletionTime delTime = marker.closeDeletionTime(false);
boolean removed = open.remove(delTime);
assert removed;
if (current.equals(delTime))
{
if (toAdd != null)
{
if (cfm.comparator.compare(toAdd, marker) != 0)
content.add(toAdd);
else
{
// same position as the pending marker; join into a single close
current = toAdd.closeDeletionTime(false);
marker = new RangeTombstoneBoundMarker(marker.closeBound(false), current);
}
}
DeletionTime best = open.stream().max(DeletionTime::compareTo).orElse(DeletionTime.LIVE);
if (best != DeletionTime.LIVE)
marker = RangeTombstoneBoundaryMarker.makeBoundary(false, marker.closeBound(false), marker.closeBound(false).invert(), current, best);
toAdd = marker;
current = best;
}
}
}
content.add(toAdd);
assert current == DeletionTime.LIVE;
assert open.isEmpty();
return content;
}
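// Clustering values are zero-padded ("Row%06d"), so AsciiType order matches numeric key order.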
private Clustering clustering(int i)
{
return cfm.comparator.make(String.format("Row%06d", i));
}
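/**
 * Runs testIter TESTS times, reseeding the random generator with the iteration number so that a
 * failure is reproducible from the seed reported in the wrapping AssertionError.
 */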
private void test(Supplier<Collection<? extends Unfiltered>> content, Row staticRow)
{
for (int i = 0; i<TESTS; ++i)
{
try
{
rand = new Random(i);
testIter(content, staticRow);
}
catch (Throwable t)
{
throw new AssertionError("Test failed with seed " + i, t);
}
}
}
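/**
 * Core check: the sorted content acts as the reference model. A partition is built from it and
 * every read path (lastRow, getRow, iterator, unfilteredIterator with and without column filters,
 * single and multi slices, forward and reversed, plus the search and sliceable iterators) is
 * compared against the equivalent view of the model.
 */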
private void testIter(Supplier<Collection<? extends Unfiltered>> contentSupplier, Row staticRow)
{
NavigableSet<Clusterable> sortedContent = new TreeSet<Clusterable>(cfm.comparator);
sortedContent.addAll(contentSupplier.get());
AbstractBTreePartition partition;
try (UnfilteredRowIterator iter = new Util.UnfilteredSource(cfm, Util.dk("pk"), staticRow, sortedContent.stream().map(x -> (Unfiltered) x).iterator()))
{
partition = ImmutableBTreePartition.create(iter);
}
ColumnDefinition defCol = cfm.getColumnDefinition(new ColumnIdentifier("col", true));
ColumnFilter cf = ColumnFilter.selectionBuilder().add(defCol).build();
Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, cfm) : x;
Slices slices = Slices.with(cfm.comparator, Slice.make(clustering(KEY_RANGE / 4), clustering(KEY_RANGE * 3 / 4)));
Slices multiSlices = makeSlices();
// lastRow
assertRowsEqual((Row) get(sortedContent.descendingSet(), x -> x instanceof Row),
partition.lastRow());
// get(static)
assertRowsEqual(staticRow,
partition.getRow(Clustering.STATIC_CLUSTERING));
// get
for (int i=0; i < KEY_RANGE; ++i)
{
Clustering cl = clustering(i);
assertRowsEqual(getRow(sortedContent, cl),
partition.getRow(cl));
}
// isEmpty
assertEquals(sortedContent.isEmpty() && staticRow == null,
partition.isEmpty());
// hasRows
assertEquals(sortedContent.stream().anyMatch(x -> x instanceof Row),
partition.hasRows());
// iterator
assertIteratorsEqual(sortedContent.stream().filter(x -> x instanceof Row).iterator(),
partition.iterator());
// unfiltered iterator
assertIteratorsEqual(sortedContent.iterator(),
partition.unfilteredIterator());
// unfiltered iterator (explicit ColumnFilter.all and Slices.ALL)
assertIteratorsEqual(sortedContent.iterator(),
partition.unfilteredIterator(ColumnFilter.all(cfm), Slices.ALL, false));
// column-filtered
assertIteratorsEqual(sortedContent.stream().map(colFilter).iterator(),
partition.unfilteredIterator(cf, Slices.ALL, false));
// sliced
assertIteratorsEqual(slice(sortedContent, slices.get(0)),
partition.unfilteredIterator(ColumnFilter.all(cfm), slices, false));
assertIteratorsEqual(streamOf(slice(sortedContent, slices.get(0))).map(colFilter).iterator(),
partition.unfilteredIterator(cf, slices, false));
// randomly multi-sliced
assertIteratorsEqual(slice(sortedContent, multiSlices),
partition.unfilteredIterator(ColumnFilter.all(cfm), multiSlices, false));
assertIteratorsEqual(streamOf(slice(sortedContent, multiSlices)).map(colFilter).iterator(),
partition.unfilteredIterator(cf, multiSlices, false));
// reversed
assertIteratorsEqual(sortedContent.descendingIterator(),
partition.unfilteredIterator(ColumnFilter.all(cfm), Slices.ALL, true));
assertIteratorsEqual(sortedContent.descendingSet().stream().map(colFilter).iterator(),
partition.unfilteredIterator(cf, Slices.ALL, true));
assertIteratorsEqual(invert(slice(sortedContent, slices.get(0))),
partition.unfilteredIterator(ColumnFilter.all(cfm), slices, true));
assertIteratorsEqual(streamOf(invert(slice(sortedContent, slices.get(0)))).map(colFilter).iterator(),
partition.unfilteredIterator(cf, slices, true));
assertIteratorsEqual(invert(slice(sortedContent, multiSlices)),
partition.unfilteredIterator(ColumnFilter.all(cfm), multiSlices, true));
assertIteratorsEqual(streamOf(invert(slice(sortedContent, multiSlices))).map(colFilter).iterator(),
partition.unfilteredIterator(cf, multiSlices, true));
// search iterator
testSearchIterator(sortedContent, partition, ColumnFilter.all(cfm), false);
testSearchIterator(sortedContent, partition, cf, false);
testSearchIterator(sortedContent, partition, ColumnFilter.all(cfm), true);
testSearchIterator(sortedContent, partition, cf, true);
// sliceable iterator
testSliceableIterator(sortedContent, partition, ColumnFilter.all(cfm), false);
testSliceableIterator(sortedContent, partition, cf, false);
testSliceableIterator(sortedContent, partition, ColumnFilter.all(cfm), true);
testSliceableIterator(sortedContent, partition, cf, true);
}
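/**
 * Walks the clustering space in random steps, in iteration order, and checks that
 * SearchIterator.next returns the same row (including deletion info) as the reference model.
 * A null result is accepted when the same position is queried again (skip == 0) after the
 * iterator has already been queried at least once.
 */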
void testSearchIterator(NavigableSet<Clusterable> sortedContent, Partition partition, ColumnFilter cf, boolean reversed)
{
SearchIterator<Clustering, Row> searchIter = partition.searchIterator(cf, reversed);
int pos = reversed ? KEY_RANGE : 0;
int mul = reversed ? -1 : 1;
boolean started = false;
while (reversed ? pos > 0 : pos < KEY_RANGE)
{
int skip = rand.nextInt(KEY_RANGE / 10);
pos += skip * mul;
Clustering cl = clustering(pos);
Row row = searchIter.next(cl); // returns row with deletion, incl. empty row with deletion
if (row == null && skip == 0 && started) // allowed to return null if already reported row
continue;
started = true;
Row expected = getRow(sortedContent, cl);
assertEquals(expected == null, row == null);
if (row == null)
continue;
assertRowsEqual(expected.filter(cf, cfm), row);
}
}
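/**
 * Builds a random sequence of non-overlapping slices over the key space. A skip of 0 makes the
 * slice start exclusively at the previous end, which is why such a slice is forced to span at
 * least one key.
 */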
Slices makeSlices()
{
int pos = 0;
Slices.Builder builder = new Slices.Builder(cfm.comparator);
while (pos <= KEY_RANGE)
{
int skip = rand.nextInt(KEY_RANGE / 10) * (rand.nextInt(3) + 2 / 3); // 2 / 3 is integer division (0), so this is a nextInt(3) factor: ~1/3 chance of skip == 0
pos += skip;
int sz = rand.nextInt(KEY_RANGE / 10) + (skip == 0 ? 1 : 0); // if start is exclusive need at least sz 1
Clustering start = clustering(pos);
pos += sz;
Clustering end = clustering(pos);
Slice slice = Slice.make(skip == 0 ? Bound.exclusiveStartOf(start) : Bound.inclusiveStartOf(start), Bound.inclusiveEndOf(end));
builder.add(slice);
}
return builder.build();
}
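/**
 * Exercises the sliceable iterator two ways: slice by slice (slices visited in iteration order,
 * each compared against the model's view of that slice), and on a fresh iterator, consumed as a
 * plain unfiltered iterator over the whole partition.
 */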
void testSliceableIterator(NavigableSet<Clusterable> sortedContent, AbstractBTreePartition partition, ColumnFilter cf, boolean reversed)
{
Function<? super Clusterable, ? extends Clusterable> colFilter = x -> x instanceof Row ? ((Row) x).filter(cf, cfm) : x;
Slices slices = makeSlices();
try (SliceableUnfilteredRowIterator sliceableIter = partition.sliceableUnfilteredIterator(cf, reversed))
{
for (Slice slice : (Iterable<Slice>) () -> directed(slices, reversed))
assertIteratorsEqual(streamOf(directed(slice(sortedContent, slice), reversed)).map(colFilter).iterator(),
sliceableIter.slice(slice));
}
// Try using sliceable as unfiltered iterator
try (SliceableUnfilteredRowIterator sliceableIter = partition.sliceableUnfilteredIterator(cf, reversed))
{
assertIteratorsEqual((reversed ? sortedContent.descendingSet() : sortedContent).
stream().map(colFilter).iterator(),
sliceableIter);
}
}
private<T> Iterator<T> invert(Iterator<T> slice)
{
Deque<T> dest = new LinkedList<>();
Iterators.addAll(dest, slice);
return dest.descendingIterator();
}
private Iterator<Clusterable> slice(NavigableSet<Clusterable> sortedContent, Slices slices)
{
return Iterators.concat(streamOf(slices).map(slice -> slice(sortedContent, slice)).iterator());
}
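/**
 * The model's view of a single slice: the content strictly between the bounds, plus a synthetic
 * bound marker at either end when a range deletion is open across that bound.
 */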
private Iterator<Clusterable> slice(NavigableSet<Clusterable> sortedContent, Slice slice)
{
// Slice bounds are inclusive bounds, equal only to markers. Matched markers should be returned as one-sided boundaries.
RangeTombstoneMarker prev = (RangeTombstoneMarker) sortedContent.headSet(slice.start(), true).descendingSet().stream().filter(x -> x instanceof RangeTombstoneMarker).findFirst().orElse(null);
RangeTombstoneMarker next = (RangeTombstoneMarker) sortedContent.tailSet(slice.end(), true).stream().filter(x -> x instanceof RangeTombstoneMarker).findFirst().orElse(null);
Iterator<Clusterable> result = sortedContent.subSet(slice.start(), false, slice.end(), false).iterator();
if (prev != null && prev.isOpen(false))
result = Iterators.concat(Iterators.singletonIterator(new RangeTombstoneBoundMarker(slice.start(), prev.openDeletionTime(false))), result);
if (next != null && next.isClose(false))
result = Iterators.concat(result, Iterators.singletonIterator(new RangeTombstoneBoundMarker(slice.end(), next.closeDeletionTime(false))));
return result;
}
private Iterator<Slice> directed(Slices slices, boolean reversed)
{
return directed(slices.iterator(), reversed);
}
private <T> Iterator<T> directed(Iterator<T> iter, boolean reversed)
{
if (!reversed)
return iter;
return invert(iter);
}
private <T> Stream<T> streamOf(Iterator<T> iterator)
{
Iterable<T> iterable = () -> iterator;
return streamOf(iterable);
}
<T> Stream<T> streamOf(Iterable<T> iterable)
{
return StreamSupport.stream(iterable.spliterator(), false);
}
private void assertIteratorsEqual(Iterator<? extends Clusterable> it1, Iterator<? extends Clusterable> it2)
{
Clusterable[] a1 = (Clusterable[]) Iterators.toArray(it1, Clusterable.class);
Clusterable[] a2 = (Clusterable[]) Iterators.toArray(it2, Clusterable.class);
if (Arrays.equals(a1, a2))
return;
String a1s = Stream.of(a1).map(x -> "\n" + (x instanceof Unfiltered ? ((Unfiltered) x).toString(cfm) : x.toString())).collect(Collectors.toList()).toString();
String a2s = Stream.of(a2).map(x -> "\n" + (x instanceof Unfiltered ? ((Unfiltered) x).toString(cfm) : x.toString())).collect(Collectors.toList()).toString();
assertArrayEquals("Arrays differ. Expected " + a1s + " was " + a2s, a1, a2);
}
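/**
 * Expected row for the given clustering: the stored row, if any, filtered through any range
 * deletion covering that position; if only a range deletion covers it, an empty deleted row.
 */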
private Row getRow(NavigableSet<Clusterable> sortedContent, Clustering cl)
{
NavigableSet<Clusterable> nexts = sortedContent.tailSet(cl, true);
if (nexts.isEmpty())
return null;
Row row = nexts.first() instanceof Row && cfm.comparator.compare(cl, nexts.first()) == 0 ? (Row) nexts.first() : null;
for (Clusterable next : nexts)
if (next instanceof RangeTombstoneMarker)
{
RangeTombstoneMarker rt = (RangeTombstoneMarker) next;
if (!rt.isClose(false))
return row;
DeletionTime delTime = rt.closeDeletionTime(false);
return row == null ? BTreeRow.emptyDeletedRow(cl, Deletion.regular(delTime)) : row.filter(ColumnFilter.all(cfm), delTime, true, cfm);
}
return row;
}
private void assertRowsEqual(Row expected, Row actual)
{
try
{
assertEquals(expected == null, actual == null);
if (expected == null)
return;
assertEquals(expected.clustering(), actual.clustering());
assertEquals(expected.deletion(), actual.deletion());
assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class));
} catch (Throwable t)
{
throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t);
}
}
private static<T> T get(NavigableSet<T> sortedContent, Predicate<T> test)
{
return sortedContent.stream().filter(test).findFirst().orElse(null);
}
@Test
public void testEmpty()
{
test(() -> Collections.<Row>emptyList(), null);
}
@Test
public void testStaticOnly()
{
test(() -> Collections.<Row>emptyList(), makeStaticRow());
}
@Test
public void testRows()
{
test(this::generateRows, null);
}
@Test
public void testRowsWithStatic()
{
test(this::generateRows, makeStaticRow());
}
@Test
public void testMarkersOnly()
{
test(this::generateMarkersOnly, null);
}
@Test
public void testMarkersWithStatic()
{
test(this::generateMarkersOnly, makeStaticRow());
}
@Test
public void testUnfiltereds()
{
test(this::generateUnfiltereds, makeStaticRow());
}
}