[FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index d031d72..abc6c95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -67,6 +67,8 @@
 
     private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L;
 
+    private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 1024;
+
     private final String remoteStorageBasePath;
 
     private final int tieredStorageBufferSize;
@@ -330,6 +332,8 @@
 
         private long numRetainedInMemoryRegionsMax = DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY;
 
+        private int maxCachedBytesBeforeFlush = DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH;
+
         private List<TierFactory> tierFactories;
 
         private List<Integer> tierExclusiveBuffers;
@@ -416,6 +420,11 @@
             return this;
         }
 
+        public Builder setMaxCachedBytesBeforeFlush(int maxCachedBytesBeforeFlush) {
+            this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
+            return this;
+        }
+
         public TieredStorageConfiguration build() {
             setupTierFactoriesAndExclusiveBuffers();
             return new TieredStorageConfiguration(
@@ -451,6 +460,7 @@
                             tieredStorageBufferSize,
                             minReserveDiskSpaceFraction,
                             regionGroupSizeInBytes,
+                            maxCachedBytesBeforeFlush,
                             numRetainedInMemoryRegionsMax));
             tierExclusiveBuffers.add(diskTierExclusiveBuffers);
             if (remoteStorageBasePath != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
index f5d7f30..feb3a1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
@@ -41,6 +41,8 @@
 
     private final int numSubpartitions;
 
+    private final int maxCachedBytesBeforeFlush;
+
     private final PartitionFileWriter partitionFileWriter;
 
     private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
@@ -48,13 +50,21 @@
     /** Whether the current flush process has completed. */
     private CompletableFuture<Void> hasFlushCompleted;
 
+    /**
+     * The number of all subpartition's cached bytes in the cache manager. Note that the counter can
+     * only be accessed by the task thread and does not require locks.
+     */
+    private int numCachedBytesCounter;
+
     DiskCacheManager(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
+            int maxCachedBytesBeforeFlush,
             TieredStorageMemoryManager memoryManager,
             PartitionFileWriter partitionFileWriter) {
         this.partitionId = partitionId;
         this.numSubpartitions = numSubpartitions;
+        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
         this.partitionFileWriter = partitionFileWriter;
         this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[numSubpartitions];
         this.hasFlushCompleted = FutureUtils.completedVoidFuture();
@@ -81,6 +91,7 @@
      */
     void append(Buffer buffer, int subpartitionId) {
         subpartitionCacheManagers[subpartitionId].append(buffer);
+        increaseNumCachedBytesAndCheckFlush(buffer.readableBytes());
     }
 
     /**
@@ -92,12 +103,7 @@
      */
     void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId) {
         subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record);
-
-        // When finishing a segment, the buffers should be flushed because the next segment may be
-        // written to another tier. If the buffers in this tier are not flushed here, then the next
-        // segment in another tier may be stuck by lacking buffers. This flush has a low trigger
-        // frequency, so its impact on performance is relatively small.
-        forceFlushCachedBuffers();
+        increaseNumCachedBytesAndCheckFlush(record.remaining());
     }
 
     /**
@@ -127,6 +133,13 @@
     //  Internal Methods
     // ------------------------------------------------------------------------
 
+    private void increaseNumCachedBytesAndCheckFlush(int numIncreasedCachedBytes) {
+        numCachedBytesCounter += numIncreasedCachedBytes;
+        if (numCachedBytesCounter > maxCachedBytesBeforeFlush) {
+            forceFlushCachedBuffers();
+        }
+    }
+
     private void notifyFlushCachedBuffers() {
         flushBuffers(false);
     }
@@ -153,6 +166,7 @@
                 hasFlushCompleted = flushCompletableFuture;
             }
         }
+        numCachedBytesCounter = 0;
     }
 
     private int getSubpartitionToFlushBuffers(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index fad6677..4806e23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -54,6 +54,8 @@
 
     private final int regionGroupSizeInBytes;
 
+    private final int maxCachedBytesBeforeFlush;
+
     private final long numRetainedInMemoryRegionsMax;
 
     public DiskTierFactory(
@@ -61,11 +63,13 @@
             int bufferSizeBytes,
             float minReservedDiskSpaceFraction,
             int regionGroupSizeInBytes,
+            int maxCachedBytesBeforeFlush,
             long numRetainedInMemoryRegionsMax) {
         this.numBytesPerSegment = numBytesPerSegment;
         this.bufferSizeBytes = bufferSizeBytes;
         this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
         this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+        this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
         this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
     }
 
@@ -106,6 +110,7 @@
                 numSubpartitions,
                 numBytesPerSegment,
                 bufferSizeBytes,
+                maxCachedBytesBeforeFlush,
                 dataFilePath,
                 minReservedDiskSpaceFraction,
                 isBroadcastOnly,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index cf91eca..acd8c3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -86,6 +86,7 @@
             int numSubpartitions,
             int numBytesPerSegment,
             int bufferSizeBytes,
+            int maxCachedBytesBeforeFlush,
             Path dataFilePath,
             float minReservedDiskSpaceFraction,
             boolean isBroadcastOnly,
@@ -122,6 +123,7 @@
                 new DiskCacheManager(
                         partitionId,
                         isBroadcastOnly ? 1 : numSubpartitions,
+                        maxCachedBytesBeforeFlush,
                         memoryManager,
                         partitionFileWriter);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
index 7297d89..b07c550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
@@ -73,6 +73,7 @@
                 new DiskCacheManager(
                         TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
                         1,
+                        1024,
                         memoryManager,
                         partitionFileWriter);
 
@@ -110,11 +111,13 @@
                 new DiskCacheManager(
                         TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
                         1,
+                        1024,
                         memoryManager,
                         partitionFileWriter);
         diskCacheManager.appendEndOfSegmentEvent(
                 EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 0);
 
+        diskCacheManager.close();
         assertThat(receivedBuffers).hasSize(1);
         List<PartitionFileWriter.SegmentBufferContext> segmentBufferContexts =
                 receivedBuffers.get(0).getSegmentBufferContexts();
@@ -132,6 +135,39 @@
     }
 
     @Test
+    void testFlushWhenCachedBytesReachLimit() throws IOException {
+        TestingTieredStorageMemoryManager memoryManager =
+                new TestingTieredStorageMemoryManager.Builder().build();
+
+        AtomicInteger numWriteTimes = new AtomicInteger(0);
+        TestingPartitionFileWriter partitionFileWriter =
+                new TestingPartitionFileWriter.Builder()
+                        .setWriteFunction(
+                                (partitionId, subpartitionBufferContexts) -> {
+                                    numWriteTimes.incrementAndGet();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+        DiskCacheManager diskCacheManager =
+                new DiskCacheManager(
+                        TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
+                        1,
+                        1024,
+                        memoryManager,
+                        partitionFileWriter);
+        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 0);
+        assertThat(numWriteTimes).hasValue(0);
+        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1), 0);
+        assertThat(numWriteTimes).hasValue(1);
+
+        diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 0);
+        assertThat(numWriteTimes).hasValue(1);
+        diskCacheManager.appendEndOfSegmentEvent(
+                EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 0);
+        assertThat(numWriteTimes).hasValue(2);
+    }
+
+    @Test
     void testRelease() {
         AtomicBoolean isReleased = new AtomicBoolean(false);
         TestingTieredStorageMemoryManager memoryManager =
@@ -144,6 +180,7 @@
                 new DiskCacheManager(
                         TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
                         1,
+                        1024,
                         memoryManager,
                         partitionFileWriter);
         diskCacheManager.release();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 3daf34c..58c09ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -212,6 +212,7 @@
                 NUM_SUBPARTITIONS,
                 numBytesPerSegment,
                 BUFFER_SIZE_BYTES,
+                BUFFER_SIZE_BYTES,
                 dataFilePath,
                 minReservedDiskSpaceFraction,
                 isBroadcastOnly,