// blob: ea343fdd0662deb05a05e445b4d5b728a8566e78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format.bti;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.DataComponent;
import org.apache.cassandra.io.sstable.format.IndexComponent;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.OpenReason;
import org.apache.cassandra.io.sstable.format.SortedTableWriter;
import org.apache.cassandra.io.sstable.format.bti.BtiFormat.Components;
import org.apache.cassandra.io.util.DataPosition;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.MmappedRegionsCache;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Transactional;
import static org.apache.cassandra.io.util.FileHandle.Builder.NO_LENGTH_OVERRIDE;
/**
* Writes SSTables in BTI format (see {@link BtiFormat}), which can be read by {@link BtiTableReader}.
*/
@VisibleForTesting
public class BtiTableWriter extends SortedTableWriter<BtiFormatPartitionWriter>
{
private static final Logger logger = LoggerFactory.getLogger(BtiTableWriter.class);
private final BtiFormatPartitionWriter partitionWriter;
private final IndexWriter iwriter;
/**
 * Creates a BTI sstable writer on top of the resources prepared by the supplied builder.
 * <p>
 * The partition writer and the index writer are taken from the builder's getters, which only return a value
 * while the builder is inside {@code buildInternal}.
 *
 * @param builder             builder holding the already opened partition and index writers
 * @param lifecycleNewTracker tracker handed through to the superclass constructor
 * @param owner               sstable owner handed through to the superclass constructor
 */
public BtiTableWriter(Builder builder, LifecycleNewTracker lifecycleNewTracker, SSTable.Owner owner)
{
    super(builder, lifecycleNewTracker, owner);
    partitionWriter = builder.getPartitionWriter();
    iwriter = builder.getIndexWriter();
}
/**
 * Records a rollback point: the superclass marks its own state first, then the index writer marks the
 * current positions of its row index and partition index files, so that {@link #resetAndTruncate()} can
 * later rewind to this point.
 */
@Override
public void mark()
{
super.mark();
// Remember where the index files currently end as well.
iwriter.mark();
}
/**
 * Rewinds to the point recorded by the last {@link #mark()}: the superclass resets its own state first,
 * then the index writer resets and truncates its row index and partition index files.
 */
@Override
public void resetAndTruncate()
{
super.resetAndTruncate();
// Bring the index files back to the marked positions too.
iwriter.resetAndTruncate();
}
/**
 * Builds the trie index entry for the partition that has just been written and appends it to the index
 * under the given key.
 *
 * @param key                    key of the partition the entry belongs to
 * @param partitionLevelDeletion partition-level deletion stored in the entry
 * @param finishResult           value passed unchanged to {@code TrieIndexEntry.create} together with the
 *                               partition writer's initial position and row index block count
 * @return the entry that was appended to the index
 * @throws IOException if appending the entry to the index fails
 */
@Override
protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException
{
    final TrieIndexEntry created = TrieIndexEntry.create(partitionWriter.getInitialPosition(), finishResult, partitionLevelDeletion, partitionWriter.getRowIndexBlockCount());
    iwriter.append(key, created);
    return created;
}
/**
 * Opens a {@link BtiTableReader} over what this writer has produced.
 * <p>
 * The partition index comes from the supplier, the row index handle from the index writer's file handle
 * builder, and the bloom filter is a copy of the one held by the index writer. If anything fails with a
 * {@link RuntimeException} or {@link Error}, every resource acquired so far is closed (close failures are
 * added as suppressed) and the original throwable is rethrown.
 *
 * @param openReason             reason recorded on the reader
 * @param isFinal                if {@code true} the data file is opened with no length override, otherwise
 *                               it is limited to the data writer's last flush offset
 * @param partitionIndexSupplier provides the partition index to attach to the reader
 * @return the newly built reader
 */
@SuppressWarnings({"resource", "RedundantSuppression"})
private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
{
    // Declared outside the try block so that whatever has been acquired can be released on failure.
    PartitionIndex index = null;
    FileHandle rowIndexHandle = null;
    FileHandle dataHandle = null;
    IFilter bloomFilter = null;

    BtiTableReader.Builder readerBuilder = unbuildTo(new BtiTableReader.Builder(descriptor), true).setMaxDataAge(maxDataAge)
                                                                                                .setSerializationHeader(header)
                                                                                                .setOpenReason(openReason);
    try
    {
        readerBuilder.setStatsMetadata(statsMetadata());

        // Acquire the reader's resources one at a time; the catch block below releases the non-null ones.
        index = partitionIndexSupplier.get();
        rowIndexHandle = iwriter.rowIndexFHBuilder.complete();
        dataHandle = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), readerBuilder.getStatsMetadata());
        bloomFilter = iwriter.getFilterCopy();

