blob: a3afc2f93197e6183b113b00ee545ad58d4282da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.util;
import java.util.Objects;
import java.util.Optional;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
import static org.apache.cassandra.utils.Throwables.maybeFail;
/**
* {@link FileHandle} provides access to a file for reading, including the ones written by various {@link SequentialWriter}
* instances, and it is typically used by {@link org.apache.cassandra.io.sstable.format.SSTableReader}.
*
* Use {@link FileHandle.Builder} to create an instance, and call {@link #createReader()} (and its variants) to
* access the readers for the underlying file.
*
* You can use {@link Builder#complete()} several times during its lifecycle with different {@code overrideLength}(i.e. early opening file).
* For that reason, the builder keeps a reference to the file channel and makes a copy for each {@link Builder#complete()} call.
* Therefore, it is important to close the {@link Builder} when it is no longer needed, as well as any {@link FileHandle}
* instances.
*/
public class FileHandle extends SharedCloseableImpl
{
private static final Logger logger = LoggerFactory.getLogger(FileHandle.class);
public final ChannelProxy channel;
public final long onDiskLength;
/*
* Rebufferer factory to use when constructing RandomAccessReaders
*/
private final RebuffererFactory rebuffererFactory;
/*
* Optional CompressionMetadata when dealing with compressed file
*/
private final Optional<CompressionMetadata> compressionMetadata;
private FileHandle(Cleanup cleanup,
ChannelProxy channel,
RebuffererFactory rebuffererFactory,
CompressionMetadata compressionMetadata,
long onDiskLength)
{
super(cleanup);
this.rebuffererFactory = rebuffererFactory;
this.channel = channel;
this.compressionMetadata = Optional.ofNullable(compressionMetadata);
this.onDiskLength = onDiskLength;
}
private FileHandle(FileHandle copy)
{
super(copy);
channel = copy.channel;
rebuffererFactory = copy.rebuffererFactory;
compressionMetadata = copy.compressionMetadata;
onDiskLength = copy.onDiskLength;
}
/**
* @return Path to the file this factory is referencing
*/
public String path()
{
return channel.filePath();
}
public long dataLength()
{
return compressionMetadata.map(c -> c.dataLength).orElseGet(rebuffererFactory::fileLength);
}
public RebuffererFactory rebuffererFactory()
{
return rebuffererFactory;
}
public Optional<CompressionMetadata> compressionMetadata()
{
return compressionMetadata;
}
@Override
public void addTo(Ref.IdentityCollection identities)
{
super.addTo(identities);
compressionMetadata.ifPresent(metadata -> metadata.addTo(identities));
}
@Override
public FileHandle sharedCopy()
{
return new FileHandle(this);
}
/**
* Create {@link RandomAccessReader} with configured method of reading content of the file.
*
* @return RandomAccessReader for the file
*/
public RandomAccessReader createReader()
{
return createReader(null);
}
/**
* Create {@link RandomAccessReader} with configured method of reading content of the file.
* Reading from file will be rate limited by given {@link RateLimiter}.
*
* @param limiter RateLimiter to use for rate limiting read
* @return RandomAccessReader for the file
*/
public RandomAccessReader createReader(RateLimiter limiter)
{
return new RandomAccessReader(instantiateRebufferer(limiter));
}
public FileDataInput createReader(long position)
{
RandomAccessReader reader = createReader();
reader.seek(position);
return reader;
}
/**
* Drop page cache from start to given {@code before}.
*
* @param before uncompressed position from start of the file to be dropped from cache. if 0, to end of file.
*/
public void dropPageCache(long before)
{
long position = compressionMetadata.map(metadata -> {
if (before >= metadata.dataLength)
return 0L;
else
return metadata.chunkFor(before).offset;
}).orElse(before);
NativeLibrary.trySkipCache(channel.getFileDescriptor(), 0, position, path());
}
private Rebufferer instantiateRebufferer(RateLimiter limiter)
{
Rebufferer rebufferer = rebuffererFactory.instantiateRebufferer();
if (limiter != null)
rebufferer = new LimitingRebufferer(rebufferer, limiter, DiskOptimizationStrategy.MAX_BUFFER_SIZE);
return rebufferer;
}
/**
* Perform clean up of all resources held by {@link FileHandle}.
*/
private static class Cleanup implements RefCounted.Tidy
{
final ChannelProxy channel;
final RebuffererFactory rebufferer;
final CompressionMetadata compressionMetadata;
final Optional<ChunkCache> chunkCache;
private Cleanup(ChannelProxy channel,
RebuffererFactory rebufferer,
CompressionMetadata compressionMetadata,
ChunkCache chunkCache)
{
this.channel = channel;
this.rebufferer = rebufferer;
this.compressionMetadata = compressionMetadata;
this.chunkCache = Optional.ofNullable(chunkCache);
}
public String name()
{
return channel.filePath();
}
public void tidy()
{
chunkCache.ifPresent(cache -> cache.invalidateFile(name()));
try
{
if (compressionMetadata != null)
{
compressionMetadata.close();
}
}
finally
{
try
{
channel.close();
}
finally
{
rebufferer.close();
}
}
}
}
/**
* Configures how the file will be read (compressed, mmapped, use cache etc.)
*/
public static class Builder implements AutoCloseable
{
private final String path;
private ChannelProxy channel;
private CompressionMetadata compressionMetadata;
private MmappedRegions regions;
private ChunkCache chunkCache;
private int bufferSize = RandomAccessReader.DEFAULT_BUFFER_SIZE;
private BufferType bufferType = BufferType.OFF_HEAP;
private boolean mmapped = false;
private boolean compressed = false;
public Builder(String path)
{
this.path = path;
}
public Builder(ChannelProxy channel)
{
this.channel = channel;
this.path = channel.filePath();
}
public Builder compressed(boolean compressed)
{
this.compressed = compressed;
return this;
}
/**
* Set {@link ChunkCache} to use.
*
* @param chunkCache ChunkCache object to use for caching
* @return this object
*/
public Builder withChunkCache(ChunkCache chunkCache)
{
this.chunkCache = chunkCache;
return this;
}
/**
* Provide {@link CompressionMetadata} to use when reading compressed file.
*
* @param metadata CompressionMetadata to use
* @return this object
*/
public Builder withCompressionMetadata(CompressionMetadata metadata)
{
this.compressed = Objects.nonNull(metadata);
this.compressionMetadata = metadata;
return this;
}
/**
* Set whether to use mmap for reading
*
* @param mmapped true if using mmap
* @return this instance
*/
public Builder mmapped(boolean mmapped)
{
this.mmapped = mmapped;
return this;
}
/**
* Set the buffer size to use (if appropriate).
*
* @param bufferSize Buffer size in bytes
* @return this instance
*/
public Builder bufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
return this;
}
/**
* Set the buffer type (on heap or off heap) to use (if appropriate).
*
* @param bufferType Buffer type to use
* @return this instance
*/
public Builder bufferType(BufferType bufferType)
{
this.bufferType = bufferType;
return this;
}
/**
* Complete building {@link FileHandle} without overriding file length.
*
* @see #complete(long)
*/
public FileHandle complete()
{
return complete(-1L);
}
/**
* Complete building {@link FileHandle} with the given length, which overrides the file length.
*
* @param overrideLength Override file length (in bytes) so that read cannot go further than this value.
* If the value is less than or equal to 0, then the value is ignored.
* @return Built file
*/
@SuppressWarnings("resource")
public FileHandle complete(long overrideLength)
{
if (channel == null)
{
channel = new ChannelProxy(path);
}
ChannelProxy channelCopy = channel.sharedCopy();
try
{
if (compressed && compressionMetadata == null)
compressionMetadata = CompressionMetadata.create(channelCopy.filePath());
long length = overrideLength > 0 ? overrideLength : compressed ? compressionMetadata.compressedFileLength : channelCopy.size();
RebuffererFactory rebuffererFactory;
if (mmapped)
{
if (compressed)
{
regions = MmappedRegions.map(channelCopy, compressionMetadata);
rebuffererFactory = maybeCached(new CompressedChunkReader.Mmap(channelCopy, compressionMetadata,
regions));
}
else
{
updateRegions(channelCopy, length);
rebuffererFactory = new MmapRebufferer(channelCopy, length, regions.sharedCopy());
}
}
else
{
regions = null;
if (compressed)
{
rebuffererFactory = maybeCached(new CompressedChunkReader.Standard(channelCopy, compressionMetadata));
}
else
{
int chunkSize = DiskOptimizationStrategy.roundForCaching(bufferSize, ChunkCache.roundUp);
rebuffererFactory = maybeCached(new SimpleChunkReader(channelCopy, length, bufferType, chunkSize));
}
}
Cleanup cleanup = new Cleanup(channelCopy, rebuffererFactory, compressionMetadata, chunkCache);
return new FileHandle(cleanup, channelCopy, rebuffererFactory, compressionMetadata, length);
}
catch (Throwable t)
{
channelCopy.close();
throw t;
}
}
public Throwable close(Throwable accumulate)
{
if (!compressed && regions != null)
accumulate = regions.close(accumulate);
if (channel != null)
return channel.close(accumulate);
return accumulate;
}
public void close()
{
maybeFail(close(null));
}
private RebuffererFactory maybeCached(ChunkReader reader)
{
if (chunkCache != null && chunkCache.capacity() > 0)
return chunkCache.wrap(reader);
return reader;
}
private void updateRegions(ChannelProxy channel, long length)
{
if (regions != null && !regions.isValid(channel))
{
Throwable err = regions.close(null);
if (err != null)
logger.error("Failed to close mapped regions", err);
regions = null;
}
if (regions == null)
regions = MmappedRegions.map(channel, length);
else
regions.extend(length);
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "(path='" + path() + '\'' +
", length=" + rebuffererFactory.fileLength() +
')';
}
}