| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.commons.io.input; |
| |
| import static org.apache.commons.io.IOUtils.EOF; |
| |
| // import javax.annotation.concurrent.GuardedBy; |
| import java.io.EOFException; |
| import java.io.FilterInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InterruptedIOException; |
| import java.nio.ByteBuffer; |
| import java.util.Objects; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.io.build.AbstractStreamBuilder; |
| |
| /** |
| * Implements {@link InputStream} to asynchronously read ahead from an underlying input stream when a specified amount of data has been read from the current |
| * buffer. It does so by maintaining two buffers: an active buffer and a read ahead buffer. The active buffer contains data which should be returned when a |
| * read() call is issued. The read ahead buffer is used to asynchronously read from the underlying input stream. When the current active buffer is exhausted, we |
| * flip the two buffers so that we can start reading from the read ahead buffer without being blocked by disk I/O. |
| * <p> |
| * To build an instance, use {@link Builder}. |
| * </p> |
| * <p> |
| * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19. |
| * </p> |
| * |
| * @see Builder |
| * @since 2.9.0 |
| */ |
| public class ReadAheadInputStream extends FilterInputStream { |
| |
| // @formatter:off |
| /** |
| * Builds a new {@link ReadAheadInputStream}. |
| * |
| * <p> |
| * For example: |
| * </p> |
| * <pre>{@code |
| * ReadAheadInputStream s = ReadAheadInputStream.builder() |
| * .setPath(path) |
| * .setExecutorService(Executors.newSingleThreadExecutor(ReadAheadInputStream::newThread)) |
| * .get();} |
| * </pre> |
| * |
| * @see #get() |
| * @since 2.12.0 |
| */ |
| // @formatter:on |
| public static class Builder extends AbstractStreamBuilder<ReadAheadInputStream, Builder> { |
| |
| private ExecutorService executorService; |
| |
| /** |
| * Builds a new {@link ReadAheadInputStream}. |
| * <p> |
| * You must set input that supports {@link #getInputStream()}, otherwise, this method throws an exception. |
| * </p> |
| * <p> |
| * This builder use the following aspects: |
| * </p> |
| * <ul> |
| * <li>{@link #getInputStream()}</li> |
| * <li>{@link #getBufferSize()}</li> |
| * <li>{@link ExecutorService}</li> |
| * </ul> |
| * |
| * @return a new instance. |
| * @throws IllegalStateException if the {@code origin} is {@code null}. |
| * @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}. |
| * @throws IOException if an I/O error occurs. |
| * @see #getInputStream() |
| * @see #getBufferSize() |
| */ |
| @SuppressWarnings("resource") |
| @Override |
| public ReadAheadInputStream get() throws IOException { |
| return new ReadAheadInputStream(getInputStream(), getBufferSize(), executorService != null ? executorService : newExecutorService(), |
| executorService == null); |
| } |
| |
| /** |
| * Sets the executor service for the read-ahead thread. |
| * |
| * @param executorService the executor service for the read-ahead thread. |
| * @return this |
| */ |
| public Builder setExecutorService(final ExecutorService executorService) { |
| this.executorService = executorService; |
| return this; |
| } |
| |
| } |
| |
| private static final ThreadLocal<byte[]> BYTE_ARRAY_1 = ThreadLocal.withInitial(() -> new byte[1]); |
| |
| /** |
| * Constructs a new {@link Builder}. |
| * |
| * @return a new {@link Builder}. |
| * @since 2.12.0 |
| */ |
| public static Builder builder() { |
| return new Builder(); |
| } |
| |
| /** |
| * Constructs a new daemon thread. |
| * |
| * @param r the thread's runnable. |
| * @return a new daemon thread. |
| */ |
| private static Thread newDaemonThread(final Runnable r) { |
| final Thread thread = new Thread(r, "commons-io-read-ahead"); |
| thread.setDaemon(true); |
| return thread; |
| } |
| |
| /** |
| * Constructs a new daemon executor service. |
| * |
| * @return a new daemon executor service. |
| */ |
| private static ExecutorService newExecutorService() { |
| return Executors.newSingleThreadExecutor(ReadAheadInputStream::newDaemonThread); |
| } |
| |
| private final ReentrantLock stateChangeLock = new ReentrantLock(); |
| |
| // @GuardedBy("stateChangeLock") |
| private ByteBuffer activeBuffer; |
| |
| // @GuardedBy("stateChangeLock") |
| private ByteBuffer readAheadBuffer; |
| |
| // @GuardedBy("stateChangeLock") |
| private boolean endOfStream; |
| |
| // @GuardedBy("stateChangeLock") |
| // true if async read is in progress |
| private boolean readInProgress; |
| |
| // @GuardedBy("stateChangeLock") |
| // true if read is aborted due to an exception in reading from underlying input stream. |
| private boolean readAborted; |
| |
| // @GuardedBy("stateChangeLock") |
| private Throwable readException; |
| |
| // @GuardedBy("stateChangeLock") |
| // whether the close method is called. |
| private boolean isClosed; |
| |
| // @GuardedBy("stateChangeLock") |
| // true when the close method will close the underlying input stream. This is valid only if |
| // `isClosed` is true. |
| private boolean isUnderlyingInputStreamBeingClosed; |
| |
| // @GuardedBy("stateChangeLock") |
| // whether there is a read ahead task running, |
| private boolean isReading; |
| |
| // Whether there is a reader waiting for data. |
| private final AtomicBoolean isWaiting = new AtomicBoolean(); |
| |
| private final ExecutorService executorService; |
| |
| private final boolean shutdownExecutorService; |
| |
| private final Condition asyncReadComplete = stateChangeLock.newCondition(); |
| |
| /** |
| * Constructs an instance with the specified buffer size and read-ahead threshold |
| * |
| * @param inputStream The underlying input stream. |
| * @param bufferSizeInBytes The buffer size. |
| * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} |
| */ |
| @Deprecated |
| public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes) { |
| this(inputStream, bufferSizeInBytes, newExecutorService(), true); |
| } |
| |
| /** |
| * Constructs an instance with the specified buffer size and read-ahead threshold |
| * |
| * @param inputStream The underlying input stream. |
| * @param bufferSizeInBytes The buffer size. |
| * @param executorService An executor service for the read-ahead thread. |
| * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} |
| */ |
| @Deprecated |
| public ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService) { |
| this(inputStream, bufferSizeInBytes, executorService, false); |
| } |
| |
| /** |
| * Constructs an instance with the specified buffer size and read-ahead threshold |
| * |
| * @param inputStream The underlying input stream. |
| * @param bufferSizeInBytes The buffer size. |
| * @param executorService An executor service for the read-ahead thread. |
| * @param shutdownExecutorService Whether or not to shut down the given ExecutorService on close. |
| */ |
| private ReadAheadInputStream(final InputStream inputStream, final int bufferSizeInBytes, final ExecutorService executorService, |
| final boolean shutdownExecutorService) { |
| super(Objects.requireNonNull(inputStream, "inputStream")); |
| if (bufferSizeInBytes <= 0) { |
| throw new IllegalArgumentException("bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); |
| } |
| this.executorService = Objects.requireNonNull(executorService, "executorService"); |
| this.shutdownExecutorService = shutdownExecutorService; |
| this.activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); |
| this.readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); |
| this.activeBuffer.flip(); |
| this.readAheadBuffer.flip(); |
| } |
| |
| @Override |
| public int available() throws IOException { |
| stateChangeLock.lock(); |
| // Make sure we have no integer overflow. |
| try { |
| return (int) Math.min(Integer.MAX_VALUE, (long) activeBuffer.remaining() + readAheadBuffer.remaining()); |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| } |
| |
| private void checkReadException() throws IOException { |
| if (readAborted) { |
| if (readException instanceof IOException) { |
| throw (IOException) readException; |
| } |
| throw new IOException(readException); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| boolean isSafeToCloseUnderlyingInputStream = false; |
| stateChangeLock.lock(); |
| try { |
| if (isClosed) { |
| return; |
| } |
| isClosed = true; |
| if (!isReading) { |
| // Nobody is reading, so we can close the underlying input stream in this method. |
| isSafeToCloseUnderlyingInputStream = true; |
| // Flip this to make sure the read ahead task will not close the underlying input stream. |
| isUnderlyingInputStreamBeingClosed = true; |
| } |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| |
| if (shutdownExecutorService) { |
| try { |
| executorService.shutdownNow(); |
| executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); |
| } catch (final InterruptedException e) { |
| final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); |
| iio.initCause(e); |
| throw iio; |
| } finally { |
| if (isSafeToCloseUnderlyingInputStream) { |
| super.close(); |
| } |
| } |
| } |
| } |
| |
| private void closeUnderlyingInputStreamIfNecessary() { |
| boolean needToCloseUnderlyingInputStream = false; |
| stateChangeLock.lock(); |
| try { |
| isReading = false; |
| if (isClosed && !isUnderlyingInputStreamBeingClosed) { |
| // close method cannot close underlyingInputStream because we were reading. |
| needToCloseUnderlyingInputStream = true; |
| } |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| if (needToCloseUnderlyingInputStream) { |
| try { |
| super.close(); |
| } catch (final IOException ignored) { |
| // TODO Rethrow as UncheckedIOException? |
| } |
| } |
| } |
| |
| private boolean isEndOfStream() { |
| return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream; |
| } |
| |
| @Override |
| public int read() throws IOException { |
| if (activeBuffer.hasRemaining()) { |
| // short path - just get one byte. |
| return activeBuffer.get() & 0xFF; |
| } |
| final byte[] oneByteArray = BYTE_ARRAY_1.get(); |
| oneByteArray[0] = 0; |
| return read(oneByteArray, 0, 1) == EOF ? EOF : oneByteArray[0] & 0xFF; |
| } |
| |
| @Override |
| public int read(final byte[] b, final int offset, int len) throws IOException { |
| if (offset < 0 || len < 0 || len > b.length - offset) { |
| throw new IndexOutOfBoundsException(); |
| } |
| if (len == 0) { |
| return 0; |
| } |
| |
| if (!activeBuffer.hasRemaining()) { |
| // No remaining in active buffer - lock and switch to write ahead buffer. |
| stateChangeLock.lock(); |
| try { |
| waitForAsyncReadComplete(); |
| if (!readAheadBuffer.hasRemaining()) { |
| // The first read. |
| readAsync(); |
| waitForAsyncReadComplete(); |
| if (isEndOfStream()) { |
| return EOF; |
| } |
| } |
| // Swap the newly read ahead buffer in place of empty active buffer. |
| swapBuffers(); |
| // After swapping buffers, trigger another async read for read ahead buffer. |
| readAsync(); |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| } |
| len = Math.min(len, activeBuffer.remaining()); |
| activeBuffer.get(b, offset, len); |
| |
| return len; |
| } |
| |
| /** |
| * Read data from underlyingInputStream to readAheadBuffer asynchronously. |
| * |
| * @throws IOException if an I/O error occurs. |
| */ |
| private void readAsync() throws IOException { |
| stateChangeLock.lock(); |
| final byte[] arr; |
| try { |
| arr = readAheadBuffer.array(); |
| if (endOfStream || readInProgress) { |
| return; |
| } |
| checkReadException(); |
| readAheadBuffer.position(0); |
| readAheadBuffer.flip(); |
| readInProgress = true; |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| executorService.execute(() -> { |
| stateChangeLock.lock(); |
| try { |
| if (isClosed) { |
| readInProgress = false; |
| return; |
| } |
| // Flip this so that the close method will not close the underlying input stream when we |
| // are reading. |
| isReading = true; |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| |
| // Please note that it is safe to release the lock and read into the read ahead buffer |
| // because either of following two conditions will hold: |
| // |
| // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer. |
| // |
| // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits |
| // for this async read to complete. |
| // |
| // So there is no race condition in both the situations. |
| int read = 0; |
| int off = 0, len = arr.length; |
| Throwable exception = null; |
| try { |
| // try to fill the read ahead buffer. |
| // if a reader is waiting, possibly return early. |
| do { |
| read = in.read(arr, off, len); |
| if (read <= 0) { |
| break; |
| } |
| off += read; |
| len -= read; |
| } while (len > 0 && !isWaiting.get()); |
| } catch (final Throwable ex) { |
| exception = ex; |
| if (ex instanceof Error) { |
| // `readException` may not be reported to the user. Rethrow Error to make sure at least |
| // The user can see Error in UncaughtExceptionHandler. |
| throw (Error) ex; |
| } |
| } finally { |
| stateChangeLock.lock(); |
| try { |
| readAheadBuffer.limit(off); |
| if (read < 0 || exception instanceof EOFException) { |
| endOfStream = true; |
| } else if (exception != null) { |
| readAborted = true; |
| readException = exception; |
| } |
| readInProgress = false; |
| signalAsyncReadComplete(); |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| closeUnderlyingInputStreamIfNecessary(); |
| } |
| }); |
| } |
| |
| private void signalAsyncReadComplete() { |
| stateChangeLock.lock(); |
| try { |
| asyncReadComplete.signalAll(); |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long skip(final long n) throws IOException { |
| if (n <= 0L) { |
| return 0L; |
| } |
| if (n <= activeBuffer.remaining()) { |
| // Only skipping from active buffer is sufficient |
| activeBuffer.position((int) n + activeBuffer.position()); |
| return n; |
| } |
| stateChangeLock.lock(); |
| final long skipped; |
| try { |
| skipped = skipInternal(n); |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| return skipped; |
| } |
| |
| /** |
| * Internal skip function which should be called only from skip(). The assumption is that the stateChangeLock is already acquired in the caller before |
| * calling this function. |
| * |
| * @param n the number of bytes to be skipped. |
| * @return the actual number of bytes skipped. |
| * @throws IOException if an I/O error occurs. |
| */ |
| private long skipInternal(final long n) throws IOException { |
| assert stateChangeLock.isLocked(); |
| waitForAsyncReadComplete(); |
| if (isEndOfStream()) { |
| return 0; |
| } |
| if (available() >= n) { |
| // we can skip from the internal buffers |
| int toSkip = (int) n; |
| // We need to skip from both active buffer and read ahead buffer |
| toSkip -= activeBuffer.remaining(); |
| assert toSkip > 0; // skipping from activeBuffer already handled. |
| activeBuffer.position(0); |
| activeBuffer.flip(); |
| readAheadBuffer.position(toSkip + readAheadBuffer.position()); |
| swapBuffers(); |
| // Trigger async read to emptied read ahead buffer. |
| readAsync(); |
| return n; |
| } |
| final int skippedBytes = available(); |
| final long toSkip = n - skippedBytes; |
| activeBuffer.position(0); |
| activeBuffer.flip(); |
| readAheadBuffer.position(0); |
| readAheadBuffer.flip(); |
| final long skippedFromInputStream = in.skip(toSkip); |
| readAsync(); |
| return skippedBytes + skippedFromInputStream; |
| } |
| |
| /** |
| * Flips the active and read ahead buffer |
| */ |
| private void swapBuffers() { |
| final ByteBuffer temp = activeBuffer; |
| activeBuffer = readAheadBuffer; |
| readAheadBuffer = temp; |
| } |
| |
| private void waitForAsyncReadComplete() throws IOException { |
| stateChangeLock.lock(); |
| try { |
| isWaiting.set(true); |
| // There is only one reader, and one writer, so the writer should signal only once, |
| // but a while loop checking the wake-up condition is still needed to avoid spurious wakeups. |
| while (readInProgress) { |
| asyncReadComplete.await(); |
| } |
| } catch (final InterruptedException e) { |
| final InterruptedIOException iio = new InterruptedIOException(e.getMessage()); |
| iio.initCause(e); |
| throw iio; |
| } finally { |
| try { |
| isWaiting.set(false); |
| } finally { |
| stateChangeLock.unlock(); |
| } |
| } |
| checkReadException(); |
| } |
| } |