blob: 801982d5ec5988e1e7cf1d750931444e7e5e7b32 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format.big;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SortedTablePartitionWriter;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.SequentialWriter;
/**
* Column index builder used by {@link org.apache.cassandra.io.sstable.format.big.BigTableWriter}.
* For index entries that exceed {@link org.apache.cassandra.config.Config#column_index_cache_size},
* this uses the serialization logic as in {@link RowIndexEntry}.
*/
public class BigFormatPartitionWriter extends SortedTablePartitionWriter
{
    // Default target size of one column index block (64 KiB), used when the
    // configured column_index_size is absent.
    @VisibleForTesting
    public static final int DEFAULT_GRANULARITY = 64 * 1024;

    // Serialization target for IndexInfo objects; only allocated once the
    // row-index-entry reaches the configured column_index_cache_size
    // (null while we are still collecting samples in indexSamples).
    private DataOutputBuffer buffer;

    // Running total of the serialized size of the IndexInfo samples collected
    // so far. Only maintained while buffer == null (unused in buffer mode).
    private int indexSamplesSerializedSize;

    // IndexInfo objects collected until the row-index-entry reaches the
    // configured column_index_cache_size; once we switch to buffer mode the
    // contents have been re-serialized into 'buffer' and this list is stale.
    private final List<IndexInfo> indexSamples = new ArrayList<>();

    // Buffer retained from a previous partition (see reset()) so the next
    // partition that overflows the cache threshold can reuse it.
    private DataOutputBuffer reusableBuffer;

    // Number of index blocks produced for the current partition.
    private int columnIndexCount;

    // Offsets of the serialized IndexInfo objects; indexOffsets[0] is always 0.
    // Grown on demand in addIndexBlock(); only the first columnIndexCount
    // entries are meaningful.
    private int[] indexOffsets;

    // Serializer for individual IndexInfo entries (version-dependent).
    private final ISerializer<IndexInfo> idxSerializer;

    // Copy of column_index_cache_size: serialized-size threshold above which we
    // switch from the sample list to direct buffer serialization.
    private final int cacheSizeThreshold;

    // Copy of column_index_size: once the current block spans at least this many
    // bytes of partition data, a new index block is started.
    private final int indexSize;

    /**
     * Creates a writer using the cluster-configured column index cache size and
     * column index granularity from {@link DatabaseDescriptor}.
     */
    BigFormatPartitionWriter(SerializationHeader header,
                             SequentialWriter writer,
                             Version version,
                             ISerializer<IndexInfo> indexInfoSerializer)
    {
        this(header, writer, version, indexInfoSerializer, DatabaseDescriptor.getColumnIndexCacheSize(), DatabaseDescriptor.getColumnIndexSize(DEFAULT_GRANULARITY));
    }

    /**
     * Creates a writer with explicit thresholds (exposed for testing).
     *
     * @param cacheSizeThreshold serialized size above which index entries are
     *                           serialized into a buffer rather than kept as samples
     * @param indexSize          target size in bytes of one column index block
     */
    BigFormatPartitionWriter(SerializationHeader header,
                             SequentialWriter writer,
                             Version version,
                             ISerializer<IndexInfo> indexInfoSerializer,
                             int cacheSizeThreshold,
                             int indexSize)
    {
        super(header, writer, version);
        this.idxSerializer = indexInfoSerializer;
        this.cacheSizeThreshold = cacheSizeThreshold;
        this.indexSize = indexSize;
    }

    /**
     * Resets the per-partition state so this writer can be reused for the next
     * partition. A previously allocated serialization buffer is kept aside in
     * {@code reusableBuffer} to avoid re-allocating it.
     */
    public void reset()
    {
        super.reset();
        this.columnIndexCount = 0;
        this.indexSamplesSerializedSize = 0;
        this.indexSamples.clear();

        if (this.buffer != null)
            this.reusableBuffer = this.buffer;
        this.buffer = null;
    }

    /**
     * @return the number of index blocks created so far for the current partition
     */
    public int getColumnIndexCount()
    {
        return columnIndexCount;
    }

    /**
     * @return the buffer holding the serialized index entries, or {@code null}
     *         if the entry stayed below the cache-size threshold (sample mode)
     */
    public ByteBuffer buffer()
    {
        return buffer != null ? buffer.buffer() : null;
    }

