blob: a8af3a401b8b374a3bec9f4750f2522df7495828 [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.http.io;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hc.core5.util.Args;
/**
* A stream wrapper that triggers actions on {@link #close close()} and EOF.
* Primarily used to auto-release an underlying managed connection when the response
* body is consumed or no longer needed.
*
* @see EofSensorWatcher
*
* @since 4.0
*/
// don't use FilterInputStream as the base class, we'd have to
// override markSupported(), mark(), and reset() to disable them
public class EofSensorInputStream extends InputStream {
/**
* The wrapped input stream, while accessible.
* The value changes to {@code null} when the wrapped stream
* becomes inaccessible.
*/
private InputStream wrappedStream;
/**
* Indicates whether this stream itself is closed.
* If it isn't, but {@link #wrappedStream wrappedStream}
* is {@code null}, we're running in EOF mode.
* All read operations will indicate EOF without accessing
* the underlying stream. After closing this stream, read
* operations will trigger an {@link IOException IOException}.
*
* @see #isReadAllowed isReadAllowed
*/
private boolean selfClosed;
/** The watcher to be notified, if any. */
private final EofSensorWatcher eofWatcher;
/**
* Creates a new EOF sensor.
* If no watcher is passed, the underlying stream will simply be
* closed when EOF is detected or {@link #close close} is called.
* Otherwise, the watcher decides whether the underlying stream
* should be closed before detaching from it.
*
* @param in the wrapped stream
* @param watcher the watcher for events, or {@code null} for
* auto-close behavior without notification
*/
public EofSensorInputStream(final InputStream in,
final EofSensorWatcher watcher) {
Args.notNull(in, "Wrapped stream");
wrappedStream = in;
selfClosed = false;
eofWatcher = watcher;
}
boolean isSelfClosed() {
return selfClosed;
}
InputStream getWrappedStream() {
return wrappedStream;
}
/**
* Checks whether the underlying stream can be read from.
*
* @return {@code true} if the underlying stream is accessible,
* {@code false} if this stream is in EOF mode and
* detached from the underlying stream
*
* @throws IOException if this stream is already closed
*/
private boolean isReadAllowed() throws IOException {
if (selfClosed) {
throw new IOException("Attempted read on closed stream.");
}
return wrappedStream != null;
}
@Override
public int read() throws IOException {
int b = -1;
if (isReadAllowed()) {
try {
b = wrappedStream.read();
checkEOF(b);
} catch (final IOException ex) {
checkAbort();
throw ex;
}
}
return b;
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
int readLen = -1;
if (isReadAllowed()) {
try {
readLen = wrappedStream.read(b, off, len);
checkEOF(readLen);
} catch (final IOException ex) {
checkAbort();
throw ex;
}
}
return readLen;
}
@Override
public int read(final byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int available() throws IOException {
int a = 0; // not -1
if (isReadAllowed()) {
try {
a = wrappedStream.available();
// no checkEOF() here, available() can't trigger EOF
} catch (final IOException ex) {
checkAbort();
throw ex;
}
}
return a;
}
@Override
public void close() throws IOException {
// tolerate multiple calls to close()
selfClosed = true;
checkClose();
}
/**
* Detects EOF and notifies the watcher.
* This method should only be called while the underlying stream is
* still accessible. Use {@link #isReadAllowed isReadAllowed} to
* check that condition.
* <p>
* If EOF is detected, the watcher will be notified and this stream
* is detached from the underlying stream. This prevents multiple
* notifications from this stream.
* </p>
*
* @param eof the result of the calling read operation.
* A negative value indicates that EOF is reached.
*
* @throws IOException
* in case of an IO problem on closing the underlying stream
*/
private void checkEOF(final int eof) throws IOException {
final InputStream toCheckStream = wrappedStream;
if ((toCheckStream != null) && (eof < 0)) {
try {
boolean scws = true; // should close wrapped stream?
if (eofWatcher != null) {
scws = eofWatcher.eofDetected(toCheckStream);
}
if (scws) {
toCheckStream.close();
}
} finally {
wrappedStream = null;
}
}
}
/**
* Detects stream close and notifies the watcher.
* There's not much to detect since this is called by {@link #close close}.
* The watcher will only be notified if this stream is closed
* for the first time and before EOF has been detected.
* This stream will be detached from the underlying stream to prevent
* multiple notifications to the watcher.
*
* @throws IOException
* in case of an IO problem on closing the underlying stream
*/
private void checkClose() throws IOException {
final InputStream toCloseStream = wrappedStream;
if (toCloseStream != null) {
try {
boolean scws = true; // should close wrapped stream?
if (eofWatcher != null) {
scws = eofWatcher.streamClosed(toCloseStream);
}
if (scws) {
toCloseStream.close();
}
} finally {
wrappedStream = null;
}
}
}
/**
* Detects stream abort and notifies the watcher.
* There's not much to detect since this is called by {@link #abort()}.
* The watcher will only be notified if this stream is aborted
* for the first time and before EOF has been detected or the
* stream has been {@link #close closed} gracefully.
* This stream will be detached from the underlying stream to prevent
* multiple notifications to the watcher.
*
* @throws IOException
* in case of an IO problem on closing the underlying stream
*/
private void checkAbort() throws IOException {
final InputStream toAbortStream = wrappedStream;
if (toAbortStream != null) {
try {
boolean scws = true; // should close wrapped stream?
if (eofWatcher != null) {
scws = eofWatcher.streamAbort(toAbortStream);
}
if (scws) {
toAbortStream.close();
}
} finally {
wrappedStream = null;
}
}
}
/**
* Aborts this stream.
* This is a special version of {@link #close close()} which prevents
* re-use of the underlying connection, if any. Calling this method
* indicates that there should be no attempt to read until the end of
* the stream.
*
* @throws IOException
* in case of an IO problem on closing the underlying stream
*/
public void abort() throws IOException {
// tolerate multiple calls
selfClosed = true;
checkAbort();
}
}