        return readerBuilder.setPartitionIndex(index)
                            .setFirst(index.firstKey())
                            .setLast(index.lastKey())
                            .setRowIndexFile(rowIndexHandle)
                            .setDataFile(dataHandle)
                            .setFilter(bloomFilter)
                            .build(owner().orElse(null), true, true);
    }
    catch (RuntimeException | Error failure)
    {
        JVMStabilityInspector.inspectThrowable(failure);
        Throwables.closeNonNullAndAddSuppressed(failure, bloomFilter, dataHandle, rowIndexHandle, index);
        throw failure;
    }
}
/**
 * Requests a partial partition index for the data written so far. When the index writer invokes the
 * callback with that index, the row index file handle is limited to the row index writer's last flush
 * offset, a reader is opened with {@link OpenReason#EARLY}, and the reader is passed to
 * {@code callWhenReady}.
 *
 * @param callWhenReady receives the early-opened reader
 */
@Override
public void openEarly(Consumer<SSTableReader> callWhenReady)
{
    iwriter.buildPartial(dataWriter.position(), partialIndex ->
    {
        // Only expose the part of the row index file that has been flushed at this point.
        iwriter.rowIndexFHBuilder.withLengthOverride(iwriter.rowIndexWriter.getLastFlushOffset());
        callWhenReady.accept(openInternal(OpenReason.EARLY, false, () -> partialIndex));
    });
}
/**
 * Completes the partition index, syncs the data and row index files, and then opens the final reader with
 * {@link OpenReason#EARLY}.
 *
 * @return the reader returned by {@link #openFinal(OpenReason)}
 */
@Override
public SSTableReader openFinalEarly()
{
// Complete the partition index first. completedPartitionIndex() (reached through openFinal below) would do
// this too, but we want it done now to ensure outstanding openEarly actions are not triggered.
iwriter.complete();
// We must ensure the data is completely flushed to disk.
dataWriter.sync();
iwriter.rowIndexWriter.sync();
// Note: Nothing must be written to any of the files after this point, as the chunk cache could pick up and
// retain a partially-written page.
return openFinal(OpenReason.EARLY);
}
/**
 * Opens the final reader, loading the completed partition index from the index writer.
 *
 * @param openReason reason recorded on the reader
 * @return the reader over the finished sstable
 */
@Override
@SuppressWarnings({"resource", "RedundantSuppression"})
protected SSTableReader openFinal(OpenReason openReason)
{
    // Only assign a max data age when the current value is negative.
    if (maxDataAge < 0)
    {
        maxDataAge = Clock.Global.currentTimeMillis();
    }
    return openInternal(openReason, true, () -> iwriter.completedPartitionIndex());
}
/**
 * Creates the transactional proxy for this writer. The proxy is given a supplier that, each time it is
 * invoked, lists the index writer and the data writer with any nulls filtered out.
 */
@Override
protected TransactionalProxy txnProxy()
{
    Supplier<ImmutableList<Transactional>> resources = () -> FBUtilities.immutableListWithFilteredNulls(iwriter, dataWriter);
    return new TransactionalProxy(resources);
}
/**
 * Transactional proxy that additionally closes the enclosing writer's partition writer during post-cleanup.
 */