    /**
     * @return the collected {@link IndexInfo} samples if the entry (including
     *         the per-entry offset ints) still fits within the cache-size
     *         threshold, or {@code null} once buffer mode has taken over.
     *         The size check mirrors the switch-over condition in addIndexBlock().
     */
    public List<IndexInfo> indexSamples()
    {
        if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) <= cacheSizeThreshold)
        {
            return indexSamples;
        }

        return null;
    }

    /**
     * @return a copy of the offsets of the serialized index entries (one per
     *         index block), or {@code null} if no index block was created yet
     */
    public int[] offsets()
    {
        return indexOffsets != null
               ? Arrays.copyOf(indexOffsets, columnIndexCount)
               : null;
    }

    /**
     * Closes the current index block: builds an {@link IndexInfo} covering the
     * clusterings seen since the block started, records its offset, and either
     * keeps it as a sample or serializes it into the buffer — switching from
     * sample mode to buffer mode if the cache-size threshold is crossed.
     */
    private void addIndexBlock() throws IOException
    {
        IndexInfo cIndexInfo = new IndexInfo(firstClustering,
                                             lastClustering,
                                             startPosition,
                                             currentPosition() - startPosition,
                                             !openMarker.isLive() ? openMarker : null);

        // indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry.
        // For shallow ones, we need it to serialize the offsets in finish().
        // For non-shallow ones, the offsets are passed into IndexedEntry, so we don't have to
        // calculate the offsets again.

        // indexOffsets contains the offsets of the serialized IndexInfo objects.
        // I.e. indexOffsets[0] is always 0 so we don't have to deal with a special handling
        // for index #0 and always subtracting 1 for the index (which could be error-prone).
        if (indexOffsets == null)
            indexOffsets = new int[10]; // Java zero-initializes, so indexOffsets[0] == 0 already
        else
        {
            if (columnIndexCount >= indexOffsets.length)
                indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);

            //the 0th element is always 0
            if (columnIndexCount == 0)
            {
                indexOffsets[columnIndexCount] = 0;
            }
            else
            {
                // in buffer mode the offset is the current write position; in sample
                // mode it is the accumulated serialized size of the samples so far
                indexOffsets[columnIndexCount] =
                buffer != null
                ? Ints.checkedCast(buffer.position())
                : indexSamplesSerializedSize;
            }
        }

        columnIndexCount++;

        // First, we collect the IndexInfo objects until we reach Config.column_index_cache_size in an ArrayList.
        // When column_index_cache_size is reached, we switch to byte-buffer mode.
        if (buffer == null)
        {
            indexSamplesSerializedSize += idxSerializer.serializedSize(cIndexInfo);
            // the offset ints (one per block) are part of the cached entry's size too
            if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) > cacheSizeThreshold)
            {
                // threshold crossed: replay all previously collected samples into the buffer
                buffer = reuseOrAllocateBuffer();
                for (IndexInfo indexSample : indexSamples)
                {
                    idxSerializer.serialize(indexSample, buffer);
                }
            }
            else
            {
                indexSamples.add(cIndexInfo);
            }
        }
        // don't put an else here since buffer may be allocated in preceding if block
        if (buffer != null)
        {
            idxSerializer.serialize(cIndexInfo, buffer);
        }

        // mark the block as closed; the next unfiltered starts a new block
        firstClustering = null;
    }

    /**
     * Returns the retained buffer from a previous partition (cleared), or
     * allocates a fresh one sized at twice the cache threshold.
     */
    private DataOutputBuffer reuseOrAllocateBuffer()
    {
        // Check whether a reusable DataOutputBuffer already exists for this
        // ColumnIndex instance and return it.
        if (reusableBuffer != null)
        {
            DataOutputBuffer buffer = reusableBuffer;
            buffer.clear();
            return buffer;
        }
        // don't use the standard RECYCLER as that only recycles up to 1MB and requires proper cleanup
        return new DataOutputBuffer(cacheSizeThreshold * 2);
    }

    /**
     * Writes the given unfiltered and closes the current index block if it has
     * grown to at least the configured index granularity.
     */
    @Override
    public void addUnfiltered(Unfiltered unfiltered) throws IOException
    {
        super.addUnfiltered(unfiltered);

        // if we hit the column index size that we have to index after, go ahead and index it.
        if (currentPosition() - startPosition >= indexSize)
            addIndexBlock();
    }

    /**
     * Finishes the partition: closes the trailing index block if one is open
     * and, in buffer mode, appends the offsets of all serialized index entries.
     *
     * @return the writer position at the end of the partition
     */
    @Override
    public long finish() throws IOException
    {
        long endPosition = super.finish();

        // It's possible we add no rows, just a top level deletion
        if (written == 0)
            return endPosition;

        // the last column may have fallen on an index boundary already.  if not, index it explicitly.
        if (firstClustering != null)
            addIndexBlock();

        // If we serialize the IndexInfo objects directly in the code above into 'buffer',
        // we have to write the offsets to these here. The offsets have already been collected
        // in indexOffsets[]. buffer is != null, if it exceeds Config.column_index_cache_size.
        // In the other case, when buffer==null, the offsets are serialized in RowIndexEntry.IndexedEntry.serialize().
        if (buffer != null)
        {
            for (int i = 0; i < columnIndexCount; i++)
                buffer.writeInt(indexOffsets[i]);
        }

        // we should always have at least one computed index block, but we only write it out if there is more than that.
        assert columnIndexCount > 0 && getHeaderLength() >= 0;

        return endPosition;
    }

    /**
     * @return the serialized size of the index data: the buffer's limit in
     *         buffer mode, otherwise the sample sizes plus one offset int per block
     */
    public int indexInfoSerializedSize()
    {
        return buffer != null
               ? buffer.buffer().limit()
               : indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0);
    }

    // Nothing to release: buffers are plain on-heap DataOutputBuffers retained
    // for reuse via reset().
    @Override
    public void close()
    {
        // no-op
    }
}