blob: a94e5fd27d670a0de25a06fe7839fba8ac4bc75f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.reader;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.google.common.base.Preconditions;
import org.junit.jupiter.api.Test;
import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.TemporaryDirectory;
import org.apache.cassandra.spark.utils.test.TestSSTable;
import org.apache.cassandra.spark.utils.test.TestSchema;
import org.jetbrains.annotations.NotNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.Generate.constant;
import static org.quicktheories.generators.SourceDSL.arbitrary;
import static org.quicktheories.generators.SourceDSL.integers;
import static org.quicktheories.generators.SourceDSL.maps;
public class IndexDbTests
{
private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation();
private static final class IndexRow implements Comparable<IndexRow>
{
private final BigInteger token;
private final int value;
private int position = 0;
IndexRow(IPartitioner partitioner, int value)
{
this.token = token(partitioner, value);
this.value = value;
}
public int compareTo(@NotNull IndexRow that)
{
return this.token.compareTo(that.token);
}
}
@Test
@SuppressWarnings("static-access")
public void testSearchIndex()
{
qt().forAll(arbitrary().enumValues(Partitioner.class))
.checkAssert(partitioner -> {
try (TemporaryDirectory directory = new TemporaryDirectory())
{
TestSchema schema = TestSchema.basicBuilder(BRIDGE).withCompression(false).build();
IPartitioner iPartitioner = BRIDGE.getPartitioner(partitioner);
int numRows = 5000;
// Write an SSTable and record token
List<BigInteger> tokens = new ArrayList<>(numRows);
schema.writeSSTable(directory, BRIDGE, partitioner, writer -> {
for (int row = 0; row < numRows; row++)
{
ByteBuffer key = (ByteBuffer) ByteBuffer.allocate(4).putInt(row).flip();
BigInteger token = ReaderUtils.tokenToBigInteger(iPartitioner.decorateKey(key).getToken());
tokens.add(token);
writer.write(row, 0, row);
}
});
assertEquals(1, TestSSTable.countIn(directory.path()));
Collections.sort(tokens);
TableMetadata metadata = Schema.instance.getTableMetadata(schema.keyspace, schema.table);
assertNotNull(metadata, "Could not find table metadata");
Path summaryDb = TestSSTable.firstIn(directory.path(), FileType.SUMMARY);
assertNotNull(summaryDb, "Could not find summary");
SSTable ssTable = TestSSTable.firstIn(directory.path());
assertNotNull(ssTable, "Could not find SSTable");
int rowSize = 39;
int sample = 4;
// Sample the token list and read offset in Index.db for sampled list and verify the
// offset matches the expected we sample the list as IndexDbUtils.findStartOffset(...)
// returns the previous offset, so we want to test it correctly skips tokens less than
// the token we are looking for before returning
List<BigInteger> sparseList = IntStream.range(0, tokens.size())
.filter(index -> index > 0 && index % sample == 0)
.mapToObj(tokens::get)
.collect(Collectors.toList());
assertEquals((numRows / 4) - 1, sparseList.size());
try (DataInputStream in = new DataInputStream(Objects.requireNonNull(ssTable.openPrimaryIndexStream())))
{
try
{
for (int index = 0; index < sparseList.size(); index++)
{
BigInteger token = sparseList.get(index);
long expectedOffset = (((index + 1L) * sample) - 1) * rowSize;
long offset = IndexDbUtils.findStartOffset(in,
iPartitioner,
TokenRange.singleton(token),
Stats.DoNothingStats.INSTANCE);
assertEquals(expectedOffset, offset);
ReaderUtils.skipRowIndexEntry(in);
}
}
catch (EOFException ignore)
{
}
}
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
});
}
@Test
@SuppressWarnings("static-access")
public void testFindStartEndOffset()
{
int numValues = 5000;
qt().forAll(arbitrary().enumValues(Partitioner.class), // Partitioner
maps().of(integers().allPositive(), constant(1)).ofSize(numValues), // Unique keys (ignore values)
integers().between(1, numValues - 1)) // Start position
.checkAssert((partitioner, rawValues, startPos) -> {
IPartitioner iPartitioner = BRIDGE.getPartitioner(partitioner);
int rowSize = 256;
// Generate random index row values and sort by token
IndexRow[] rows = rawValues.keySet().stream()
.map(value -> new IndexRow(iPartitioner, value))
.sorted()
.toArray(IndexRow[]::new);
IntStream.range(0, rows.length).forEach(index -> rows[index].position = index * rowSize); // Update position offset
IndexRow startRow = rows[startPos];
int[] valuesAndOffsets = Arrays.stream(rows)
.map(row -> new int[]{row.value, row.position})
.flatMapToInt(Arrays::stream)
.toArray();
try (DataInputStream in = mockDataInputStream(valuesAndOffsets))
{
long startOffset = IndexDbUtils.findStartOffset(in,
iPartitioner,
TokenRange.singleton(startRow.token),
Stats.DoNothingStats.INSTANCE);
assertEquals(rows[startPos - 1].position, startOffset);
ReaderUtils.skipRowIndexEntry(in);
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
});
}
@Test
@SuppressWarnings("static-access")
public void testReadToken()
{
qt().withExamples(500)
.forAll(arbitrary().enumValues(Partitioner.class), integers().all())
.checkAssert((partitioner, value) -> {
IPartitioner iPartitioner = BRIDGE.getPartitioner(partitioner);
BigInteger expectedToken = token(iPartitioner, value);
try (DataInputStream in = mockDataInputStream(value, 0))
{
IndexDbUtils.readNextToken(iPartitioner, in, new Stats()
{
public void readPartitionIndexDb(ByteBuffer key, BigInteger token)
{
assertEquals(value.intValue(), key.getInt());
assertEquals(expectedToken, token);
}
});
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
}
);
}
@Test
public void testLessThan()
{
assertTrue(IndexDbUtils.isLessThan(BigInteger.valueOf(4L), TokenRange.openClosed(BigInteger.valueOf(5L), BigInteger.valueOf(10L))));
assertTrue(IndexDbUtils.isLessThan(BigInteger.valueOf(4L), TokenRange.closed(BigInteger.valueOf(5L), BigInteger.valueOf(10L))));
assertTrue(IndexDbUtils.isLessThan(BigInteger.valueOf(5L), TokenRange.openClosed(BigInteger.valueOf(5L), BigInteger.valueOf(10L))));
assertFalse(IndexDbUtils.isLessThan(BigInteger.valueOf(5L), TokenRange.closed(BigInteger.valueOf(5L), BigInteger.valueOf(10L))));
assertFalse(IndexDbUtils.isLessThan(BigInteger.valueOf(6L), TokenRange.openClosed(BigInteger.valueOf(5L), BigInteger.valueOf(10L))));
assertFalse(IndexDbUtils.isLessThan(BigInteger.valueOf(6L), TokenRange.closed(BigInteger.valueOf(5L), BigInteger.valueOf(10L))));
}
private static BigInteger token(IPartitioner iPartitioner, int value)
{
// Cast to ByteBuffer required when compiling with Java 8
return ReaderUtils.tokenToBigInteger(iPartitioner.decorateKey((ByteBuffer) ByteBuffer.allocate(4).putInt(value).flip()).getToken());
}
// Creates an in-memory DataInputStream mocking Index.db bytes, with length (short), key (int), position (vint)
private static DataInputStream mockDataInputStream(int... valuesAndOffsets) throws IOException
{
Preconditions.checkArgument(valuesAndOffsets.length % 2 == 0);
int numValues = valuesAndOffsets.length / 2;
int size = (numValues * 7); // 2 bytes short length, 4 bytes partition key value, 1 byte promoted index
size += IntStream.range(0, valuesAndOffsets.length) // Variable int for position offset
.filter(index -> (index + 1) % 2 == 0)
.map(index -> valuesAndOffsets[index])
.map(ReaderUtils::vIntSize)
.sum();
ByteBuffer buffer = ByteBuffer.allocate(size);
for (int index = 0; index < valuesAndOffsets.length; index += 2)
{
buffer.putShort((short) 4)
.putInt(valuesAndOffsets[index]); // Value
ReaderUtils.writePosition(valuesAndOffsets[index + 1], buffer); // Write variable int position offset
ReaderUtils.writePosition(0L, buffer); // Promoted index
}
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new DataInputStream(new ByteArrayInputStream(bytes));
}
}