private class TransactionalProxy extends SortedTableWriter<BtiFormatPartitionWriter>.TransactionalProxy
{
    public TransactionalProxy(Supplier<ImmutableList<Transactional>> transactionals)
    {
        super(transactionals);
    }

    /**
     * Closes the partition writer first, then runs the superclass cleanup.
     *
     * @param accumulate throwable accumulated so far, may be {@code null}
     * @return the accumulated throwable after both cleanup steps
     */
    @Override
    protected Throwable doPostCleanup(Throwable accumulate)
    {
        return super.doPostCleanup(Throwables.close(accumulate, partitionWriter));
    }
}
/**
 * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 * <p>
 * Two index files are produced: the row index, written through {@link #rowIndexWriter}, and the partition
 * index, written by a {@link PartitionIndexBuilder} on top of its own sequential writer.
 */
static class IndexWriter extends SortedTableWriter.AbstractIndexWriter
{
    final SequentialWriter rowIndexWriter;
    private final FileHandle.Builder rowIndexFHBuilder;
    private final SequentialWriter partitionIndexWriter;
    private final FileHandle.Builder partitionIndexFHBuilder;
    private final PartitionIndexBuilder partitionIndex;
    // Set once partitionIndex.complete() has succeeded, so that complete() can be called repeatedly.
    boolean partitionIndexCompleted = false;
    // Positions recorded by mark() and restored by resetAndTruncate().
    private DataPosition riMark;
    private DataPosition piMark;

    /**
     * Opens the row index and partition index writers and their file handle builders, and registers
     * post-flush listeners that report the flushed offsets of the partition index, row index and data
     * files to the partition index builder.
     *
     * @param b the sstable writer builder; its data writer and mmapped regions cache getters are used here
     */
    IndexWriter(Builder b)
    {
        super(b);
        rowIndexWriter = new SequentialWriter(descriptor.fileFor(Components.ROW_INDEX), b.getIOOptions().writerOptions);
        rowIndexFHBuilder = IndexComponent.fileBuilder(Components.ROW_INDEX, b).withMmappedRegionsCache(b.getMmappedRegionsCache());
        partitionIndexWriter = new SequentialWriter(descriptor.fileFor(Components.PARTITION_INDEX), b.getIOOptions().writerOptions);
        partitionIndexFHBuilder = IndexComponent.fileBuilder(Components.PARTITION_INDEX, b).withMmappedRegionsCache(b.getMmappedRegionsCache());
        partitionIndex = new PartitionIndexBuilder(partitionIndexWriter, partitionIndexFHBuilder);
        // register listeners to be alerted when the data files are flushed
        partitionIndexWriter.setPostFlushListener(() -> partitionIndex.markPartitionIndexSynced(partitionIndexWriter.getLastFlushOffset()));
        rowIndexWriter.setPostFlushListener(() -> partitionIndex.markRowIndexSynced(rowIndexWriter.getLastFlushOffset()));
        @SuppressWarnings({"resource", "RedundantSuppression"})
        SequentialWriter dataWriter = b.getDataWriter();
        dataWriter.setPostFlushListener(() -> partitionIndex.markDataSynced(dataWriter.getLastFlushOffset()));
    }

    /**
     * Adds the key to the bloom filter and registers the partition in the partition index.
     * <p>
     * If the entry is indexed, the key and the serialized entry are written to the row index file and the
     * partition index points at the start of that record. Otherwise nothing is written to the row index and
     * the partition index stores the bitwise complement of the entry's data position.
     *
     * @param key        partition key being added
     * @param indexEntry entry describing the partition; cast to {@link TrieIndexEntry} when it is indexed
     * @return the position that was stored in the partition index for this key
     * @throws FSWriteError if writing to the row index file fails
     * @throws IOException  if adding the entry to the partition index fails
     */
    public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IOException
    {
        bf.add(key);
        long position;
        if (indexEntry.isIndexed())
        {
            long indexStart = rowIndexWriter.position();
            try
            {
                ByteBufferUtil.writeWithShortLength(key.getKey(), rowIndexWriter);
                ((TrieIndexEntry) indexEntry).serialize(rowIndexWriter, rowIndexWriter.position());
            }
            catch (IOException e)
            {
                throw new FSWriteError(e, rowIndexWriter.getFile());
            }

            if (logger.isTraceEnabled())
                logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
            position = indexStart;
        }
        else
        {
            // Write data position directly in trie.
            position = ~indexEntry.position;
        }
        partitionIndex.addEntry(key, position);
        return position;
    }

