blob: de1b1dfe9ea0b04900dbb716a82e89220dc6f6a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.primitives.Ints;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Column index builder used by {@link org.apache.cassandra.io.sstable.format.big.BigTableWriter}.
* For index entries that exceed {@link org.apache.cassandra.config.Config#column_index_cache_size_in_kb},
* this uses the serialization logic as in {@link RowIndexEntry}.
*/
public class ColumnIndex
{
// used, if the row-index-entry reaches config column_index_cache_size_in_kb
private DataOutputBuffer buffer;
// used to track the size of the serialized size of row-index-entry (unused for buffer)
private int indexSamplesSerializedSize;
// used, until the row-index-entry reaches config column_index_cache_size_in_kb
private final List<IndexInfo> indexSamples = new ArrayList<>();
private DataOutputBuffer reusableBuffer;
public int columnIndexCount;
private int[] indexOffsets;
private final SerializationHeader header;
private final int version;
private final SequentialWriter writer;
private long initialPosition;
private final ISerializer<IndexInfo> idxSerializer;
public long headerLength;
private long startPosition;
private int written;
private long previousRowStart;
private ClusteringPrefix firstClustering;
private ClusteringPrefix lastClustering;
private DeletionTime openMarker;
private final Collection<SSTableFlushObserver> observers;
public ColumnIndex(SerializationHeader header,
SequentialWriter writer,
Version version,
Collection<SSTableFlushObserver> observers,
ISerializer<IndexInfo> indexInfoSerializer)
{
this.header = header;
this.writer = writer;
this.version = version.correspondingMessagingVersion();
this.observers = observers;
this.idxSerializer = indexInfoSerializer;
}
public void reset()
{
this.initialPosition = writer.position();
this.headerLength = -1;
this.startPosition = -1;
this.previousRowStart = 0;
this.columnIndexCount = 0;
this.written = 0;
this.indexSamplesSerializedSize = 0;
this.indexSamples.clear();
this.firstClustering = null;
this.lastClustering = null;
this.openMarker = null;
if (this.buffer != null)
this.reusableBuffer = this.buffer;
this.buffer = null;
}
public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
{
writePartitionHeader(iterator);
this.headerLength = writer.position() - initialPosition;
while (iterator.hasNext())
add(iterator.next());
finish();
}
private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
{
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
{
Row staticRow = iterator.staticRow();
UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
if (!observers.isEmpty())
observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
}
}
private long currentPosition()
{
return writer.position() - initialPosition;
}
public ByteBuffer buffer()
{
return buffer != null ? buffer.buffer() : null;
}
public List<IndexInfo> indexSamples()
{
if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) <= DatabaseDescriptor.getColumnIndexCacheSize())
{
return indexSamples;
}
return null;
}
public int[] offsets()
{
return indexOffsets != null
? Arrays.copyOf(indexOffsets, columnIndexCount)
: null;
}
private void addIndexBlock() throws IOException
{
IndexInfo cIndexInfo = new IndexInfo(firstClustering,
lastClustering,
startPosition,
currentPosition() - startPosition,
openMarker);
// indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry.
// For shallow ones, we need it to serialize the offsts in finish().
// For non-shallow ones, the offsts are passed into IndexedEntry, so we don't have to
// calculate the offsets again.
// indexOffsets contains the offsets of the serialized IndexInfo objects.
// I.e. indexOffsets[0] is always 0 so we don't have to deal with a special handling
// for index #0 and always subtracting 1 for the index (which could be error-prone).
if (indexOffsets == null)
indexOffsets = new int[10];
else
{
if (columnIndexCount >= indexOffsets.length)
indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);
//the 0th element is always 0
if (columnIndexCount == 0)
{
indexOffsets[columnIndexCount] = 0;
}
else
{
indexOffsets[columnIndexCount] =
buffer != null
? Ints.checkedCast(buffer.position())
: indexSamplesSerializedSize;
}
}
columnIndexCount++;
// First, we collect the IndexInfo objects until we reach Config.column_index_cache_size_in_kb in an ArrayList.
// When column_index_cache_size_in_kb is reached, we switch to byte-buffer mode.
if (buffer == null)
{
indexSamplesSerializedSize += idxSerializer.serializedSize(cIndexInfo);
if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) > DatabaseDescriptor.getColumnIndexCacheSize())
{
buffer = reuseOrAllocateBuffer();
for (IndexInfo indexSample : indexSamples)
{
idxSerializer.serialize(indexSample, buffer);
}
}
else
{
indexSamples.add(cIndexInfo);
}
}
// don't put an else here...
if (buffer != null)
{
idxSerializer.serialize(cIndexInfo, buffer);
}
firstClustering = null;
}
private DataOutputBuffer reuseOrAllocateBuffer()
{
// Check whether a reusable DataOutputBuffer already exists for this
// ColumnIndex instance and return it.
if (reusableBuffer != null) {
DataOutputBuffer buffer = reusableBuffer;
buffer.clear();
return buffer;
}
// don't use the standard RECYCLER as that only recycles up to 1MB and requires proper cleanup
return new DataOutputBuffer(DatabaseDescriptor.getColumnIndexCacheSize() * 2);
}
private void add(Unfiltered unfiltered) throws IOException
{
long pos = currentPosition();
if (firstClustering == null)
{
// Beginning of an index block. Remember the start and position
firstClustering = unfiltered.clustering();
startPosition = pos;
}
UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
// notify observers about each new row
if (!observers.isEmpty())
observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
lastClustering = unfiltered.clustering();
previousRowStart = pos;
++written;
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
}
// if we hit the column index size that we have to index after, go ahead and index it.
if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize())
addIndexBlock();
}
private void finish() throws IOException
{
UnfilteredSerializer.serializer.writeEndOfPartition(writer);
// It's possible we add no rows, just a top level deletion
if (written == 0)
return;
// the last column may have fallen on an index boundary already. if not, index it explicitly.
if (firstClustering != null)
addIndexBlock();
// If we serialize the IndexInfo objects directly in the code above into 'buffer',
// we have to write the offsts to these here. The offsets have already been are collected
// in indexOffsets[]. buffer is != null, if it exceeds Config.column_index_cache_size_in_kb.
// In the other case, when buffer==null, the offsets are serialized in RowIndexEntry.IndexedEntry.serialize().
if (buffer != null)
RowIndexEntry.Serializer.serializeOffsets(buffer, indexOffsets, columnIndexCount);
// we should always have at least one computed index block, but we only write it out if there is more than that.
assert columnIndexCount > 0 && headerLength >= 0;
}
public int indexInfoSerializedSize()
{
return buffer != null
? buffer.buffer().limit()
: indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0);
}
}