| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a.prefetch; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.ByteBuffer; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.fs.CanSetReadahead; |
| import org.apache.hadoop.fs.FSExceptionMessages; |
| import org.apache.hadoop.fs.StreamCapabilities; |
| import org.apache.hadoop.fs.impl.prefetch.BlockData; |
| import org.apache.hadoop.fs.impl.prefetch.FilePosition; |
| import org.apache.hadoop.fs.impl.prefetch.Validate; |
| import org.apache.hadoop.fs.s3a.S3AInputPolicy; |
| import org.apache.hadoop.fs.s3a.S3AInputStream; |
| import org.apache.hadoop.fs.s3a.S3AReadOpContext; |
| import org.apache.hadoop.fs.s3a.S3ObjectAttributes; |
| import org.apache.hadoop.fs.s3a.impl.ChangeTracker; |
| import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSource; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| /** |
| * Provides an {@link InputStream} that allows reading from an S3 file. |
| */ |
| public abstract class S3ARemoteInputStream |
| extends InputStream |
| implements CanSetReadahead, StreamCapabilities, IOStatisticsSource { |
| |
| private static final Logger LOG = LoggerFactory.getLogger( |
| S3ARemoteInputStream.class); |
| |
| /** |
| * The S3 file read by this instance. |
| */ |
| private S3ARemoteObject remoteObject; |
| |
| /** |
| * Reading of S3 file takes place through this reader. |
| */ |
| private S3ARemoteObjectReader reader; |
| |
| /** |
| * Name of this stream. Used only for logging. |
| */ |
| private final String name; |
| |
| /** |
| * Indicates whether the stream has been closed. |
| */ |
| private volatile boolean closed; |
| |
| /** |
| * Current position within the file. |
| */ |
| private FilePosition fpos; |
| |
| /** The target of the most recent seek operation. */ |
| private long seekTargetPos; |
| |
| /** Information about each block of the mapped S3 file. */ |
| private BlockData blockData; |
| |
| /** Read-specific operation context. */ |
| private S3AReadOpContext context; |
| |
| /** Attributes of the S3 object being read. */ |
| private S3ObjectAttributes s3Attributes; |
| |
| /** Callbacks used for interacting with the underlying S3 client. */ |
| private S3AInputStream.InputStreamCallbacks client; |
| |
| /** Used for reporting input stream access statistics. */ |
| private final S3AInputStreamStatistics streamStatistics; |
| |
| private S3AInputPolicy inputPolicy; |
| |
| private final ChangeTracker changeTracker; |
| |
| private final IOStatistics ioStatistics; |
| |
| /** |
| * Initializes a new instance of the {@code S3ARemoteInputStream} class. |
| * |
| * @param context read-specific operation context. |
| * @param s3Attributes attributes of the S3 object being read. |
| * @param client callbacks used for interacting with the underlying S3 client. |
| * @param streamStatistics statistics for this stream. |
| * |
| * @throws IllegalArgumentException if context is null. |
| * @throws IllegalArgumentException if s3Attributes is null. |
| * @throws IllegalArgumentException if client is null. |
| */ |
| public S3ARemoteInputStream( |
| S3AReadOpContext context, |
| S3ObjectAttributes s3Attributes, |
| S3AInputStream.InputStreamCallbacks client, |
| S3AInputStreamStatistics streamStatistics) { |
| |
| this.context = requireNonNull(context); |
| this.s3Attributes = requireNonNull(s3Attributes); |
| this.client = requireNonNull(client); |
| this.streamStatistics = requireNonNull(streamStatistics); |
| this.ioStatistics = streamStatistics.getIOStatistics(); |
| this.name = S3ARemoteObject.getPath(s3Attributes); |
| this.changeTracker = new ChangeTracker( |
| this.name, |
| context.getChangeDetectionPolicy(), |
| this.streamStatistics.getChangeTrackerStatistics(), |
| s3Attributes); |
| |
| setInputPolicy(context.getInputPolicy()); |
| setReadahead(context.getReadahead()); |
| |
| long fileSize = s3Attributes.getLen(); |
| int bufferSize = context.getPrefetchBlockSize(); |
| |
| this.blockData = new BlockData(fileSize, bufferSize); |
| this.fpos = new FilePosition(fileSize, bufferSize); |
| this.remoteObject = getS3File(); |
| this.reader = new S3ARemoteObjectReader(remoteObject); |
| |
| this.seekTargetPos = 0; |
| } |
| |
| /** |
| * Gets the internal IO statistics. |
| * |
| * @return the internal IO statistics. |
| */ |
| @Override |
| public IOStatistics getIOStatistics() { |
| return ioStatistics; |
| } |
| |
| /** |
| * Access the input stream statistics. |
| * This is for internal testing and may be removed without warning. |
| * @return the statistics for this input stream |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public S3AInputStreamStatistics getS3AStreamStatistics() { |
| return streamStatistics; |
| } |
| |
| /** |
| * Sets the number of bytes to read ahead each time. |
| * |
| * @param readahead the number of bytes to read ahead each time.. |
| */ |
| @Override |
| public synchronized void setReadahead(Long readahead) { |
| // We support read head by prefetching therefore we ignore the supplied value. |
| if (readahead != null) { |
| Validate.checkNotNegative(readahead, "readahead"); |
| } |
| } |
| |
| /** |
| * Indicates whether the given {@code capability} is supported by this stream. |
| * |
| * @param capability the capability to check. |
| * @return true if the given {@code capability} is supported by this stream, false otherwise. |
| */ |
| @Override |
| public boolean hasCapability(String capability) { |
| return capability.equalsIgnoreCase(StreamCapabilities.IOSTATISTICS) |
| || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD); |
| } |
| |
| /** |
| * Set/update the input policy of the stream. |
| * This updates the stream statistics. |
| * @param inputPolicy new input policy. |
| */ |
| private void setInputPolicy(S3AInputPolicy inputPolicy) { |
| this.inputPolicy = inputPolicy; |
| streamStatistics.inputPolicySet(inputPolicy.ordinal()); |
| } |
| |
| /** |
| * Returns the number of bytes that can read from this stream without blocking. |
| */ |
| @Override |
| public int available() throws IOException { |
| throwIfClosed(); |
| |
| if (!ensureCurrentBuffer()) { |
| return 0; |
| } |
| |
| return fpos.buffer().remaining(); |
| } |
| |
| /** |
| * Gets the current position. |
| * |
| * @return the current position. |
| * @throws IOException if there is an IO error during this operation. |
| */ |
| public long getPos() throws IOException { |
| throwIfClosed(); |
| |
| if (fpos.isValid()) { |
| return fpos.absolute(); |
| } else { |
| return seekTargetPos; |
| } |
| } |
| |
| /** |
| * Moves the current read position so that the next read will occur at {@code pos}. |
| * |
| * @param pos the absolute position to seek to. |
| * @throws IOException if there an error during this operation. |
| * |
| * @throws IllegalArgumentException if pos is outside of the range [0, file size]. |
| */ |
| public void seek(long pos) throws IOException { |
| throwIfClosed(); |
| throwIfInvalidSeek(pos); |
| |
| if (!fpos.setAbsolute(pos)) { |
| fpos.invalidate(); |
| seekTargetPos = pos; |
| } |
| } |
| |
| /** |
| * Ensures that a non-empty valid buffer is available for immediate reading. |
| * It returns true when at least one such buffer is available for reading. |
| * It returns false on reaching the end of the stream. |
| * |
| * @return true if at least one such buffer is available for reading, false otherwise. |
| * @throws IOException if there is an IO error during this operation. |
| */ |
| protected abstract boolean ensureCurrentBuffer() throws IOException; |
| |
| @Override |
| public int read() throws IOException { |
| throwIfClosed(); |
| |
| if (remoteObject.size() == 0 |
| || seekTargetPos >= remoteObject.size()) { |
| return -1; |
| } |
| |
| if (!ensureCurrentBuffer()) { |
| return -1; |
| } |
| |
| incrementBytesRead(1); |
| |
| return fpos.buffer().get() & 0xff; |
| } |
| |
| /** |
| * Reads bytes from this stream and copies them into |
| * the given {@code buffer} starting at the beginning (offset 0). |
| * Returns the number of bytes actually copied in to the given buffer. |
| * |
| * @param buffer the buffer to copy data into. |
| * @return the number of bytes actually copied in to the given buffer. |
| * @throws IOException if there is an IO error during this operation. |
| */ |
| @Override |
| public int read(byte[] buffer) throws IOException { |
| return read(buffer, 0, buffer.length); |
| } |
| |
| /** |
| * Reads up to {@code len} bytes from this stream and copies them into |
| * the given {@code buffer} starting at the given {@code offset}. |
| * Returns the number of bytes actually copied in to the given buffer. |
| * |
| * @param buffer the buffer to copy data into. |
| * @param offset data is copied starting at this offset. |
| * @param len max number of bytes to copy. |
| * @return the number of bytes actually copied in to the given buffer. |
| * @throws IOException if there is an IO error during this operation. |
| */ |
| @Override |
| public int read(byte[] buffer, int offset, int len) throws IOException { |
| throwIfClosed(); |
| |
| if (len == 0) { |
| return 0; |
| } |
| |
| if (remoteObject.size() == 0 |
| || seekTargetPos >= remoteObject.size()) { |
| return -1; |
| } |
| |
| if (!ensureCurrentBuffer()) { |
| return -1; |
| } |
| |
| int numBytesRead = 0; |
| int numBytesRemaining = len; |
| |
| while (numBytesRemaining > 0) { |
| if (!ensureCurrentBuffer()) { |
| break; |
| } |
| |
| ByteBuffer buf = fpos.buffer(); |
| int bytesToRead = Math.min(numBytesRemaining, buf.remaining()); |
| buf.get(buffer, offset, bytesToRead); |
| incrementBytesRead(bytesToRead); |
| offset += bytesToRead; |
| numBytesRemaining -= bytesToRead; |
| numBytesRead += bytesToRead; |
| } |
| |
| return numBytesRead; |
| } |
| |
| protected S3ARemoteObject getFile() { |
| return remoteObject; |
| } |
| |
| protected S3ARemoteObjectReader getReader() { |
| return reader; |
| } |
| |
| protected S3ObjectAttributes getS3ObjectAttributes() { |
| return s3Attributes; |
| } |
| |
| protected FilePosition getFilePosition() { |
| return fpos; |
| } |
| |
| protected String getName() { |
| return name; |
| } |
| |
| protected boolean isClosed() { |
| return closed; |
| } |
| |
| protected long getSeekTargetPos() { |
| return seekTargetPos; |
| } |
| |
| protected void setSeekTargetPos(long pos) { |
| seekTargetPos = pos; |
| } |
| |
| protected BlockData getBlockData() { |
| return blockData; |
| } |
| |
| protected S3AReadOpContext getContext() { |
| return context; |
| } |
| |
| private void incrementBytesRead(int bytesRead) { |
| if (bytesRead > 0) { |
| streamStatistics.bytesRead(bytesRead); |
| if (getContext().getStats() != null) { |
| getContext().getStats().incrementBytesRead(bytesRead); |
| } |
| fpos.incrementBytesRead(bytesRead); |
| } |
| } |
| |
| protected S3ARemoteObject getS3File() { |
| return new S3ARemoteObject( |
| context, |
| s3Attributes, |
| client, |
| streamStatistics, |
| changeTracker |
| ); |
| } |
| |
| protected String getOffsetStr(long offset) { |
| int blockNumber = -1; |
| |
| if (blockData.isValidOffset(offset)) { |
| blockNumber = blockData.getBlockNumber(offset); |
| } |
| |
| return String.format("%d:%d", blockNumber, offset); |
| } |
| |
| /** |
| * Closes this stream and releases all acquired resources. |
| * |
| * @throws IOException if there is an IO error during this operation. |
| */ |
| @Override |
| public void close() throws IOException { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| |
| blockData = null; |
| reader.close(); |
| reader = null; |
| remoteObject = null; |
| fpos.invalidate(); |
| try { |
| client.close(); |
| } finally { |
| streamStatistics.close(); |
| } |
| client = null; |
| } |
| |
| @Override |
| public boolean markSupported() { |
| return false; |
| } |
| |
| protected void throwIfClosed() throws IOException { |
| if (closed) { |
| throw new IOException( |
| name + ": " + FSExceptionMessages.STREAM_IS_CLOSED); |
| } |
| } |
| |
| protected void throwIfInvalidSeek(long pos) throws EOFException { |
| if (pos < 0) { |
| throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); |
| } |
| } |
| |
| // Unsupported functions. |
| |
| @Override |
| public void mark(int readlimit) { |
| throw new UnsupportedOperationException("mark not supported"); |
| } |
| |
| @Override |
| public void reset() { |
| throw new UnsupportedOperationException("reset not supported"); |
| } |
| |
| @Override |
| public long skip(long n) { |
| throw new UnsupportedOperationException("skip not supported"); |
| } |
| } |