[CELEBORN-2193] Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1
### What changes were proposed in this pull request?
Bump the Flink 2.0 line from 2.0.0 to 2.0.1 and the Flink 2.1 line from 2.1.0 to 2.1.1.
### Why are the changes needed?
Flink has released v2.0.1 and v2.1.1; see the release notes:
- [Apache Flink 2.0.1 Release](https://github.com/apache/flink/releases/tag/release-2.0.1)
- [Apache Flink 2.1.1 Release](https://github.com/apache/flink/releases/tag/release-2.1.1)
Flink v2.0.1 adds a `getConsumedPartitionType()` method to `IndexedInputGate`; see https://github.com/apache/flink/pull/26548.
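For illustration only, here is a minimal sketch of how a gate implementation can satisfy that contract by caching the type carried by Flink's `InputGateDeploymentDescriptor`, which is the same pattern the diff below applies to `RemoteShuffleInputGate`. The `ConsumedPartitionTypeHolder` class name is hypothetical.
```java
// Minimal sketch, assuming the gate is constructed from Flink's InputGateDeploymentDescriptor;
// ConsumedPartitionTypeHolder is a hypothetical name used only for this example.
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

final class ConsumedPartitionTypeHolder {
  /** The type of the partition the gate is consuming, cached at construction time. */
  private final ResultPartitionType consumedPartitionType;

  ConsumedPartitionTypeHolder(InputGateDeploymentDescriptor gateDescriptor) {
    // The deployment descriptor already carries the consumed partition type for the gate.
    this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
  }

  ResultPartitionType getConsumedPartitionType() {
    return consumedPartitionType;
  }
}
```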
With https://github.com/apache/flink/pull/26369, which is released in v2.0.1 and v2.1.1, `HybridShuffleWordCountTest` can now execute with the configured parallelism (see the sketch below).
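As a rough sketch (not the test code itself), keeping the requested parallelism also requires pinning the adaptive batch scheduler's minimum parallelism, which the diff below adds to both test cases. The helper class and the local-environment setup here are illustrative assumptions.
```java
// Minimal sketch, assuming a local Flink environment; ParallelBatchEnv is a hypothetical helper.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final class ParallelBatchEnv {
  static StreamExecutionEnvironment create(int parallelism) {
    Configuration configuration = new Configuration();
    // Use hybrid full shuffle, as HybridShuffleWordCountTest does.
    configuration.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_FULL");
    // Keep the adaptive batch scheduler from choosing a lower parallelism than requested.
    configuration.setString(
        "execution.batch.adaptive.auto-parallelism.min-parallelism",
        String.valueOf(parallelism));
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setParallelism(parallelism);
    return env;
  }
}
```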
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3526 from SteNicholas/CELEBORN-2193.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index bd60bbe..b8e1994 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -61,6 +61,9 @@
/** A {@link IndexedInputGate} which ingest data from remote shuffle workers. */
public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
+ /** The type of the partition the input gate is consuming. */
+ private final ResultPartitionType consumedPartitionType;
+
public RemoteShuffleInputGate(
CelebornConf celebornConf,
ShuffleIOOwnerContext ownerContext,
@@ -79,6 +82,7 @@
bufferDecompressor,
numConcurrentReading,
shuffleIOMetricGroups);
+ this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
}
@Override
@@ -93,6 +97,12 @@
return Tuple2.of(indexRange.getStartIndex(), indexRange.getEndIndex());
}
+ public ResultPartitionType getConsumedPartitionType() {
+ // Flink 1.19.3
+ // [FLINK-37783] Auto-disable buffer debloating for tiered shuffle
+ return consumedPartitionType;
+ }
+
/** Accommodation for the incompleteness of Flink pluggable shuffle service. */
private class FakedRemoteInputChannel extends RemoteInputChannel {
FakedRemoteInputChannel(int channelIndex) {
diff --git a/pom.xml b/pom.xml
index 4675dd3..816393f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1833,7 +1833,7 @@
<module>tests/flink-it</module>
</modules>
<properties>
- <flink.version>2.0.0</flink.version>
+ <flink.version>2.0.1</flink.version>
<flink.binary.version>2.0</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-2.0_${scala.binary.version}</celeborn.flink.plugin.artifact>
@@ -1852,7 +1852,7 @@
<module>tests/flink-it</module>
</modules>
<properties>
- <flink.version>2.1.0</flink.version>
+ <flink.version>2.1.1</flink.version>
<flink.binary.version>2.1</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-2.1_${scala.binary.version}</celeborn.flink.plugin.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 84ee69c..f12adff 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -1202,7 +1202,7 @@
}
object Flink20 extends FlinkClientProjects {
- val flinkVersion = "2.0.0"
+ val flinkVersion = "2.0.1"
// note that SBT does not allow using the period symbol (.) in project names.
val flinkClientProjectPath = "client-flink/flink-2.0"
@@ -1212,7 +1212,7 @@
}
object Flink21 extends FlinkClientProjects {
- val flinkVersion = "2.1.0"
+ val flinkVersion = "2.1.1"
// note that SBT does not allow using the period symbol (.) in project names.
val flinkClientProjectPath = "client-flink/flink-2.1"
@@ -1242,6 +1242,8 @@
.aggregate(flinkCommon, flinkClient, flinkIt)
// get flink major version. e.g:
+ // 2.0.1 -> 2.0
+ // 2.1.1 -> 2.1
// 1.20.3 -> 1.20
// 1.19.3 -> 1.19
// 1.18.1 -> 1.18
diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
index 29bd3ce..1bfa247 100644
--- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
+++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
@@ -93,6 +93,9 @@
"execution.batch-shuffle-mode",
"ALL_EXCHANGES_HYBRID_FULL")
configuration.setString("taskmanager.memory.network.min", "1024m")
+ configuration.setString(
+ "execution.batch.adaptive.auto-parallelism.min-parallelism",
+ "" + parallelism)
configuration.setString("restart-strategy.type", "fixed-delay")
configuration.setString("restart-strategy.fixed-delay.attempts", "50")
configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -105,8 +108,7 @@
env.getConfig.setParallelism(parallelism)
env.disableOperatorChaining()
// make parameters available in the web interface
- // TODO: WordCountHelper should execute with parallelism for [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
- WordCountHelper.execute(env, 1)
+ WordCountHelper.execute(env, parallelism)
val graph = env.getStreamGraph
env.execute(graph)
@@ -129,6 +131,9 @@
"execution.batch-shuffle-mode",
"ALL_EXCHANGES_HYBRID_FULL")
configuration.setString("taskmanager.memory.network.min", "256m")
+ configuration.setString(
+ "execution.batch.adaptive.auto-parallelism.min-parallelism",
+ "" + parallelism)
configuration.setString("restart-strategy.type", "fixed-delay")
configuration.setString("restart-strategy.fixed-delay.attempts", "50")
configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -140,8 +145,7 @@
env.getConfig.setParallelism(parallelism)
env.disableOperatorChaining()
// make parameters available in the web interface
- // TODO: WordCountHelper should execute with parallelism for [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
- WordCountHelper.execute(env, 1)
+ WordCountHelper.execute(env, parallelism)
val graph = env.getStreamGraph
graph.setJobType(JobType.BATCH)