blob: 907cafb31721d5505a71f754dfd39a297aa459ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A file system that limits the number of concurrently open input streams,
* output streams, and total streams for a target file system.
*
* <p>This file system can wrap another existing file system in cases where
* the target file system cannot handle certain connection spikes and connections
* would fail in that case. This happens, for example, for very small HDFS clusters
* with few RPC handlers, when a large Flink job tries to build up many connections during
* a checkpoint.
*
* <p>The filesystem may track the progress of streams and close streams that have been
* inactive for too long, to avoid locked streams of taking up the complete pool.
* Rather than having a dedicated reaper thread, the calls that try to open a new stream
* periodically check the currently open streams once the limit of open streams is reached.
*/
@Internal
public class LimitedConnectionsFileSystem extends FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
/** The original file system to which connections are limited. */
private final FileSystem originalFs;
/** The lock that synchronizes connection bookkeeping. */
private final ReentrantLock lock;
/** Condition for threads that are blocking on the availability of new connections. */
private final Condition available;
/** The maximum number of concurrently open output streams. */
private final int maxNumOpenOutputStreams;
/** The maximum number of concurrently open input streams. */
private final int maxNumOpenInputStreams;
/** The maximum number of concurrently open streams (input + output). */
private final int maxNumOpenStreamsTotal;
/** The nanoseconds that a opening a stream may wait for availability. */
private final long streamOpenTimeoutNanos;
/** The nanoseconds that a stream may spend not writing any bytes before it is closed as inactive. */
private final long streamInactivityTimeoutNanos;
/** The set of currently open output streams. */
@GuardedBy("lock")
private final HashSet<OutStream> openOutputStreams;
/** The set of currently open input streams. */
@GuardedBy("lock")
private final HashSet<InStream> openInputStreams;
/** The number of output streams reserved to be opened. */
@GuardedBy("lock")
private int numReservedOutputStreams;
/** The number of input streams reserved to be opened. */
@GuardedBy("lock")
private int numReservedInputStreams;
// ------------------------------------------------------------------------
/**
* Creates a new output connection limiting file system.
*
* <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
* then they are terminated as "inactive", to prevent that the limited number of connections gets
* stuck on only blocked threads.
*
* @param originalFs The original file system to which connections are limited.
* @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit).
*/
public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
this(originalFs, maxNumOpenStreamsTotal, 0, 0);
}
/**
* Creates a new output connection limiting file system.
*
* <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
* then they are terminated as "inactive", to prevent that the limited number of connections gets
* stuck on only blocked threads.
*
* @param originalFs The original file system to which connections are limited.
* @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit).
* @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when
* no more connections are currently permitted.
* @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
* bytes before it is closed as inactive.
*/
public LimitedConnectionsFileSystem(
FileSystem originalFs,
int maxNumOpenStreamsTotal,
long streamOpenTimeout,
long streamInactivityTimeout) {
this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
}
/**
* Creates a new output connection limiting file system, limiting input and output streams with
* potentially different quotas.
*
* <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
* then they are terminated as "inactive", to prevent that the limited number of connections gets
* stuck on only blocked threads.
*
* @param originalFs The original file system to which connections are limited.
* @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no limit).
* @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means no limit).
* @param maxNumOpenInputStreams The maximum number of concurrent open input streams (0 means no limit).
* @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when
* no more connections are currently permitted.
* @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
* bytes before it is closed as inactive.
*/
public LimitedConnectionsFileSystem(
FileSystem originalFs,
int maxNumOpenStreamsTotal,
int maxNumOpenOutputStreams,
int maxNumOpenInputStreams,
long streamOpenTimeout,
long streamInactivityTimeout) {
checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
checkArgument(streamOpenTimeout >= 0, "stream opening timeout must be >= 0 (0 means infinite timeout)");
checkArgument(streamInactivityTimeout >= 0, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
this.originalFs = checkNotNull(originalFs, "originalFs");
this.lock = new ReentrantLock(true);
this.available = lock.newCondition();
this.openOutputStreams = new HashSet<>();
this.openInputStreams = new HashSet<>();
this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
this.maxNumOpenInputStreams = maxNumOpenInputStreams;
// assign nanos overflow aware
final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000;
this.streamOpenTimeoutNanos =
openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE;
this.streamInactivityTimeoutNanos =
inactivityTimeoutNanos >= streamInactivityTimeout ? inactivityTimeoutNanos : Long.MAX_VALUE;
}
// ------------------------------------------------------------------------
/**
* Gets the maximum number of concurrently open output streams.
*/
public int getMaxNumOpenOutputStreams() {
return maxNumOpenOutputStreams;
}
/**
* Gets the maximum number of concurrently open input streams.
*/
public int getMaxNumOpenInputStreams() {
return maxNumOpenInputStreams;
}
/**
* Gets the maximum number of concurrently open streams (input + output).
*/
public int getMaxNumOpenStreamsTotal() {
return maxNumOpenStreamsTotal;
}
/**
* Gets the number of milliseconds that a opening a stream may wait for availability in the
* connection pool.
*/
public long getStreamOpenTimeout() {
return streamOpenTimeoutNanos / 1_000_000;
}
/**
* Gets the milliseconds that a stream may spend not writing any bytes before it is closed as inactive.
*/
public long getStreamInactivityTimeout() {
return streamInactivityTimeoutNanos / 1_000_000;
}
/**
* Gets the total number of open streams (input plus output).
*/
public int getTotalNumberOfOpenStreams() {
lock.lock();
try {
return numReservedOutputStreams + numReservedInputStreams;
} finally {
lock.unlock();
}
}
/**
* Gets the number of currently open output streams.
*/
public int getNumberOfOpenOutputStreams() {
lock.lock();
try {
return numReservedOutputStreams;
}
finally {
lock.unlock();
}
}
/**
* Gets the number of currently open input streams.
*/
public int getNumberOfOpenInputStreams() {
return numReservedInputStreams;
}
// ------------------------------------------------------------------------
// input & output stream opening methods
// ------------------------------------------------------------------------
@Override
public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
return createOutputStream(() -> originalFs.create(f, overwriteMode));
}
@Override
@Deprecated
@SuppressWarnings("deprecation")
public FSDataOutputStream create(
Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws IOException {
return createOutputStream(() -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return createInputStream(() -> originalFs.open(f, bufferSize));
}
@Override
public FSDataInputStream open(Path f) throws IOException {
return createInputStream(() -> originalFs.open(f));
}
private FSDataOutputStream createOutputStream(
final SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
() -> new OutStream(streamOpener.get(), this);
return createStream(wrappedStreamOpener, openOutputStreams, true);
}
private FSDataInputStream createInputStream(
final SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
final SupplierWithException<InStream, IOException> wrappedStreamOpener =
() -> new InStream(streamOpener.get(), this);
return createStream(wrappedStreamOpener, openInputStreams, false);
}
// ------------------------------------------------------------------------
// other delegating file system methods
// ------------------------------------------------------------------------
@Override
public FileSystemKind getKind() {
return originalFs.getKind();
}
@Override
public boolean isDistributedFS() {
return originalFs.isDistributedFS();
}
@Override
public Path getWorkingDirectory() {
return originalFs.getWorkingDirectory();
}
@Override
public Path getHomeDirectory() {
return originalFs.getHomeDirectory();
}
@Override
public URI getUri() {
return originalFs.getUri();
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return originalFs.getFileStatus(f);
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
return originalFs.getFileBlockLocations(file, start, len);
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
return originalFs.listStatus(f);
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return originalFs.delete(f, recursive);
}
@Override
public boolean mkdirs(Path f) throws IOException {
return originalFs.mkdirs(f);
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return originalFs.rename(src, dst);
}
@Override
public boolean exists(Path f) throws IOException {
return originalFs.exists(f);
}
@Override
@Deprecated
@SuppressWarnings("deprecation")
public long getDefaultBlockSize() {
return originalFs.getDefaultBlockSize();
}
// ------------------------------------------------------------------------
private <T extends StreamWithTimeout> T createStream(
final SupplierWithException<T, IOException> streamOpener,
final HashSet<T> openStreams,
final boolean output) throws IOException {
final int outputLimit = output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
final int inputLimit = !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
final int totalLimit = maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
final int outputCredit = output ? 1 : 0;
final int inputCredit = output ? 0 : 1;
// because waiting for availability may take long, we need to be interruptible here
// and handle interrupted exceptions as I/O errors
// even though the code is written to make sure the lock is held for a short time only,
// making the lock acquisition interruptible helps to guard against the cases where
// a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
try {
lock.lockInterruptibly();
try {
// some integrity checks
assert openOutputStreams.size() <= numReservedOutputStreams;
assert openInputStreams.size() <= numReservedInputStreams;
// wait until there are few enough streams so we can open another
waitForAvailability(totalLimit, outputLimit, inputLimit);
// We do not open the stream here in the locked scope because opening a stream
// could take a while. Holding the lock during that operation would block all concurrent
// attempts to try and open a stream, effectively serializing all calls to open the streams.
numReservedOutputStreams += outputCredit;
numReservedInputStreams += inputCredit;
}
finally {
lock.unlock();
}
}
catch (InterruptedException e) {
// restore interruption flag
Thread.currentThread().interrupt();
throw new IOException("interrupted before opening stream");
}
// open the stream outside the lock.
boolean success = false;
try {
final T out = streamOpener.get();
// add the stream to the set, need to re-acquire the lock
lock.lock();
try {
openStreams.add(out);
} finally {
lock.unlock();
}
// good, can now return cleanly
success = true;
return out;
}
finally {
if (!success) {
// remove the reserved credit
// we must open this non-interruptibly, because this must succeed!
lock.lock();
try {
numReservedOutputStreams -= outputCredit;
numReservedInputStreams -= inputCredit;
available.signalAll();
} finally {
lock.unlock();
}
}
}
}
@GuardedBy("lock")
private void waitForAvailability(
int totalLimit,
int outputLimit,
int inputLimit) throws InterruptedException, IOException {
checkState(lock.isHeldByCurrentThread());
// compute the deadline of this operations
final long deadline;
if (streamOpenTimeoutNanos == 0) {
deadline = Long.MAX_VALUE;
} else {
long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
// check for overflow
deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
}
// wait for available connections
long timeLeft;
if (streamInactivityTimeoutNanos == 0) {
// simple case: just wait
while ((timeLeft = (deadline - System.nanoTime())) > 0 &&
!hasAvailability(totalLimit, outputLimit, inputLimit)) {
available.await(timeLeft, TimeUnit.NANOSECONDS);
}
}
else {
// complex case: chase down inactive streams
final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;
long now;
while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0 && // while still within timeout
!hasAvailability(totalLimit, outputLimit, inputLimit)) {
// check all streams whether there in one that has been inactive for too long
if (!(closeInactiveStream(openOutputStreams, now) || closeInactiveStream(openInputStreams, now))) {
// only wait if we did not manage to close any stream.
// otherwise eagerly check again if we have availability now (we should have!)
long timeToWait = Math.min(checkIntervalNanos, timeLeft);
available.await(timeToWait, TimeUnit.NANOSECONDS);
}
}
}
// check for timeout
// we check availability again to catch cases where the timeout expired while waiting
// to re-acquire the lock
if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
throw new IOException(String.format(
"Timeout while waiting for an available stream/connection. " +
"limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
maxNumOpenStreamsTotal, maxNumOpenInputStreams, maxNumOpenOutputStreams,
numReservedInputStreams, numReservedOutputStreams, getStreamOpenTimeout()));
}
}
@GuardedBy("lock")
private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
return numReservedOutputStreams < outputLimit &&
numReservedInputStreams < inputLimit &&
numReservedOutputStreams + numReservedInputStreams < totalLimit;
}
@GuardedBy("lock")
private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
for (StreamWithTimeout stream : streams) {
try {
final StreamProgressTracker tracker = stream.getProgressTracker();
// If the stream is closed already, it will be removed anyways, so we
// do not classify it as inactive. We also skip the check if another check happened too recently.
if (stream.isClosed() || nowNanos < tracker.getLastCheckTimestampNanos() + streamInactivityTimeoutNanos) {
// interval since last check not yet over
return false;
}
else if (!tracker.checkNewBytesAndMark(nowNanos)) {
stream.closeDueToTimeout();
return true;
}
}
catch (StreamTimeoutException ignored) {
// may happen due to races
}
catch (IOException e) {
// only log on debug level here, to avoid log spamming
LOG.debug("Could not check for stream progress to determine inactivity", e);
}
}
return false;
}
// ------------------------------------------------------------------------
/**
* Atomically removes the given output stream from the set of currently open output streams,
* and signals that new stream can now be opened.
*/
void unregisterOutputStream(OutStream stream) {
lock.lock();
try {
// only decrement if we actually remove the stream
if (openOutputStreams.remove(stream)) {
numReservedOutputStreams--;
available.signalAll();
}
}
finally {
lock.unlock();
}
}
/**
* Atomically removes the given input stream from the set of currently open input streams,
* and signals that new stream can now be opened.
*/
void unregisterInputStream(InStream stream) {
lock.lock();
try {
// only decrement if we actually remove the stream
if (openInputStreams.remove(stream)) {
numReservedInputStreams--;
available.signalAll();
}
}
finally {
lock.unlock();
}
}
// ------------------------------------------------------------------------
/**
* A special IOException, indicating a timeout in the data output stream.
*/
public static final class StreamTimeoutException extends IOException {
private static final long serialVersionUID = -8790922066795901928L;
public StreamTimeoutException() {
super("Stream closed due to inactivity timeout. " +
"This is done to prevent inactive streams from blocking the full " +
"pool of limited connections");
}
public StreamTimeoutException(StreamTimeoutException other) {
super(other.getMessage(), other);
}
}
// ------------------------------------------------------------------------
/**
* Interface for streams that can be checked for inactivity.
*/
private interface StreamWithTimeout extends Closeable {
/**
* Gets the progress tracker for this stream.
*/
StreamProgressTracker getProgressTracker();
/**
* Gets the current position in the stream, as in number of bytes read or written.
*/
long getPos() throws IOException;
/**
* Closes the stream asynchronously with a special exception that indicates closing due
* to lack of progress.
*/
void closeDueToTimeout() throws IOException;
/**
* Checks whether the stream was closed already.
*/
boolean isClosed();
}
// ------------------------------------------------------------------------
/**
* A tracker for stream progress. This records the number of bytes read / written together
* with a timestamp when the last check happened.
*/
private static final class StreamProgressTracker {
/** The tracked stream. */
private final StreamWithTimeout stream;
/** The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)}
* method was called. It is important to initialize this with {@code -1} so that the
* first check (0 bytes) always appears to have made progress. */
private volatile long lastCheckBytes = -1;
/** The timestamp when the last inactivity evaluation was made. */
private volatile long lastCheckTimestampNanos;
StreamProgressTracker(StreamWithTimeout stream) {
this.stream = stream;
}
/**
* Gets the timestamp when the last inactivity evaluation was made.
*/
public long getLastCheckTimestampNanos() {
return lastCheckTimestampNanos;
}
/**
* Checks whether there were new bytes since the last time this method was invoked.
* This also sets the given timestamp, to be read via {@link #getLastCheckTimestampNanos()}.
*
* @return True, if there were new bytes, false if not.
*/
public boolean checkNewBytesAndMark(long timestamp) throws IOException {
// remember the time when checked
lastCheckTimestampNanos = timestamp;
final long bytesNow = stream.getPos();
if (bytesNow > lastCheckBytes) {
lastCheckBytes = bytesNow;
return true;
}
else {
return false;
}
}
}
// ------------------------------------------------------------------------
/**
* A data output stream that wraps a given data output stream and un-registers
* from a given connection-limiting file system
* (via {@link LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)}
* upon closing.
*/
private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {
/** The original data output stream to write to. */
private final FSDataOutputStream originalStream;
/** The connection-limiting file system to un-register from. */
private final LimitedConnectionsFileSystem fs;
/** The progress tracker for this stream. */
private final StreamProgressTracker progressTracker;
/** An exception with which the stream has been externally closed. */
private volatile StreamTimeoutException timeoutException;
/** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
private final AtomicBoolean closed = new AtomicBoolean();
OutStream(FSDataOutputStream originalStream, LimitedConnectionsFileSystem fs) {
this.originalStream = checkNotNull(originalStream);
this.fs = checkNotNull(fs);
this.progressTracker = new StreamProgressTracker(this);
}
// --- FSDataOutputStream API implementation
@Override
public void write(int b) throws IOException {
try {
originalStream.write(b);
}
catch (IOException e) {
handleIOException(e);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
try {
originalStream.write(b, off, len);
}
catch (IOException e) {
handleIOException(e);
}
}
@Override
public long getPos() throws IOException {
try {
return originalStream.getPos();
}
catch (IOException e) {
handleIOException(e);
return -1; // silence the compiler
}
}
@Override
public void flush() throws IOException {
try {
originalStream.flush();
}
catch (IOException e) {
handleIOException(e);
}
}
@Override
public void sync() throws IOException {
try {
originalStream.sync();
}
catch (IOException e) {
handleIOException(e);
}
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
originalStream.close();
}
catch (IOException e) {
handleIOException(e);
}
finally {
fs.unregisterOutputStream(this);
}
}
}
@Override
public void closeDueToTimeout() throws IOException {
this.timeoutException = new StreamTimeoutException();
close();
}
@Override
public boolean isClosed() {
return closed.get();
}
@Override
public StreamProgressTracker getProgressTracker() {
return progressTracker;
}
private void handleIOException(IOException exception) throws IOException {
if (timeoutException == null) {
throw exception;
} else {
// throw a new exception to capture this call's stack trace
// the new exception is forwarded as a suppressed exception
StreamTimeoutException te = new StreamTimeoutException(timeoutException);
te.addSuppressed(exception);
throw te;
}
}
}
/**
* A data input stream that wraps a given data input stream and un-registers
* from a given connection-limiting file system
* (via {@link LimitedConnectionsFileSystem#unregisterInputStream(InStream)}
* upon closing.
*/
private static final class InStream extends FSDataInputStream implements StreamWithTimeout {
/** The original data input stream to read from. */
private final FSDataInputStream originalStream;
/** The connection-limiting file system to un-register from. */
private final LimitedConnectionsFileSystem fs;
/** An exception with which the stream has been externally closed. */
private volatile StreamTimeoutException timeoutException;
/** The progress tracker for this stream. */
private final StreamProgressTracker progressTracker;
/** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
private final AtomicBoolean closed = new AtomicBoolean();
InStream(FSDataInputStream originalStream, LimitedConnectionsFileSystem fs) {
this.originalStream = checkNotNull(originalStream);
this.fs = checkNotNull(fs);
this.progressTracker = new StreamProgressTracker(this);
}
// --- FSDataOutputStream API implementation
@Override
public int read() throws IOException {
try {
return originalStream.read();
}
catch (IOException e) {
handleIOException(e);
return 0; // silence the compiler
}
}
@Override
public int read(byte[] b) throws IOException {
try {
return originalStream.read(b);
}
catch (IOException e) {
handleIOException(e);
return 0; // silence the compiler
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
return originalStream.read(b, off, len);
}
catch (IOException e) {
handleIOException(e);
return 0; // silence the compiler
}
}
@Override
public long skip(long n) throws IOException {
try {
return originalStream.skip(n);
}
catch (IOException e) {
handleIOException(e);
return 0L; // silence the compiler
}
}
@Override
public int available() throws IOException {
try {
return originalStream.available();
}
catch (IOException e) {
handleIOException(e);
return 0; // silence the compiler
}
}
@Override
public void mark(int readlimit) {
originalStream.mark(readlimit);
}
@Override
public void reset() throws IOException {
try {
originalStream.reset();
}
catch (IOException e) {
handleIOException(e);
}
}
@Override
public boolean markSupported() {
return originalStream.markSupported();
}
@Override
public void seek(long desired) throws IOException {
try {
originalStream.seek(desired);
}
catch (IOException e) {
handleIOException(e);
}
}
@Override
public long getPos() throws IOException {
try {
return originalStream.getPos();
}
catch (IOException e) {
handleIOException(e);
return 0; // silence the compiler
}
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
originalStream.close();
}
catch (IOException e) {
handleIOException(e);
}
finally {
fs.unregisterInputStream(this);
}
}
}
@Override
public void closeDueToTimeout() throws IOException {
this.timeoutException = new StreamTimeoutException();
close();
}
@Override
public boolean isClosed() {
return closed.get();
}
@Override
public StreamProgressTracker getProgressTracker() {
return progressTracker;
}
private void handleIOException(IOException exception) throws IOException {
if (timeoutException == null) {
throw exception;
} else {
// throw a new exception to capture this call's stack trace
// the new exception is forwarded as a suppressed exception
StreamTimeoutException te = new StreamTimeoutException(timeoutException);
te.addSuppressed(exception);
throw te;
}
}
}
// ------------------------------------------------------------------------
/**
* A simple configuration data object capturing the settings for limited connections.
*/
public static class ConnectionLimitingSettings {
/** The limit for the total number of connections, or 0, if no limit. */
public final int limitTotal;
/** The limit for the number of input stream connections, or 0, if no limit. */
public final int limitInput;
/** The limit for the number of output stream connections, or 0, if no limit. */
public final int limitOutput;
/** The stream opening timeout for a stream, in milliseconds. */
public final long streamOpenTimeout;
/** The inactivity timeout for a stream, in milliseconds. */
public final long streamInactivityTimeout;
/**
* Creates a new ConnectionLimitingSettings with the given parameters.
*
* @param limitTotal The limit for the total number of connections, or 0, if no limit.
* @param limitInput The limit for the number of input stream connections, or 0, if no limit.
* @param limitOutput The limit for the number of output stream connections, or 0, if no limit.
* @param streamOpenTimeout The maximum number of milliseconds that the file system will wait when
* no more connections are currently permitted.
* @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
* bytes before it is closed as inactive.
*/
public ConnectionLimitingSettings(
int limitTotal,
int limitInput,
int limitOutput,
long streamOpenTimeout,
long streamInactivityTimeout) {
checkArgument(limitTotal >= 0);
checkArgument(limitInput >= 0);
checkArgument(limitOutput >= 0);
checkArgument(streamOpenTimeout >= 0);
checkArgument(streamInactivityTimeout >= 0);
this.limitTotal = limitTotal;
this.limitInput = limitInput;
this.limitOutput = limitOutput;
this.streamOpenTimeout = streamOpenTimeout;
this.streamInactivityTimeout = streamInactivityTimeout;
}
// --------------------------------------------------------------------
/**
* Parses and returns the settings for connection limiting, for the file system with
* the given file system scheme.
*
* @param config The configuration to check.
* @param fsScheme The file system scheme.
*
* @return The parsed configuration, or null, if no connection limiting is configured.
*/
@Nullable
public static ConnectionLimitingSettings fromConfig(Configuration config, String fsScheme) {
checkNotNull(fsScheme, "fsScheme");
checkNotNull(config, "config");
final ConfigOption<Integer> totalLimitOption = CoreOptions.fileSystemConnectionLimit(fsScheme);
final ConfigOption<Integer> limitInOption = CoreOptions.fileSystemConnectionLimitIn(fsScheme);
final ConfigOption<Integer> limitOutOption = CoreOptions.fileSystemConnectionLimitOut(fsScheme);
final int totalLimit = config.getInteger(totalLimitOption);
final int limitIn = config.getInteger(limitInOption);
final int limitOut = config.getInteger(limitOutOption);
checkLimit(totalLimit, totalLimitOption);
checkLimit(limitIn, limitInOption);
checkLimit(limitOut, limitOutOption);
// create the settings only, if at least one limit is configured
if (totalLimit <= 0 && limitIn <= 0 && limitOut <= 0) {
// no limit configured
return null;
}
else {
final ConfigOption<Long> openTimeoutOption =
CoreOptions.fileSystemConnectionLimitTimeout(fsScheme);
final ConfigOption<Long> inactivityTimeoutOption =
CoreOptions.fileSystemConnectionLimitStreamInactivityTimeout(fsScheme);
final long openTimeout = config.getLong(openTimeoutOption);
final long inactivityTimeout = config.getLong(inactivityTimeoutOption);
checkTimeout(openTimeout, openTimeoutOption);
checkTimeout(inactivityTimeout, inactivityTimeoutOption);
return new ConnectionLimitingSettings(
totalLimit == -1 ? 0 : totalLimit,
limitIn == -1 ? 0 : limitIn,
limitOut == -1 ? 0 : limitOut,
openTimeout,
inactivityTimeout);
}
}
private static void checkLimit(int value, ConfigOption<Integer> option) {
if (value < -1) {
throw new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + value);
}
}
private static void checkTimeout(long timeout, ConfigOption<Long> option) {
if (timeout < 0) {
throw new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + timeout);
}
}
}
}