/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import com.google.common.primitives.Ints;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.columniterator.AbstractSSTableIterator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.serializers.LongSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
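/**
 * Verifies that the CASSANDRA-11206 rewrite of {@link RowIndexEntry} serialization produces
 * byte-identical output to the pre-11206 implementation (a verbatim copy of which is kept in
 * this class), and exercises serialized-size accounting and {@code IndexState#indexFor} lookups.
 */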
public class RowIndexEntryTest extends CQLTester
{
private static final List<AbstractType<?>> clusterTypes = Collections.singletonList(LongType.instance);
private static final ClusteringComparator comp = new ClusteringComparator(clusterTypes);
private static final byte[] dummy_100k = new byte[100000];
private static Clustering cn(long l)
{
return Util.clustering(comp, l);
}
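// The two variants below differ only in the configured column index cache size: a large value
// keeps IndexInfo objects on heap (the "array" code path), while 0 forces the shallow code
// path that retains only offsets, as the test names suggest.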
@Test
public void testC11206AgainstPreviousArray() throws Exception
{
DatabaseDescriptor.setColumnIndexCacheSize(99999);
testC11206AgainstPrevious();
}
@Test
public void testC11206AgainstPreviousShallow() throws Exception
{
DatabaseDescriptor.setColumnIndexCacheSize(0);
testC11206AgainstPrevious();
}
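// Serializes the same partitions through both the current and the pre-11206 code paths and
// asserts that the resulting serialized index entries are byte-identical.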
private static void testC11206AgainstPrevious() throws Exception
{
// partition without IndexInfo
try (DoubleSerializer doubleSerializer = new DoubleSerializer())
{
doubleSerializer.build(null, partitionKey(42L),
Collections.singletonList(cn(42)),
0L);
assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
}
// partition with a few IndexInfo objects
try (DoubleSerializer doubleSerializer = new DoubleSerializer())
{
doubleSerializer.build(null, partitionKey(42L),
Arrays.asList(cn(42), cn(43), cn(44)),
0L);
assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
}
// partition with many IndexInfo objects
try (DoubleSerializer doubleSerializer = new DoubleSerializer())
{
doubleSerializer.build(null, partitionKey(42L),
Arrays.asList(cn(42), cn(43), cn(44), cn(45), cn(46), cn(47), cn(48), cn(49), cn(50), cn(51)),
0L);
assertEquals(doubleSerializer.rieOldSerialized, doubleSerializer.rieNewSerialized);
}
}
private static DecoratedKey partitionKey(long l)
{
ByteBuffer key = LongSerializer.instance.serialize(l);
Token token = Murmur3Partitioner.instance.getToken(key);
return new BufferDecoratedKey(token, key);
}
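/**
 * Feeds identical partitions through both the current {@link org.apache.cassandra.db.ColumnIndex}
 * code path and the legacy copy kept in this test, retaining the serialized bytes of each
 * resulting index entry so callers can compare them:
 * <pre>{@code
 * try (DoubleSerializer ds = new DoubleSerializer())
 * {
 *     ds.build(null, partitionKey(42L), Collections.singletonList(cn(42)), 0L);
 *     assertEquals(ds.rieOldSerialized, ds.rieNewSerialized);
 * }
 * }</pre>
 */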
private static class DoubleSerializer implements AutoCloseable
{
CFMetaData cfMeta = CFMetaData.compile("CREATE TABLE pipe.dev_null (pk bigint, ck bigint, val text, PRIMARY KEY(pk, ck))", "foo");
Version version = BigFormat.latestVersion;
DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
Row.Deletion deletion = Row.Deletion.LIVE;
SerializationHeader header = new SerializationHeader(true, cfMeta, cfMeta.partitionColumns(), EncodingStats.NO_STATS);
// serializer instances for the current (CASSANDRA-11206) and the legacy format
RowIndexEntry.IndexSerializer rieSerializer = new RowIndexEntry.Serializer(cfMeta, version, header);
Pre_C_11206_RowIndexEntry.Serializer oldSerializer = new Pre_C_11206_RowIndexEntry.Serializer(cfMeta, version, header);
@SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
final DataOutputBuffer rieOutput = new DataOutputBuffer(1024);
@SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" })
final DataOutputBuffer oldOutput = new DataOutputBuffer(1024);
final SequentialWriter dataWriterNew;
final SequentialWriter dataWriterOld;
final org.apache.cassandra.db.ColumnIndex columnIndex;
RowIndexEntry rieNew;
ByteBuffer rieNewSerialized;
Pre_C_11206_RowIndexEntry rieOld;
ByteBuffer rieOldSerialized;
DoubleSerializer() throws IOException
{
SequentialWriterOption option = SequentialWriterOption.newBuilder().bufferSize(1024).build();
File f = File.createTempFile("RowIndexEntryTest-", "db");
dataWriterNew = new SequentialWriter(f, option);
columnIndex = new org.apache.cassandra.db.ColumnIndex(header, dataWriterNew, version, Collections.emptyList(),
rieSerializer.indexInfoSerializer());
f = File.createTempFile("RowIndexEntryTest-", "db");
dataWriterOld = new SequentialWriter(f, option);
}
public void close() throws Exception
{
dataWriterNew.close();
dataWriterOld.close();
}
void build(Row staticRow, DecoratedKey partitionKey,
Collection<Clustering> clusterings, long startPosition) throws IOException
{
Iterator<Clustering> clusteringIter = clusterings.iterator();
columnIndex.buildRowIndex(makeRowIter(staticRow, partitionKey, clusteringIter, dataWriterNew));
rieNew = RowIndexEntry.create(startPosition, 0L,
deletionInfo, columnIndex.headerLength, columnIndex.columnIndexCount,
columnIndex.indexInfoSerializedSize(),
columnIndex.indexSamples(), columnIndex.offsets(),
rieSerializer.indexInfoSerializer());
rieSerializer.serialize(rieNew, rieOutput, columnIndex.buffer());
rieNewSerialized = rieOutput.buffer().duplicate();
Iterator<Clustering> clusteringIter2 = clusterings.iterator();
ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(makeRowIter(staticRow, partitionKey, clusteringIter2, dataWriterOld),
dataWriterOld, header, Collections.emptySet(), BigFormat.latestVersion);
rieOld = Pre_C_11206_RowIndexEntry.create(startPosition, deletionInfo, columnIndex);
oldSerializer.serialize(rieOld, oldOutput);
rieOldSerialized = oldOutput.buffer().duplicate();
}
private AbstractUnfilteredRowIterator makeRowIter(Row staticRow, DecoratedKey partitionKey,
Iterator<Clustering> clusteringIter, SequentialWriter dataWriter)
{
return new AbstractUnfilteredRowIterator(cfMeta, partitionKey, deletionInfo, cfMeta.partitionColumns(),
staticRow, false, new EncodingStats(0, 0, 0))
{
protected Unfiltered computeNext()
{
if (!clusteringIter.hasNext())
return endOfData();
try
{
// write some fake bytes to the data file to force writing the IndexInfo object
dataWriter.write(dummy_100k);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
return buildRow(clusteringIter.next());
}
};
}
private Unfiltered buildRow(Clustering clustering)
{
BTree.Builder<ColumnData> builder = BTree.builder(ColumnData.comparator);
builder.add(BufferCell.live(cfMeta.partitionColumns().iterator().next(),
1L,
ByteBuffer.allocate(0)));
return BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, builder.build());
}
}
/**
 * Pre-CASSANDRA-11206 ColumnIndex implementation, retained so the test can write the legacy
 * serialized format.
 */
static final class ColumnIndex
{
final long partitionHeaderLength;
final List<IndexInfo> columnsIndex;
private static final ColumnIndex EMPTY = new ColumnIndex(-1, Collections.emptyList());
private ColumnIndex(long partitionHeaderLength, List<IndexInfo> columnsIndex)
{
assert columnsIndex != null;
this.partitionHeaderLength = partitionHeaderLength;
this.columnsIndex = columnsIndex;
}
static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator,
SequentialWriter output,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
Version version) throws IOException
{
assert !iterator.isEmpty() && version.storeRows();
Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion());
return builder.build();
}
public static ColumnIndex nothing()
{
return EMPTY;
}
/**
 * Helper that writes a partition's unfiltereds to disk and builds an index over them
 * based on their serialized size.
 */
private static class Builder
{
private final UnfilteredRowIterator iterator;
private final SequentialWriter writer;
private final SerializationHeader header;
private final int version;
private final List<IndexInfo> columnsIndex = new ArrayList<>();
private final long initialPosition;
private long headerLength = -1;
private long startPosition = -1;
private int written;
private long previousRowStart;
private ClusteringPrefix firstClustering;
private ClusteringPrefix lastClustering;
private DeletionTime openMarker;
private final Collection<SSTableFlushObserver> observers;
Builder(UnfilteredRowIterator iterator,
SequentialWriter writer,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
int version)
{
this.iterator = iterator;
this.writer = writer;
this.header = header;
this.version = version;
this.observers = observers == null ? Collections.emptyList() : observers;
this.initialPosition = writer.position();
}
private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
{
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
}
public ColumnIndex build() throws IOException
{
writePartitionHeader(iterator);
this.headerLength = writer.position() - initialPosition;
while (iterator.hasNext())
add(iterator.next());
return close();
}
private long currentPosition()
{
return writer.position() - initialPosition;
}
private void addIndexBlock()
{
IndexInfo cIndexInfo = new IndexInfo(firstClustering,
lastClustering,
startPosition,
currentPosition() - startPosition,
openMarker);
columnsIndex.add(cIndexInfo);
firstClustering = null;
}
private void add(Unfiltered unfiltered) throws IOException
{
long pos = currentPosition();
if (firstClustering == null)
{
// Beginning of an index block: remember its first clustering and start position
firstClustering = unfiltered.clustering();
startPosition = pos;
}
UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
// notify observers about each new row
if (!observers.isEmpty())
observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
lastClustering = unfiltered.clustering();
previousRowStart = pos;
++written;
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
RangeTombstoneMarker marker = (RangeTombstoneMarker)unfiltered;
openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
}
// if the current index block has reached the configured column index size, close it out and index it
if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
addIndexBlock();
}
private ColumnIndex close() throws IOException
{
UnfilteredSerializer.serializer.writeEndOfPartition(writer);
// It's possible we add no rows, just a top level deletion
if (written == 0)
return RowIndexEntryTest.ColumnIndex.EMPTY;
// the last column may have fallen on an index boundary already. if not, index it explicitly.
if (firstClustering != null)
addIndexBlock();
// we should always have at least one computed index block, but we only write it out if there is more than that.
assert !columnsIndex.isEmpty() && headerLength >= 0;
return new ColumnIndex(headerLength, columnsIndex);
}
}
}
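// Verifies that serializedSize() agrees with the bytes actually written, for a plain
// (non-indexed) entry, for an indexed entry, and for an output stream that does not support
// a file pointer.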
@Test
public void testSerializedSize() throws Throwable
{
String tableName = createTable("CREATE TABLE %s (a int, b text, c int, PRIMARY KEY(a, b))");
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName);
Pre_C_11206_RowIndexEntry simple = new Pre_C_11206_RowIndexEntry(123);
DataOutputBuffer buffer = new DataOutputBuffer();
SerializationHeader header = new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS);
Pre_C_11206_RowIndexEntry.Serializer serializer = new Pre_C_11206_RowIndexEntry.Serializer(cfs.metadata, BigFormat.latestVersion, header);
serializer.serialize(simple, buffer);
assertEquals(buffer.getLength(), serializer.serializedSize(simple));
// write enough rows to ensure we get a few column index entries
for (int i = 0; i <= DatabaseDescriptor.getColumnIndexSize() / 4; i++)
execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, String.valueOf(i), i);
ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs).build());
File tempFile = File.createTempFile("row_index_entry_test", null);
tempFile.deleteOnExit();
SequentialWriter writer = new SequentialWriter(tempFile);
ColumnIndex columnIndex = RowIndexEntryTest.ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
Pre_C_11206_RowIndexEntry withIndex = Pre_C_11206_RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
IndexInfo.Serializer indexSerializer = cfs.metadata.serializers().indexInfoSerializer(BigFormat.latestVersion, header);
// sanity check
assertTrue(columnIndex.columnsIndex.size() >= 3);
buffer = new DataOutputBuffer();
serializer.serialize(withIndex, buffer);
assertEquals(buffer.getLength(), serializer.serializedSize(withIndex));
// serialization check
ByteBuffer bb = buffer.buffer();
DataInputBuffer input = new DataInputBuffer(bb, false);
serializationCheck(withIndex, indexSerializer, bb, input);
// test with an output stream that doesn't support a file-pointer
buffer = new DataOutputBuffer()
{
public boolean hasPosition()
{
return false;
}
public long position()
{
throw new UnsupportedOperationException();
}
};
serializer.serialize(withIndex, buffer);
bb = buffer.buffer();
input = new DataInputBuffer(bb, false);
serializationCheck(withIndex, indexSerializer, bb, input);
// verify that skip() consumes the entire serialized entry
bb = buffer.buffer();
input = new DataInputBuffer(bb, false);
Pre_C_11206_RowIndexEntry.Serializer.skip(input, BigFormat.latestVersion);
Assert.assertEquals(0, bb.remaining());
}
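// Walks the serialized bytes field by field and checks them against the in-memory entry,
// including the trailing table of IndexInfo offsets.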
private static void serializationCheck(Pre_C_11206_RowIndexEntry withIndex, IndexInfo.Serializer indexSerializer, ByteBuffer bb, DataInputBuffer input) throws IOException
{
Assert.assertEquals(0xdeadbeef, input.readUnsignedVInt());
Assert.assertEquals(withIndex.promotedSize(indexSerializer), input.readUnsignedVInt());
Assert.assertEquals(withIndex.headerLength(), input.readUnsignedVInt());
Assert.assertEquals(withIndex.deletionTime(), DeletionTime.serializer.deserialize(input));
Assert.assertEquals(withIndex.columnsIndex().size(), input.readUnsignedVInt());
int offset = bb.position();
int[] offsets = new int[withIndex.columnsIndex().size()];
for (int i = 0; i < withIndex.columnsIndex().size(); i++)
{
int pos = bb.position();
offsets[i] = pos - offset;
IndexInfo info = indexSerializer.deserialize(input);
int end = bb.position();
Assert.assertEquals(indexSerializer.serializedSize(info), end - pos);
Assert.assertEquals(withIndex.columnsIndex().get(i).offset, info.offset);
Assert.assertEquals(withIndex.columnsIndex().get(i).width, info.width);
Assert.assertEquals(withIndex.columnsIndex().get(i).endOpenMarker, info.endOpenMarker);
Assert.assertEquals(withIndex.columnsIndex().get(i).firstName, info.firstName);
Assert.assertEquals(withIndex.columnsIndex().get(i).lastName, info.lastName);
}
for (int i = 0; i < withIndex.columnsIndex().size(); i++)
Assert.assertEquals(offsets[i], input.readInt());
Assert.assertEquals(0, bb.remaining());
}
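/**
 * Verbatim copy of the pre-CASSANDRA-11206 RowIndexEntry implementation, kept here so the
 * legacy serialized format can be produced and compared against the current code.
 */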
static class Pre_C_11206_RowIndexEntry implements IMeasurableMemory
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new Pre_C_11206_RowIndexEntry(0));
public final long position;
Pre_C_11206_RowIndexEntry(long position)
{
this.position = position;
}
protected int promotedSize(IndexInfo.Serializer idxSerializer)
{
return 0;
}
public static Pre_C_11206_RowIndexEntry create(long position, DeletionTime deletionTime, ColumnIndex index)
{
assert index != null;
assert deletionTime != null;
// we only consider the columns summary when determining whether to create an IndexedEntry,
// since if there are insufficient columns to be worth indexing we're going to seek to
// the beginning of the row anyway, so we might as well read the tombstone there as well.
if (index.columnsIndex.size() > 1)
return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, index.partitionHeaderLength, index.columnsIndex);
else
return new Pre_C_11206_RowIndexEntry(position);
}
/**
* @return true if this index entry contains the row-level tombstone and column summary. Otherwise,
* caller should fetch these from the row header.
*/
public boolean isIndexed()
{
return !columnsIndex().isEmpty();
}
public DeletionTime deletionTime()
{
throw new UnsupportedOperationException();
}
/**
* The length of the row header (partition key, partition deletion and static row).
* This value is only provided for indexed entries and this method will throw
* {@code UnsupportedOperationException} if {@code !isIndexed()}.
*/
public long headerLength()
{
throw new UnsupportedOperationException();
}
public List<IndexInfo> columnsIndex()
{
return Collections.emptyList();
}
public long unsharedHeapSize()
{
return EMPTY_SIZE;
}
public static class Serializer
{
private final IndexInfo.Serializer idxSerializer;
private final Version version;
Serializer(CFMetaData metadata, Version version, SerializationHeader header)
{
this.idxSerializer = metadata.serializers().indexInfoSerializer(version, header);
this.version = version;
}
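// Serialized layout (current sstable versions):
//   position            unsigned vint
//   promotedSize        unsigned vint (0 for non-indexed entries; nothing follows then)
//   headerLength        unsigned vint
//   deletionTime        DeletionTime.serializer
//   columnsIndex size   unsigned vint
//   IndexInfo entries   idxSerializer, one per index block
//   offsets             one int per IndexInfo, relative to the first entry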
public void serialize(Pre_C_11206_RowIndexEntry rie, DataOutputPlus out) throws IOException
{
assert version.storeRows() : "We read old index files but we should never write them";
out.writeUnsignedVInt(rie.position);
out.writeUnsignedVInt(rie.promotedSize(idxSerializer));
if (rie.isIndexed())
{
out.writeUnsignedVInt(rie.headerLength());
DeletionTime.serializer.serialize(rie.deletionTime(), out);
out.writeUnsignedVInt(rie.columnsIndex().size());
// Calculate and write the offsets to the IndexInfo objects.
int[] offsets = new int[rie.columnsIndex().size()];
if (out.hasPosition())
{
// Out is usually a SequentialWriter, so using the file-pointer is fine to generate the offsets.
// A DataOutputBuffer also works.
long start = out.position();
int i = 0;
for (IndexInfo info : rie.columnsIndex())
{
offsets[i] = i == 0 ? 0 : (int)(out.position() - start);
i++;
idxSerializer.serialize(info, out);
}
}
else
{
// Not sure this branch will ever be needed, but if it is called, it has to calculate the
// serialized sizes instead of simply using the file-pointer.
int i = 0;
int offset = 0;
for (IndexInfo info : rie.columnsIndex())
{
offsets[i++] = offset;
idxSerializer.serialize(info, out);
offset += idxSerializer.serializedSize(info);
}
}
for (int off : offsets)
out.writeInt(off);
}
}
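// The first branch reads the legacy (pre-3.0, !storeRows) on-disk format with fixed-size
// long/int fields; the second reads the current vint-based format written by serialize().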
public Pre_C_11206_RowIndexEntry deserialize(DataInputPlus in) throws IOException
{
if (!version.storeRows())
{
long position = in.readLong();
int size = in.readInt();
if (size > 0)
{
DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
int entries = in.readInt();
List<IndexInfo> columnsIndex = new ArrayList<>(entries);
long headerLength = 0L;
for (int i = 0; i < entries; i++)
{
IndexInfo info = idxSerializer.deserialize(in);
columnsIndex.add(info);
if (i == 0)
headerLength = info.offset;
}
return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, headerLength, columnsIndex);
}
else
{
return new Pre_C_11206_RowIndexEntry(position);
}
}
long position = in.readUnsignedVInt();
int size = (int)in.readUnsignedVInt();
if (size > 0)
{
long headerLength = in.readUnsignedVInt();
DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
int entries = (int)in.readUnsignedVInt();
List<IndexInfo> columnsIndex = new ArrayList<>(entries);
for (int i = 0; i < entries; i++)
columnsIndex.add(idxSerializer.deserialize(in));
in.skipBytesFully(entries * TypeSizes.sizeof(0));
return new Pre_C_11206_RowIndexEntry.IndexedEntry(position, deletionTime, headerLength, columnsIndex);
}
else
{
return new Pre_C_11206_RowIndexEntry(position);
}
}
// Reads only the 'position' of the index entry and returns it. Note that this leaves 'in' in
// the middle of an entry, so it is only useful if you know what you are doing; in most cases
// 'deserialize' should be used instead.
static long readPosition(DataInputPlus in, Version version) throws IOException
{
return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
}
public static void skip(DataInputPlus in, Version version) throws IOException
{
readPosition(in, version);
skipPromotedIndex(in, version);
}
private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException
{
int size = version.storeRows() ? (int)in.readUnsignedVInt() : in.readInt();
if (size <= 0)
return;
in.skipBytesFully(size);
}
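// Must mirror serialize() exactly: vint(position) + vint(promotedSize) + promotedSize bytes.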
public int serializedSize(Pre_C_11206_RowIndexEntry rie)
{
assert version.storeRows() : "We read old index files but we should never write them";
int indexedSize = 0;
if (rie.isIndexed())
{
List<IndexInfo> index = rie.columnsIndex();
indexedSize += TypeSizes.sizeofUnsignedVInt(rie.headerLength());
indexedSize += DeletionTime.serializer.serializedSize(rie.deletionTime());
indexedSize += TypeSizes.sizeofUnsignedVInt(index.size());
for (IndexInfo info : index)
indexedSize += idxSerializer.serializedSize(info);
indexedSize += index.size() * TypeSizes.sizeof(0);
}
return TypeSizes.sizeofUnsignedVInt(rie.position) + TypeSizes.sizeofUnsignedVInt(indexedSize) + indexedSize;
}
}
/**
* An entry in the row index for a row whose columns are indexed.
*/
private static final class IndexedEntry extends Pre_C_11206_RowIndexEntry
{
private final DeletionTime deletionTime;
// Length of the partition header (partition key, partition-level deletion and static row)
private final long headerLength;
private final List<IndexInfo> columnsIndex;
private static final long BASE_SIZE =
ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, Arrays.asList(null, null)))
+ ObjectSizes.measure(new ArrayList<>(1));
private IndexedEntry(long position, DeletionTime deletionTime, long headerLength, List<IndexInfo> columnsIndex)
{
super(position);
assert deletionTime != null;
assert columnsIndex != null && columnsIndex.size() > 1;
this.deletionTime = deletionTime;
this.headerLength = headerLength;
this.columnsIndex = columnsIndex;
}
@Override
public DeletionTime deletionTime()
{
return deletionTime;
}
@Override
public long headerLength()
{
return headerLength;
}
@Override
public List<IndexInfo> columnsIndex()
{
return columnsIndex;
}
@Override
protected int promotedSize(IndexInfo.Serializer idxSerializer)
{
long size = TypeSizes.sizeofUnsignedVInt(headerLength)
+ DeletionTime.serializer.serializedSize(deletionTime)
+ TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // number of entries
for (IndexInfo info : columnsIndex)
size += idxSerializer.serializedSize(info);
size += columnsIndex.size() * TypeSizes.sizeof(0);
return Ints.checkedCast(size);
}
@Override
public long unsharedHeapSize()
{
long entrySize = 0;
for (IndexInfo idx : columnsIndex)
entrySize += idx.unsharedHeapSize();
return BASE_SIZE
+ entrySize
+ deletionTime.unsharedHeapSize()
+ ObjectSizes.sizeOfReferenceArray(columnsIndex.size());
}
}
}
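// indexFor() returns the index block that may contain the given clustering, starting the
// search from a previous block index. For forward iteration a clustering past the last block
// yields columnsIndexCount(); for reverse iteration a clustering before the first block
// yields -1.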
@Test
public void testIndexFor() throws IOException
{
DeletionTime deletionInfo = new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
List<IndexInfo> indexes = new ArrayList<>();
indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
RowIndexEntry rie = new RowIndexEntry(0L)
{
public IndexInfoRetriever openWithIndex(FileHandle indexFile)
{
return new IndexInfoRetriever()
{
public IndexInfo columnsIndex(int index)
{
return indexes.get(index);
}
public void close()
{
}
};
}
public int columnsIndexCount()
{
return indexes.size();
}
};
AbstractSSTableIterator.IndexState indexState = new AbstractSSTableIterator.IndexState(
null, comp, rie, false, null
);
assertEquals(0, indexState.indexFor(cn(-1L), -1));
assertEquals(0, indexState.indexFor(cn(5L), -1));
assertEquals(1, indexState.indexFor(cn(12L), -1));
assertEquals(2, indexState.indexFor(cn(17L), -1));
assertEquals(3, indexState.indexFor(cn(100L), -1));
assertEquals(3, indexState.indexFor(cn(100L), 0));
assertEquals(3, indexState.indexFor(cn(100L), 1));
assertEquals(3, indexState.indexFor(cn(100L), 2));
assertEquals(3, indexState.indexFor(cn(100L), 3));
indexState = new AbstractSSTableIterator.IndexState(
null, comp, rie, true, null
);
assertEquals(-1, indexState.indexFor(cn(-1L), -1));
assertEquals(0, indexState.indexFor(cn(5L), 3));
assertEquals(0, indexState.indexFor(cn(5L), 2));
assertEquals(1, indexState.indexFor(cn(17L), 3));
assertEquals(2, indexState.indexFor(cn(100L), 3));
assertEquals(2, indexState.indexFor(cn(100L), 4));
assertEquals(1, indexState.indexFor(cn(12L), 3));
assertEquals(1, indexState.indexFor(cn(12L), 2));
assertEquals(1, indexState.indexFor(cn(100L), 1));
assertEquals(2, indexState.indexFor(cn(100L), 2));
}
}