/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.cassandra.io.sstable.format.bti;

import java.io.IOException;
import java.util.function.Consumer;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.tries.IncrementalTrieWriter;
import org.apache.cassandra.io.tries.Walker;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;

/**
 * Partition index builder: stores index or data positions in an incrementally built, page-aware on-disk trie.
* <p>
* The files created by this builder are read by {@link PartitionIndex}.
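 * <p>
 * A minimal sketch of the expected call pattern (the writer and file-handle builder are created by the BTI
 * sstable writer in practice; {@code partitionsInKeyOrder} is a hypothetical source of partitions, which must
 * be supplied in key order):
 * <pre>{@code
 * try (PartitionIndexBuilder builder = new PartitionIndexBuilder(partitionIndexWriter, fhBuilder))
 * {
 *     for (PartitionEntry e : partitionsInKeyOrder)
 *         builder.addEntry(e.key, e.indexOrDataPosition);
 *     long root = builder.complete();   // writes the footer and returns the trie root position
 * }
 * }</pre>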
*/
class PartitionIndexBuilder implements AutoCloseable
{
private final SequentialWriter writer;
private final IncrementalTrieWriter<PartitionIndex.Payload> trieWriter;
private final FileHandle.Builder fhBuilder;
// the last synced data file position
private long dataSyncPosition;
// the last synced row index file position
private long rowIndexSyncPosition;
// the last synced partition index file position
    private long partitionIndexSyncPosition;

    // A partial index can be used only after all three files have been synced up to the required positions.
private long partialIndexDataEnd;
private long partialIndexRowEnd;
private long partialIndexPartitionEnd;
private IncrementalTrieWriter.PartialTail partialIndexTail;
private Consumer<PartitionIndex> partialIndexConsumer;
private DecoratedKey partialIndexLastKey;
private int lastDiffPoint;
private DecoratedKey firstKey;
private DecoratedKey lastKey;
private DecoratedKey lastWrittenKey;
    private PartitionIndex.Payload lastPayload;

    public PartitionIndexBuilder(SequentialWriter writer, FileHandle.Builder fhBuilder)
{
this.writer = writer;
this.trieWriter = IncrementalTrieWriter.open(PartitionIndex.TRIE_SERIALIZER, writer);
this.fhBuilder = fhBuilder;
    }

    /*
     * Called when the partition index file has been flushed up to the given position.
     * If this completes the set of positions required for a pending partial view, the partialIndexConsumer is invoked.
     */
public void markPartitionIndexSynced(long upToPosition)
{
partitionIndexSyncPosition = upToPosition;
refreshReadableBoundary();
    }

    /*
     * Called when the row index file has been flushed up to the given position.
     * If this completes the set of positions required for a pending partial view, the partialIndexConsumer is invoked.
     */
public void markRowIndexSynced(long upToPosition)
{
rowIndexSyncPosition = upToPosition;
refreshReadableBoundary();
    }

    /*
     * Called when the data file has been flushed up to the given position.
     * If this completes the set of positions required for a pending partial view, the partialIndexConsumer is invoked.
     */
public void markDataSynced(long upToPosition)
{
dataSyncPosition = upToPosition;
refreshReadableBoundary();
}
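
    /*
     * Checks whether all three files have been flushed up to the positions required by the pending partial
     * index and, if so, opens an early PartitionIndex view over the flushed data and hands it to the consumer.
     */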
private void refreshReadableBoundary()
{
if (partialIndexConsumer == null)
return;
if (dataSyncPosition < partialIndexDataEnd)
return;
if (rowIndexSyncPosition < partialIndexRowEnd)
return;
if (partitionIndexSyncPosition < partialIndexPartitionEnd)
return;
try (FileHandle fh = fhBuilder.withLengthOverride(writer.getLastFlushOffset()).complete())
{
@SuppressWarnings({ "resource", "RedundantSuppression" })
PartitionIndex pi = new PartitionIndexEarly(fh, partialIndexTail.root(), partialIndexTail.count(), firstKey, partialIndexLastKey, partialIndexTail.cutoff(), partialIndexTail.tail());
partialIndexConsumer.accept(pi);
partialIndexConsumer = null;
}
finally
{
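            // Reset the length override so that handles built later use the actual file length.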
fhBuilder.withLengthOverride(-1);
}
    }

    /**
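     * Adds an entry for the given key. For example, to point directly at a partition whose key starts at
     * data file offset 1234 (an arbitrary offset, for illustration), the caller passes the bitwise complement
     * of the offset:
     * <pre>{@code
     * builder.addEntry(key, ~1234L);   // stored as a negative value; readers recover 1234 via ~position
     * }</pre>
     *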
* @param decoratedKey the key for this record
* @param position the position to write with the record:
* - positive if position points to an index entry in the index file
* - negative if ~position points directly to the key in the data file
*/
public void addEntry(DecoratedKey decoratedKey, long position) throws IOException
{
if (lastKey == null)
{
firstKey = decoratedKey;
lastDiffPoint = 0;
}
else
{
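            // Add the previous key to the trie, cut down to the shortest prefix that distinguishes it from
            // both of its neighbours; this is sufficient for lookups and keeps the trie small.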
int diffPoint = ByteComparable.diffPoint(lastKey, decoratedKey, Walker.BYTE_COMPARABLE_VERSION);
ByteComparable prevPrefix = ByteComparable.cut(lastKey, Math.max(diffPoint, lastDiffPoint));
trieWriter.add(prevPrefix, lastPayload);
lastWrittenKey = lastKey;
lastDiffPoint = diffPoint;
}
lastKey = decoratedKey;
lastPayload = new PartitionIndex.Payload(position, decoratedKey.filterHashLowerBits());
    }

    public long complete() throws IOException
{
// Do not trigger pending partial builds.
partialIndexConsumer = null;
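
        // The last key added has not been written to the trie yet (identity check: lastWrittenKey is only
        // ever assigned lastKey); write it now, cut to the prefix that distinguishes it from its predecessor.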
if (lastKey != lastWrittenKey)
{
ByteComparable prevPrefix = ByteComparable.cut(lastKey, lastDiffPoint);
trieWriter.add(prevPrefix, lastPayload);
}
long root = trieWriter.complete();
long count = trieWriter.count();
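
        // Write the footer: the first and last keys (with short lengths), then the position of that key pair,
        // the partition count and the trie root position. Readers locate these fields from the end of the file.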
long firstKeyPos = writer.position();
if (firstKey != null)
{
ByteBufferUtil.writeWithShortLength(firstKey.getKey(), writer);
ByteBufferUtil.writeWithShortLength(lastKey.getKey(), writer);
}
else
{
assert lastKey == null;
writer.writeShort(0);
writer.writeShort(0);
}
writer.writeLong(firstKeyPos);
writer.writeLong(count);
writer.writeLong(root);
writer.sync();
fhBuilder.withLengthOverride(writer.getLastFlushOffset());
return root;
    }

    /**
     * Builds a PartitionIndex representing the records written up to this point, without interrupting writes.
     * Because data in buffered writers is not immediately flushed to the file system, and we do not want to force
     * flushing of the relevant files (which e.g. could cause a problem for compressed data files), the partial
     * index cannot be made available immediately. Instead, this call takes an index snapshot but delays making it
     * active (by calling the provided callback) until it registers that all relevant files (data, row index and
     * partition index) have been flushed at least as far as the required positions.
     *
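     * <p>
     * A sketch of the expected handshake, with a hypothetical earlyOpenConsumer; the callback fires from
     * whichever mark*Synced call is the last to reach its required position:
     * <pre>{@code
     * if (builder.buildPartial(earlyOpenConsumer, rowIndexEnd, dataEnd))
     * {
     *     // later, as the files are flushed:
     *     builder.markRowIndexSynced(rowIndexFlushedUpTo);
     *     builder.markDataSynced(dataFlushedUpTo);
     *     builder.markPartitionIndexSynced(partitionIndexFlushedUpTo);
     * }
     * }</pre>
     *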
     * @param callWhenReady callback that is given the prepared partial index once all relevant data has been flushed
     * @param rowIndexEnd the position in the row index file we need to be able to read up to (exclusive) to cover
     *                    all records written so far
     * @param dataEnd the position in the data file we need to be able to read up to (exclusive) to cover all
     *                records written so far
     * @return true if the request was accepted; false if there is no point in doing this at this time (e.g. another
     *         partial representation has been prepared but is not yet usable)
*/
public boolean buildPartial(Consumer<PartitionIndex> callWhenReady, long rowIndexEnd, long dataEnd)
{
// If we haven't advanced since the last time we prepared, there's nothing to do.
if (lastWrittenKey == partialIndexLastKey)
return false;
// Don't waste time if an index was already prepared but hasn't reached usability yet.
if (partialIndexConsumer != null)
return false;
try
{
partialIndexTail = trieWriter.makePartialRoot();
partialIndexDataEnd = dataEnd;
partialIndexRowEnd = rowIndexEnd;
partialIndexPartitionEnd = writer.position();
partialIndexLastKey = lastWrittenKey;
partialIndexConsumer = callWhenReady;
return true;
}
catch (IOException e)
{
            // Writes go to in-memory buffers, so failure here is not expected.
throw new AssertionError(e);
}
    }

    // Close the builder and release any associated memory.
    @Override
    public void close()
{
trieWriter.close();
}
}