    /**
     * Delegates to the partition index builder to build a partial index for the current row index position
     * and the given data position.
     *
     * @param dataPosition  position in the data file the partial index should cover
     * @param callWhenReady receives the partial partition index
     * @return the value returned by the partition index builder
     */
    public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady)
    {
        return partitionIndex.buildPartial(callWhenReady, rowIndexWriter.position(), dataPosition);
    }

    /**
     * Records the current positions of the row index and partition index writers.
     */
    public void mark()
    {
        riMark = rowIndexWriter.mark();
        piMark = partitionIndexWriter.mark();
    }

    /**
     * Resets both index writers to the positions recorded by the last {@link #mark()} and truncates them.
     */
    public void resetAndTruncate()
    {
        // We can't un-set the bloom filter addition, but extra keys in there are harmless.
        // Entries already added to the partition index builder are not removed here either; adding the entry
        // is the last call in append, so we assume that if that worked then we won't be trying to reset.
        rowIndexWriter.resetAndTruncate(riMark);
        partitionIndexWriter.resetAndTruncate(piMark);
    }

    /**
     * Flushes the bloom filter, prepares the row index writer for commit, limits the row index file handle
     * to the writer's last flush offset and completes the partition index.
     */
    protected void doPrepare()
    {
        flushBf();

        // truncate index file
        rowIndexWriter.prepareToCommit();
        rowIndexFHBuilder.withLengthOverride(rowIndexWriter.getLastFlushOffset());

        complete();
    }

    /**
     * Completes the partition index. Calls after the first successful one return immediately.
     *
     * @throws FSWriteError if completing the partition index fails with an {@link IOException}
     */
    void complete() throws FSWriteError
    {
        if (partitionIndexCompleted)
            return;

        try
        {
            partitionIndex.complete();
            partitionIndexCompleted = true;
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, partitionIndexWriter.getFile());
        }
    }

    /**
     * Completes the partition index if necessary and loads it from the partition index file.
     *
     * @return the loaded partition index
     * @throws FSReadError if loading the partition index fails with an {@link IOException}
     */
    PartitionIndex completedPartitionIndex()
    {
        complete();
        // Clear any length override left on the handle builders (e.g. by openEarly or doPrepare). The named
        // constant is used rather than a literal so that the intent matches the other uses in this file.
        rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
        partitionIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
        try
        {
            return PartitionIndex.load(partitionIndexFHBuilder, metadata.getLocal().partitioner, false);
        }
        catch (IOException e)
        {
            throw new FSReadError(e, partitionIndexWriter.getFile());
        }
    }

    /**
     * Commits the row index writer.
     *
     * @param accumulate throwable accumulated so far, may be {@code null}
     * @return the accumulated throwable after the commit
     */
    protected Throwable doCommit(Throwable accumulate)
    {
        return rowIndexWriter.commit(accumulate);
    }

    /**
     * Aborts the row index writer.
     *
     * @param accumulate throwable accumulated so far, may be {@code null}
     * @return the accumulated throwable after the abort
     */
    protected Throwable doAbort(Throwable accumulate)
    {
        return rowIndexWriter.abort(accumulate);
    }

    /**
     * Closes the bloom filter, the partition index builder and both index writers.
     *
     * @param accumulate throwable accumulated so far, may be {@code null}
     * @return the accumulated throwable after closing
     */
    @Override
    protected Throwable doPostCleanup(Throwable accumulate)
    {
        return Throwables.close(accumulate, bf, partitionIndex, rowIndexWriter, partitionIndexWriter);
    }
}
/**
 * Builder for {@link BtiTableWriter}.
 * <p>
 * {@link #buildInternal} opens the data writer, index writer, partition writer and mmapped regions cache,
 * exposes them through the getters below while the writer is being constructed, and clears them again
 * before it returns.
 */
