[FLINK-31346][runtime] IO scheduler does not throw TimeoutException if numRequestedBuffers is greater than 0.

This will cause deadlock as read buffer was occupied by a reader with low consumption priority.

This closes #22122

(cherry picked from commit 5ad2ae2c24ade2655981f609298978d26329466f)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 0f38aca..72ef41d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -221,14 +221,22 @@
         } while (System.nanoTime() < timeoutTime
                 || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
 
-        if (numRequestedBuffers <= 0) {
-            throw new TimeoutException(
-                    String.format(
-                            "Buffer request timeout, this means there is a fierce contention of"
-                                    + " the batch shuffle read memory, please increase '%s'.",
-                            TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
-        }
-        return new ArrayDeque<>();
+        // This is a safe net against potential deadlocks.
+        //
+        // A deadlock can happen when the downstream task needs to consume multiple result
+        // partitions (e.g., A and B) in specific order (cannot consume B before finishing
+        // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
+        // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
+        // B from being consumed and releasing the buffers.
+        //
+        // The imperfect solution here is to fail all the subpartitionReaders (A), which
+        // consequently fail all the downstream tasks, unregister their other
+        // subpartitionReaders (B) and release the read buffers.
+        throw new TimeoutException(
+                String.format(
+                        "Buffer request timeout, this means there is a fierce contention of"
+                                + " the batch shuffle read memory, please increase '%s'.",
+                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
     }
 
     private long getBufferRequestTimeoutTime() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 14ac2c8..5e595f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -260,25 +260,22 @@
         } while (System.nanoTime() < timeoutTime
                 || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime()));
 
-        if (numRequestedBuffers <= 0) {
-            // This is a safe net against potential deadlocks.
-            //
-            // A deadlock can happen when the downstream task needs to consume multiple result
-            // partitions (e.g., A and B) in specific order (cannot consume B before finishing
-            // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
-            // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
-            // B from being consumed and releasing the buffers.
-            //
-            // The imperfect solution here is to fail all the subpartitionReaders (A), which
-            // consequently fail all the downstream tasks, unregister their other
-            // subpartitionReaders (B) and release the read buffers.
-            throw new TimeoutException(
-                    String.format(
-                            "Buffer request timeout, this means there is a fierce contention of"
-                                    + " the batch shuffle read memory, please increase '%s'.",
-                            TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
-        }
-        return new ArrayDeque<>();
+        // This is a safe net against potential deadlocks.
+        //
+        // A deadlock can happen when the downstream task needs to consume multiple result
+        // partitions (e.g., A and B) in specific order (cannot consume B before finishing
+        // consuming A). Since the reading buffer pool is shared across the TM, if B happens to
+        // take all the buffers, A cannot be consumed due to lack of buffers, which also blocks
+        // B from being consumed and releasing the buffers.
+        //
+        // The imperfect solution here is to fail all the subpartitionReaders (A), which
+        // consequently fail all the downstream tasks, unregister their other
+        // subpartitionReaders (B) and release the read buffers.
+        throw new TimeoutException(
+                String.format(
+                        "Buffer request timeout, this means there is a fierce contention of"
+                                + " the batch shuffle read memory, please increase '%s'.",
+                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
     }
 
     private void mayTriggerReading() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 546f29d..0d1613e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -39,7 +39,6 @@
 import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -261,17 +260,23 @@
     @Test
     void testRequestBufferTimeout() throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
-        List<MemorySegment> buffers = bufferPool.requestBuffers();
+        // avoid auto trigger reading.
+        ManuallyTriggeredScheduledExecutorService executorService =
+                new ManuallyTriggeredScheduledExecutorService();
         SortMergeResultPartitionReadScheduler readScheduler =
                 new SortMergeResultPartitionReadScheduler(
-                        bufferPool, executor, this, bufferRequestTimeout);
+                        bufferPool, executorService, this, bufferRequestTimeout);
+        readScheduler.createSubpartitionReader(
+                new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        // request and use all buffers of buffer pool.
+        readScheduler.run();
 
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
         long startTimestamp = System.nanoTime();
         assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
         long requestDuration = System.nanoTime() - startTimestamp;
         assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
 
-        bufferPool.recycle(buffers);
         readScheduler.release();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index f2b6591..bb6b1de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -222,10 +222,6 @@
     void testRunRequestBufferTimeout(@TempDir Path tempDir) throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
 
-        // request all buffer first.
-        bufferPool.requestBuffers();
-        assertThat(bufferPool.getAvailableBuffers()).isZero();
-
         fileDataManager =
                 new HsFileDataManager(
                         bufferPool,
@@ -244,11 +240,23 @@
         CompletableFuture<Throwable> cause = new CompletableFuture<>();
         reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
         reader.setFailConsumer((cause::complete));
+        reader.setReadBuffersConsumer(
+                (requestedBuffers, ignore) -> {
+                    assertThat(requestedBuffers).hasSize(bufferPool.getNumTotalBuffers());
+                    // discard all buffers so that they cannot be recycled.
+                    requestedBuffers.clear();
+                });
         factory.allReaders.add(reader);
 
+        // register a new consumer, this will trigger io scheduler run a round.
         fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation);
 
+        // first round run will allocate and use all buffers.
         ioExecutor.trigger();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        // second round run will trigger timeout.
+        fileDataManager.run();
 
         assertThat(prepareForSchedulingFinished).isCompleted();
         assertThat(cause).isCompleted();