blob: 76ef942ed655a882dba0cf9fd0a7484754402a84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.s3a.prefetch;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* Enhanced {@code InputStream} for reading from S3.
*
* This implementation provides improved read throughput by asynchronously prefetching
* blocks of configurable size from the underlying S3 file.
*/
public class S3APrefetchingInputStream
    extends FSInputStream
    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {

  private static final Logger LOG = LoggerFactory.getLogger(
      S3APrefetchingInputStream.class);

  /**
   * Underlying input stream used for reading S3 file.
   *
   * Set to {@code null} by {@link #close()}; a null value is what
   * {@link #isClosed()} reports as "closed". The field is volatile because
   * {@link #hasCapability(String)}, {@link #getS3AStreamStatistics()} and
   * {@link #getIOStatistics()} read it without holding this object's monitor.
   */
  private volatile S3ARemoteInputStream inputStream;

  /**
   * Initializes a new instance of the {@code S3APrefetchingInputStream} class.
   *
   * All arguments are validated through {@code Validate} before use: besides
   * the null checks listed below, the bucket and key of {@code s3Attributes}
   * must be non-null and non-empty, its length must not be negative, and
   * {@code streamStatistics} must not be null.
   *
   * If the object is no larger than the prefetch block size of the context,
   * the whole object is served by an {@code S3AInMemoryInputStream};
   * otherwise an {@code S3ACachingInputStream} is used.
   *
   * @param context read-specific operation context.
   * @param s3Attributes attributes of the S3 object being read.
   * @param client callbacks used for interacting with the underlying S3 client.
   * @param streamStatistics statistics for this stream.
   *
   * @throws IllegalArgumentException if context is null.
   * @throws IllegalArgumentException if s3Attributes is null.
   * @throws IllegalArgumentException if client is null.
   */
  public S3APrefetchingInputStream(
      S3AReadOpContext context,
      S3ObjectAttributes s3Attributes,
      S3AInputStream.InputStreamCallbacks client,
      S3AInputStreamStatistics streamStatistics) {

    Validate.checkNotNull(context, "context");
    Validate.checkNotNull(s3Attributes, "s3Attributes");
    Validate.checkNotNullAndNotEmpty(s3Attributes.getBucket(),
        "s3Attributes.getBucket()");
    Validate.checkNotNullAndNotEmpty(s3Attributes.getKey(),
        "s3Attributes.getKey()");
    Validate.checkNotNegative(s3Attributes.getLen(), "s3Attributes.getLen()");
    Validate.checkNotNull(client, "client");
    Validate.checkNotNull(streamStatistics, "streamStatistics");

    long fileSize = s3Attributes.getLen();
    if (fileSize <= context.getPrefetchBlockSize()) {
      // The object fits in a single prefetch block.
      LOG.debug("Creating in memory input stream for {}", context.getPath());
      this.inputStream = new S3AInMemoryInputStream(
          context,
          s3Attributes,
          client,
          streamStatistics);
    } else {
      LOG.debug("Creating in caching input stream for {}", context.getPath());
      this.inputStream = new S3ACachingInputStream(
          context,
          s3Attributes,
          client,
          streamStatistics);
    }
  }

  /**
   * Returns the number of bytes available for reading without blocking.
   *
   * @return the number of bytes available for reading without blocking.
   * @throws IOException if the stream is closed or there is an IO error
   * during this operation.
   */
  @Override
  public synchronized int available() throws IOException {
    throwIfClosed();
    return inputStream.available();
  }

  /**
   * Gets the current position.
   *
   * @return the current position, or 0 if this stream has been closed.
   * @throws IOException if there is an IO error during this operation.
   */
  @Override
  public synchronized long getPos() throws IOException {
    return isClosed() ? 0 : inputStream.getPos();
  }

  /**
   * Reads and returns one byte from this stream.
   *
   * @return the next byte from this stream.
   * @throws IOException if the stream is closed or there is an IO error
   * during this operation.
   */
  @Override
  public synchronized int read() throws IOException {
    throwIfClosed();
    return inputStream.read();
  }

  /**
   * Reads up to {@code len} bytes from this stream and copies them into
   * the given {@code buffer} starting at the given {@code offset}.
   * Returns the number of bytes actually copied in to the given buffer.
   *
   * @param buffer the buffer to copy data into.
   * @param offset data is copied starting at this offset.
   * @param len max number of bytes to copy.
   * @return the number of bytes actually copied in to the given buffer.
   * @throws IOException if the stream is closed or there is an IO error
   * during this operation.
   */
  @Override
  public synchronized int read(byte[] buffer, int offset, int len)
      throws IOException {
    throwIfClosed();
    return inputStream.read(buffer, offset, len);
  }

  /**
   * Closes this stream and releases all acquired resources.
   * Calling this method on an already closed stream does nothing.
   *
   * The stream is marked as closed before the underlying stream is closed,
   * so it is reported as closed even when that close fails; the failure is
   * still propagated to the caller.
   *
   * @throws IOException if there is an IO error during this operation.
   */
  @Override
  public synchronized void close() throws IOException {
    final S3ARemoteInputStream stream = inputStream;
    if (stream == null) {
      return;
    }

    // Drop the reference first: whatever happens below, later calls must
    // observe a closed stream rather than a half-closed delegate.
    inputStream = null;
    try {
      stream.close();
    } finally {
      super.close();
    }
  }

  /**
   * Updates internal data such that the next read will take place at the given {@code pos}.
   *
   * @param pos new read position.
   * @throws IOException if the stream is closed or there is an IO error
   * during this operation.
   */
  @Override
  public synchronized void seek(long pos) throws IOException {
    throwIfClosed();
    inputStream.seek(pos);
  }

  /**
   * Sets the number of bytes to read ahead each time.
   * The call is ignored if this stream has been closed.
   *
   * @param readahead the number of bytes to read ahead each time.
   */
  @Override
  public synchronized void setReadahead(Long readahead) {
    if (!isClosed()) {
      inputStream.setReadahead(readahead);
    }
  }

  /**
   * Indicates whether the given {@code capability} is supported by this stream.
   *
   * @param capability the capability to check.
   * @return true if the given {@code capability} is supported by this stream,
   * false otherwise; always false once the stream has been closed.
   */
  @Override
  public boolean hasCapability(String capability) {
    // Not synchronized: read the field once so that a concurrent close()
    // cannot null it between the check and the call.
    final S3ARemoteInputStream stream = inputStream;
    return stream != null && stream.hasCapability(capability);
  }

  /**
   * Access the input stream statistics.
   * This is for internal testing and may be removed without warning.
   * @return the statistics for this input stream, or null if the stream
   * has been closed.
   */
  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  public S3AInputStreamStatistics getS3AStreamStatistics() {
    // Single read of the field; see hasCapability().
    final S3ARemoteInputStream stream = inputStream;
    return stream == null ? null : stream.getS3AStreamStatistics();
  }

  /**
   * Gets the internal IO statistics.
   *
   * @return the internal IO statistics, or null if the stream has been closed.
   */
  @Override
  public IOStatistics getIOStatistics() {
    // Single read of the field; see hasCapability().
    final S3ARemoteInputStream stream = inputStream;
    return stream == null ? null : stream.getIOStatistics();
  }

  /**
   * Indicates whether this stream has been closed.
   *
   * @return true if the underlying stream reference has been released.
   */
  protected boolean isClosed() {
    return inputStream == null;
  }

  /**
   * Fails the calling operation if this stream has been closed.
   *
   * @throws IOException if this stream has been closed.
   */
  protected void throwIfClosed() throws IOException {
    if (isClosed()) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
  }

  // Unsupported functions.

  /**
   * Not supported by this stream: no alternative source is ever selected.
   *
   * @param targetPos the position that would have been sought; unused.
   * @return always false.
   * @throws IOException if this stream has been closed.
   */
  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    throwIfClosed();
    return false;
  }

  /**
   * Mark/reset is not supported by this stream.
   *
   * @return always false.
   */
  @Override
  public boolean markSupported() {
    return false;
  }
}