public static class Builder extends SortedTableWriter.Builder<BtiFormatPartitionWriter, BtiTableWriter, Builder>
{
    // Resources opened by buildInternal; all of them are null outside of that method.
    private MmappedRegionsCache mmappedRegionsCache;
    private SequentialWriter dataWriter;
    private IndexWriter indexWriter;
    private BtiFormatPartitionWriter partitionWriter;

    public Builder(Descriptor descriptor)
    {
        super(descriptor);
    }

    // The getters below expose the resources opened by buildInternal. They are only usable while that method
    // is running, i.e. during the construction of the sstable writer; at any other time they throw.

    @Override
    public MmappedRegionsCache getMmappedRegionsCache()
    {
        return ensuringInBuildInternalContext(mmappedRegionsCache);
    }

    @Override
    public SequentialWriter getDataWriter()
    {
        return ensuringInBuildInternalContext(dataWriter);
    }

    @Override
    public BtiFormatPartitionWriter getPartitionWriter()
    {
        return ensuringInBuildInternalContext(partitionWriter);
    }

    public IndexWriter getIndexWriter()
    {
        return ensuringInBuildInternalContext(indexWriter);
    }

    /**
     * Returns the given resource, failing with an {@link IllegalStateException} if it is {@code null}.
     */
    private <T> T ensuringInBuildInternalContext(T resource)
    {
        boolean available = resource != null;
        Preconditions.checkState(available, "This getter can be used only during the lifetime of the sstable constructor. Do not use it directly.");
        return resource;
    }

    /**
     * Adds the superclass defaults plus the partition index and row index components.
     */
    @Override
    public Builder addDefaultComponents()
    {
        super.addDefaultComponents();
        addComponents(ImmutableSet.of(Components.PARTITION_INDEX, Components.ROW_INDEX));
        return this;
    }

    /**
     * Opens the writer's resources in dependency order and constructs the {@link BtiTableWriter}. If a
     * {@link RuntimeException} or {@link Error} is thrown, the resources are closed and the throwable is
     * rethrown. The builder's resource fields are cleared in all cases.
     */
    @Override
    protected BtiTableWriter buildInternal(LifecycleNewTracker lifecycleNewTracker, Owner owner)
    {
        try
        {
            // The order matters: the index writer reads the cache and the data writer from this builder,
            // and the partition writer needs both the data writer and the row index writer.
            mmappedRegionsCache = new MmappedRegionsCache();
            dataWriter = DataComponent.buildWriter(descriptor,
                                                   getTableMetadataRef().getLocal(),
                                                   getIOOptions().writerOptions,
                                                   getMetadataCollector(),
                                                   lifecycleNewTracker.opType(),
                                                   getIOOptions().flushCompression);
            indexWriter = new IndexWriter(this);
            partitionWriter = new BtiFormatPartitionWriter(getSerializationHeader(),
                                                           getTableMetadataRef().getLocal().comparator,
                                                           dataWriter,
                                                           indexWriter.rowIndexWriter,
                                                           descriptor.version);

            return new BtiTableWriter(this, lifecycleNewTracker, owner);
        }
        catch (RuntimeException | Error failure)
        {
            Throwables.closeAndAddSuppressed(failure, partitionWriter, indexWriter, dataWriter, mmappedRegionsCache);
            throw failure;
        }
        finally
        {
            // From here on the getters must fail again.
            mmappedRegionsCache = null;
            dataWriter = null;
            indexWriter = null;
            partitionWriter = null;
        }
    }
}
}