blob: 0afd071246415489481f0f0ffeb8b30fd7feb7cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.s3a.prefetch;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
/**
* Provides an {@code InputStream} that allows reading from an S3 file.
* Prefetched blocks are cached to local disk if a seek away from the
* current block is issued.
*/
public class S3ACachingInputStream extends S3ARemoteInputStream {
private static final Logger LOG = LoggerFactory.getLogger(
S3ACachingInputStream.class);
/**
* Number of blocks queued for prefching.
*/
private final int numBlocksToPrefetch;
private final BlockManager blockManager;
/**
* Initializes a new instance of the {@code S3ACachingInputStream} class.
*
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
* @throws IllegalArgumentException if client is null.
*/
public S3ACachingInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);
this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
this.blockManager = this.createBlockManager(
this.getContext().getFuturePool(),
this.getReader(),
this.getBlockData(),
bufferPoolSize);
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
fileSize);
}
/**
* Moves the current read position so that the next read will occur at {@code pos}.
*
* @param pos the next read will take place at this position.
*
* @throws IllegalArgumentException if pos is outside of the range [0, file size].
*/
@Override
public void seek(long pos) throws IOException {
throwIfClosed();
throwIfInvalidSeek(pos);
// The call to setAbsolute() returns true if the target position is valid and
// within the current block. Therefore, no additional work is needed when we get back true.
if (!getFilePosition().setAbsolute(pos)) {
LOG.info("seek({})", getOffsetStr(pos));
// We could be here in two cases:
// -- the target position is invalid:
// We ignore this case here as the next read will return an error.
// -- it is valid but outside of the current block.
if (getFilePosition().isValid()) {
// There are two cases to consider:
// -- the seek was issued after this buffer was fully read.
// In this case, it is very unlikely that this buffer will be needed again;
// therefore we release the buffer without caching.
// -- if we are jumping out of the buffer before reading it completely then
// we will likely need this buffer again (as observed empirically for Parquet)
// therefore we issue an async request to cache this buffer.
if (!getFilePosition().bufferFullyRead()) {
blockManager.requestCaching(getFilePosition().data());
} else {
blockManager.release(getFilePosition().data());
}
getFilePosition().invalidate();
blockManager.cancelPrefetches();
}
setSeekTargetPos(pos);
}
}
@Override
public void close() throws IOException {
// Close the BlockManager first, cancelling active prefetches,
// deleting cached files and freeing memory used by buffer pool.
blockManager.close();
super.close();
LOG.info("closed: {}", getName());
}
@Override
protected boolean ensureCurrentBuffer() throws IOException {
if (isClosed()) {
return false;
}
if (getFilePosition().isValid() && getFilePosition()
.buffer()
.hasRemaining()) {
return true;
}
long readPos;
int prefetchCount;
if (getFilePosition().isValid()) {
// A sequential read results in a prefetch.
readPos = getFilePosition().absolute();
prefetchCount = numBlocksToPrefetch;
} else {
// A seek invalidates the current position.
// We prefetch only 1 block immediately after a seek operation.
readPos = getSeekTargetPos();
prefetchCount = 1;
}
if (!getBlockData().isValidOffset(readPos)) {
return false;
}
if (getFilePosition().isValid()) {
if (getFilePosition().bufferFullyRead()) {
blockManager.release(getFilePosition().data());
} else {
blockManager.requestCaching(getFilePosition().data());
}
}
int toBlockNumber = getBlockData().getBlockNumber(readPos);
long startOffset = getBlockData().getStartOffset(toBlockNumber);
for (int i = 1; i <= prefetchCount; i++) {
int b = toBlockNumber + i;
if (b < getBlockData().getNumBlocks()) {
blockManager.requestPrefetch(b);
}
}
BufferData data = invokeTrackingDuration(
getS3AStreamStatistics()
.trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ),
() -> blockManager.get(toBlockNumber));
getFilePosition().setData(data, startOffset, readPos);
return true;
}
@Override
public String toString() {
if (isClosed()) {
return "closed";
}
StringBuilder sb = new StringBuilder();
sb.append(String.format("fpos = (%s)%n", getFilePosition()));
sb.append(blockManager.toString());
return sb.toString();
}
protected BlockManager createBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize) {
return new S3ACachingBlockManager(futurePool, reader, blockData,
bufferPoolSize,
getS3AStreamStatistics());
}
}