[FLINK-33103][network] Hybrid shuffle ITCase supports the new mode
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index e9de7c5..f5c6b0a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -22,60 +22,75 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
/** Tests for hybrid shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
class HybridShuffleITCase extends BatchShuffleITCaseBase {
- @Test
+ @Parameter public boolean enableNewHybridMode;
+
+ @Parameters(name = "enableNewHybridMode={0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true);
+ }
+
+ @TestTemplate
void testHybridFullExchanges() throws Exception {
final int numRecordsToSend = 10000;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
- configuration.set(
- NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration = configureHybridOptions(getConfiguration(), false);
JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
- @Test
+ @TestTemplate
void testHybridSelectiveExchanges() throws Exception {
final int numRecordsToSend = 10000;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE,
- BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
- configuration.set(
- NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration = configureHybridOptions(getConfiguration(), true);
JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
- @Test
+ @TestTemplate
void testHybridFullExchangesRestart() throws Exception {
final int numRecordsToSend = 10;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
- configuration.set(
- NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration = configureHybridOptions(getConfiguration(), false);
JobGraph jobGraph = createJobGraph(numRecordsToSend, true, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
- @Test
+ @TestTemplate
void testHybridSelectiveExchangesRestart() throws Exception {
final int numRecordsToSend = 10;
- Configuration configuration = getConfiguration();
- configuration.set(
- ExecutionOptions.BATCH_SHUFFLE_MODE,
- BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
- configuration.set(
- NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
+ Configuration configuration = configureHybridOptions(getConfiguration(), true);
JobGraph jobGraph = createJobGraph(numRecordsToSend, true, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
+
+ private Configuration configureHybridOptions(Configuration configuration, boolean isSelective) {
+ BatchShuffleMode shuffleMode =
+ isSelective
+ ? BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE
+ : BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+ configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+ configuration.set(
+ NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE,
+ enableNewHybridMode);
+
+ if (enableNewHybridMode && isSelective) {
+ // Note that the memory tier of the new mode need more buffers for the selective mode
+ configuration.setString(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), "256m");
+ }
+ return configuration;
+ }
}