[FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.
This can cause a deadlock when the read buffers are occupied by a reader with a low consumption priority.
This closes #22122
(cherry picked from commit 5ad2ae2c24ade2655981f609298978d26329466f)
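For context, here is a minimal sketch (hypothetical names, not Flink's actual classes) of the allocation-loop shape this patch changes. Before the fix, a scheduler that already held some buffers (numRequestedBuffers > 0) received an empty queue after the timeout elapsed and kept waiting; after the fix, the elapsed timeout always surfaces as a TimeoutException.

    import java.util.Queue;
    import java.util.concurrent.TimeoutException;

    // Hypothetical sketch; SketchBufferPool stands in for the shared
    // BatchShuffleReadBufferPool, and byte[] for the buffer type.
    final class AllocationLoopSketch {

        interface SketchBufferPool {
            // May return an empty queue when the pool is exhausted.
            Queue<byte[]> requestBuffers();
        }

        static Queue<byte[]> allocateBuffers(
                SketchBufferPool pool, int numRequestedBuffers, long timeoutNanos)
                throws TimeoutException {
            long deadline = System.nanoTime() + timeoutNanos;
            do {
                Queue<byte[]> buffers = pool.requestBuffers();
                if (!buffers.isEmpty()) {
                    return buffers;
                }
            } while (System.nanoTime() < deadline);

            // Before this patch the method threw only when numRequestedBuffers <= 0
            // and otherwise returned an empty queue, so a scheduler already holding
            // buffers waited forever while other readers held the rest of the pool.
            // Now it always throws, so the consumer fails fast and frees its buffers.
            throw new TimeoutException("Buffer request timeout");
        }
    }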
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 0f38aca..72ef41d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -221,14 +221,22 @@
} while (System.nanoTime() < timeoutTime
|| System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
- if (numRequestedBuffers <= 0) {
- throw new TimeoutException(
- String.format(
- "Buffer request timeout, this means there is a fierce contention of"
- + " the batch shuffle read memory, please increase '%s'.",
- TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
- }
- return new ArrayDeque<>();
+ // This is a safety net against potential deadlocks.
+ //
+ // A deadlock can happen when the downstream task needs to consume multiple result
+ // partitions (e.g., A and B) in specific order (cannot consume B before finishing
+ // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
+ // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
+ // B from being consumed and releasing the buffers.
+ //
+ // The imperfect solution here is to fail all the subpartitionReaders (A), which
+ // consequently fails all the downstream tasks; those tasks then unregister their
+ // other subpartitionReaders (B) and release the read buffers.
+ throw new TimeoutException(
+ String.format(
+ "Buffer request timeout, this means there is a fierce contention of"
+ + " the batch shuffle read memory, please increase '%s'.",
+ TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
}
private long getBufferRequestTimeoutTime() {
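The comment block added above explains the recovery path in prose. As a rough illustration (SubpartitionReader is a hypothetical stand-in, not Flink's actual reader API), failing every registered reader on timeout is what breaks the cycle:

    import java.util.List;
    import java.util.concurrent.TimeoutException;

    // Rough illustration of the "imperfect solution" described in the comment.
    final class FailAllReadersSketch {

        interface SubpartitionReader {
            void fail(Throwable cause);
        }

        // Failing the readers of A fails the downstream tasks; those tasks then
        // unregister their readers of B, and B's buffers flow back to the pool.
        static void failAllOnTimeout(List<SubpartitionReader> readers, TimeoutException cause) {
            for (SubpartitionReader reader : readers) {
                reader.fail(cause);
            }
        }
    }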
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 14ac2c8..5e595f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -260,25 +260,22 @@
} while (System.nanoTime() < timeoutTime
|| System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
- if (numRequestedBuffers <= 0) {
- // This is a safe net against potential deadlocks.
- //
- // A deadlock can happen when the downstream task needs to consume multiple result
- // partitions (e.g., A and B) in specific order (cannot consume B before finishing
- // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
- // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
- // B from being consumed and releasing the buffers.
- //
- // The imperfect solution here is to fail all the subpartitionReaders (A), which
- // consequently fail all the downstream tasks, unregister their other
- // subpartitionReaders (B) and release the read buffers.
- throw new TimeoutException(
- String.format(
- "Buffer request timeout, this means there is a fierce contention of"
- + " the batch shuffle read memory, please increase '%s'.",
- TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
- }
- return new ArrayDeque<>();
+ // This is a safety net against potential deadlocks.
+ //
+ // A deadlock can happen when the downstream task needs to consume multiple result
+ // partitions (e.g., A and B) in specific order (cannot consume B before finishing
+ // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
+ // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
+ // B from being consumed and releasing the buffers.
+ //
+ // The imperfect solution here is to fail all the subpartitionReaders (A), which
+ // consequently fails all the downstream tasks; those tasks then unregister their
+ // other subpartitionReaders (B) and release the read buffers.
+ throw new TimeoutException(
+ String.format(
+ "Buffer request timeout, this means there is a fierce contention of"
+ + " the batch shuffle read memory, please increase '%s'.",
+ TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
}
private void mayTriggerReading() {
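The exception message points users at TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY. As a usage sketch, the option can be raised in the TaskManager configuration; the 256m value below is only an example, not a recommendation:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.configuration.TaskManagerOptions;

    // Example only: enlarge the shared batch-shuffle read memory to ease contention.
    final class RaiseReadMemoryExample {
        static Configuration withLargerBatchShuffleReadMemory() {
            Configuration conf = new Configuration();
            conf.set(
                    TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY,
                    MemorySize.parse("256m"));
            return conf;
        }
    }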
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 546f29d..0d1613e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -39,7 +39,6 @@
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@@ -261,17 +260,23 @@
@Test
void testRequestBufferTimeout() throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);
- List<MemorySegment> buffers = bufferPool.requestBuffers();
+ // Use a manually triggered executor to avoid automatically triggering reading.
+ ManuallyTriggeredScheduledExecutorService executorService =
+ new ManuallyTriggeredScheduledExecutorService();
SortMergeResultPartitionReadScheduler readScheduler =
new SortMergeResultPartitionReadScheduler(
- bufferPool, executor, this, bufferRequestTimeout);
+ bufferPool, executorService, this, bufferRequestTimeout);
+ readScheduler.createSubpartitionReader(
+ new NoOpBufferAvailablityListener(), 0, partitionedFile);
+ // Request and use all buffers of the buffer pool.
+ readScheduler.run();
+ assertThat(bufferPool.getAvailableBuffers()).isZero();
long startTimestamp = System.nanoTime();
assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
long requestDuration = System.nanoTime() - startTimestamp;
assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
- bufferPool.recycle(buffers);
readScheduler.release();
}
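The rewritten test swaps the shared executor for a ManuallyTriggeredScheduledExecutorService so that no read round runs until the test triggers it. A minimal sketch of that pattern, assuming the trigger methods from Flink's test utilities:

    import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;

    // Sketch: submitted work stays queued until the test explicitly triggers it,
    // which makes the scheduler's read rounds deterministic.
    final class ManualTriggerSketch {
        public static void main(String[] args) {
            ManuallyTriggeredScheduledExecutorService executor =
                    new ManuallyTriggeredScheduledExecutorService();
            executor.execute(() -> System.out.println("read round"));
            // Nothing has run yet; the test controls when the round happens.
            executor.triggerAll();
        }
    }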
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index f2b6591..bb6b1de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -222,10 +222,6 @@
void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);
- // request all buffer first.
- bufferPool.requestBuffers();
- assertThat(bufferPool.getAvailableBuffers()).isZero();
-
fileDataManager =
new HsFileDataManager(
bufferPool,
@@ -244,11 +240,23 @@
CompletableFuture<Throwable> cause = new CompletableFuture<>();
reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
reader.setFailConsumer((cause::complete));
+ reader.setReadBuffersConsumer(
+ (requestedBuffers, ignore) -> {
+ assertThat(requestedBuffers).hasSize(bufferPool.getNumTotalBuffers());
+ // Discard all buffers so that they cannot be recycled.
+ requestedBuffers.clear();
+ });
factory.allReaders.add(reader);
+ // Register a new consumer; this triggers the IO scheduler to run one round.
fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation);
+ // The first round allocates and uses all the buffers.
ioExecutor.trigger();
+ assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+ // The second round triggers the timeout.
+ fileDataManager.run();
assertThat(prepareForSchedulingFinished).isCompleted();
assertThat(cause).isCompleted();
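The reader stub above pins the pool by clearing the requested-buffer collection inside readBuffersConsumer, so nothing can be recycled before the second round times out. A toy sketch of why that works, using plain queues rather than Flink's buffer types:

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Toy sketch: if whatever remains in the queue after the consumer runs is
    // recycled back to the pool, clearing the queue keeps the pool exhausted.
    final class PinBuffersSketch {
        public static void main(String[] args) {
            Queue<Object> requested = new ArrayDeque<>();
            requested.add(new Object());
            requested.clear(); // the consumer discards its buffers
            System.out.println("buffers left to recycle: " + requested.size()); // 0
        }
    }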