[FLINK-33044][network] Reduce the frequency of triggering flush for the disk tier of tiered storage
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index d031d72..abc6c95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -67,6 +67,8 @@
private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L;
+ private static final int DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH = 512 * 1024;
+
private final String remoteStorageBasePath;
private final int tieredStorageBufferSize;
@@ -330,6 +332,8 @@
private long numRetainedInMemoryRegionsMax = DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY;
+ private int maxCachedBytesBeforeFlush = DEFAULT_MAX_CACHED_BYTES_BEFORE_FLUSH;
+
private List<TierFactory> tierFactories;
private List<Integer> tierExclusiveBuffers;
@@ -416,6 +420,11 @@
return this;
}
+ public Builder setMaxCachedBytesBeforeFlush(int maxCachedBytesBeforeFlush) {
+ this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
+ return this;
+ }
+
public TieredStorageConfiguration build() {
setupTierFactoriesAndExclusiveBuffers();
return new TieredStorageConfiguration(
@@ -451,6 +460,7 @@
tieredStorageBufferSize,
minReserveDiskSpaceFraction,
regionGroupSizeInBytes,
+ maxCachedBytesBeforeFlush,
numRetainedInMemoryRegionsMax));
tierExclusiveBuffers.add(diskTierExclusiveBuffers);
if (remoteStorageBasePath != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
index f5d7f30..feb3a1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java
@@ -41,6 +41,8 @@
private final int numSubpartitions;
+ private final int maxCachedBytesBeforeFlush;
+
private final PartitionFileWriter partitionFileWriter;
private final SubpartitionDiskCacheManager[] subpartitionCacheManagers;
@@ -48,13 +50,21 @@
/** Whether the current flush process has completed. */
private CompletableFuture<Void> hasFlushCompleted;
+ /**
+ * The total number of bytes cached across all subpartitions in the cache manager. Note that
+ * the counter can only be accessed by the task thread and does not require locks.
+ */
+ private int numCachedBytesCounter;
+
DiskCacheManager(
TieredStoragePartitionId partitionId,
int numSubpartitions,
+ int maxCachedBytesBeforeFlush,
TieredStorageMemoryManager memoryManager,
PartitionFileWriter partitionFileWriter) {
this.partitionId = partitionId;
this.numSubpartitions = numSubpartitions;
+ this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
this.partitionFileWriter = partitionFileWriter;
this.subpartitionCacheManagers = new SubpartitionDiskCacheManager[numSubpartitions];
this.hasFlushCompleted = FutureUtils.completedVoidFuture();
@@ -81,6 +91,7 @@
*/
void append(Buffer buffer, int subpartitionId) {
subpartitionCacheManagers[subpartitionId].append(buffer);
+ increaseNumCachedBytesAndCheckFlush(buffer.readableBytes());
}
/**
@@ -92,12 +103,7 @@
*/
void appendEndOfSegmentEvent(ByteBuffer record, int subpartitionId) {
subpartitionCacheManagers[subpartitionId].appendEndOfSegmentEvent(record);
-
- // When finishing a segment, the buffers should be flushed because the next segment may be
- // written to another tier. If the buffers in this tier are not flushed here, then the next
- // segment in another tier may be stuck by lacking buffers. This flush has a low trigger
- // frequency, so its impact on performance is relatively small.
- forceFlushCachedBuffers();
+ increaseNumCachedBytesAndCheckFlush(record.remaining());
}
/**
@@ -127,6 +133,13 @@
// Internal Methods
// ------------------------------------------------------------------------
+ private void increaseNumCachedBytesAndCheckFlush(int numIncreasedCachedBytes) {
+ numCachedBytesCounter += numIncreasedCachedBytes;
+ if (numCachedBytesCounter > maxCachedBytesBeforeFlush) {
+ forceFlushCachedBuffers();
+ }
+ }
+
private void notifyFlushCachedBuffers() {
flushBuffers(false);
}
@@ -153,6 +166,7 @@
hasFlushCompleted = flushCompletableFuture;
}
}
+ numCachedBytesCounter = 0;
}
private int getSubpartitionToFlushBuffers(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index fad6677..4806e23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -54,6 +54,8 @@
private final int regionGroupSizeInBytes;
+ private final int maxCachedBytesBeforeFlush;
+
private final long numRetainedInMemoryRegionsMax;
public DiskTierFactory(
@@ -61,11 +63,13 @@
int bufferSizeBytes,
float minReservedDiskSpaceFraction,
int regionGroupSizeInBytes,
+ int maxCachedBytesBeforeFlush,
long numRetainedInMemoryRegionsMax) {
this.numBytesPerSegment = numBytesPerSegment;
this.bufferSizeBytes = bufferSizeBytes;
this.minReservedDiskSpaceFraction = minReservedDiskSpaceFraction;
this.regionGroupSizeInBytes = regionGroupSizeInBytes;
+ this.maxCachedBytesBeforeFlush = maxCachedBytesBeforeFlush;
this.numRetainedInMemoryRegionsMax = numRetainedInMemoryRegionsMax;
}
@@ -106,6 +110,7 @@
numSubpartitions,
numBytesPerSegment,
bufferSizeBytes,
+ maxCachedBytesBeforeFlush,
dataFilePath,
minReservedDiskSpaceFraction,
isBroadcastOnly,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index cf91eca..acd8c3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -86,6 +86,7 @@
int numSubpartitions,
int numBytesPerSegment,
int bufferSizeBytes,
+ int maxCachedBytesBeforeFlush,
Path dataFilePath,
float minReservedDiskSpaceFraction,
boolean isBroadcastOnly,
@@ -122,6 +123,7 @@
new DiskCacheManager(
partitionId,
isBroadcastOnly ? 1 : numSubpartitions,
+ maxCachedBytesBeforeFlush,
memoryManager,
partitionFileWriter);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
index 7297d89..b07c550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManagerTest.java
@@ -73,6 +73,7 @@
new DiskCacheManager(
TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
1,
+ 1024,
memoryManager,
partitionFileWriter);
@@ -110,11 +111,13 @@
new DiskCacheManager(
TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
1,
+ 1024,
memoryManager,
partitionFileWriter);
diskCacheManager.appendEndOfSegmentEvent(
EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 0);
+ diskCacheManager.close();
assertThat(receivedBuffers).hasSize(1);
List<PartitionFileWriter.SegmentBufferContext> segmentBufferContexts =
receivedBuffers.get(0).getSegmentBufferContexts();
@@ -132,6 +135,39 @@
}
@Test
+ void testFlushWhenCachedBytesReachLimit() throws IOException {
+ TestingTieredStorageMemoryManager memoryManager =
+ new TestingTieredStorageMemoryManager.Builder().build();
+
+ AtomicInteger numWriteTimes = new AtomicInteger(0);
+ TestingPartitionFileWriter partitionFileWriter =
+ new TestingPartitionFileWriter.Builder()
+ .setWriteFunction(
+ (partitionId, subpartitionBufferContexts) -> {
+ numWriteTimes.incrementAndGet();
+ return FutureUtils.completedVoidFuture();
+ })
+ .build();
+ DiskCacheManager diskCacheManager =
+ new DiskCacheManager(
+ TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
+ 1,
+ 1024,
+ memoryManager,
+ partitionFileWriter);
+ diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 0);
+ assertThat(numWriteTimes).hasValue(0);
+ diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1), 0);
+ assertThat(numWriteTimes).hasValue(1);
+
+ diskCacheManager.append(BufferBuilderTestUtils.buildSomeBuffer(1024), 0);
+ assertThat(numWriteTimes).hasValue(1);
+ diskCacheManager.appendEndOfSegmentEvent(
+ EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE), 0);
+ assertThat(numWriteTimes).hasValue(2);
+ }
+
+ @Test
void testRelease() {
AtomicBoolean isReleased = new AtomicBoolean(false);
TestingTieredStorageMemoryManager memoryManager =
@@ -144,6 +180,7 @@
new DiskCacheManager(
TieredStorageIdMappingUtils.convertId(new ResultPartitionID()),
1,
+ 1024,
memoryManager,
partitionFileWriter);
diskCacheManager.release();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 3daf34c..58c09ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -212,6 +212,7 @@
NUM_SUBPARTITIONS,
numBytesPerSegment,
BUFFER_SIZE_BYTES,
+ BUFFER_SIZE_BYTES,
dataFilePath,
minReservedDiskSpaceFraction,
isBroadcastOnly,