[CELEBORN-2193] Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1

### What changes were proposed in this pull request?

Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1.

### Why are the changes needed?

Flink has released v2.0.1 and v2.1.1; see the release notes:

- [Apache Flink 2.0.1 Release](https://github.com/apache/flink/releases/tag/release-2.0.1)
- [Apache Flink 2.1.1 Release](https://github.com/apache/flink/releases/tag/release-2.1.1)

Flink v2.0.1 adds the `getConsumedPartitionType()` method to `IndexedInputGate`; see https://github.com/apache/flink/pull/26548.

`HybridShuffleWordCountTest` can now run `WordCountHelper` with the test's `parallelism` instead of 1, thanks to https://github.com/apache/flink/pull/26369, which has been released in v2.0.1 and v2.1.1.

### Does this PR resolve a correctness bug?

No.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3526 from SteNicholas/CELEBORN-2193.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
diff --git a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index bd60bbe..b8e1994 100644
--- a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -61,6 +61,9 @@
 /** A {@link IndexedInputGate} which ingest data from remote shuffle workers. */
 public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
 
+  /** The type of the partition the input gate is consuming. */
+  private final ResultPartitionType consumedPartitionType;
+
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
       ShuffleIOOwnerContext ownerContext,
@@ -79,6 +82,7 @@
         bufferDecompressor,
         numConcurrentReading,
         shuffleIOMetricGroups);
+    this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
   }
 
   @Override
@@ -93,6 +97,12 @@
     return Tuple2.of(indexRange.getStartIndex(), indexRange.getEndIndex());
   }
 
+  public ResultPartitionType getConsumedPartitionType() {
+    // Flink 2.0.1
+    // [FLINK-37783] Auto-disable buffer debloating for tiered shuffle
+    return consumedPartitionType;
+  }
+
   /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
   private class FakedRemoteInputChannel extends RemoteInputChannel {
     FakedRemoteInputChannel(int channelIndex) {
diff --git a/pom.xml b/pom.xml
index 4675dd3..816393f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1833,7 +1833,7 @@
         <module>tests/flink-it</module>
       </modules>
       <properties>
-        <flink.version>2.0.0</flink.version>
+        <flink.version>2.0.1</flink.version>
         <flink.binary.version>2.0</flink.binary.version>
         <scala.binary.version>2.12</scala.binary.version>
         <celeborn.flink.plugin.artifact>celeborn-client-flink-2.0_${scala.binary.version}</celeborn.flink.plugin.artifact>
@@ -1852,7 +1852,7 @@
         <module>tests/flink-it</module>
       </modules>
       <properties>
-        <flink.version>2.1.0</flink.version>
+        <flink.version>2.1.1</flink.version>
         <flink.binary.version>2.1</flink.binary.version>
         <scala.binary.version>2.12</scala.binary.version>
         <celeborn.flink.plugin.artifact>celeborn-client-flink-2.1_${scala.binary.version}</celeborn.flink.plugin.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 84ee69c..f12adff 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -1202,7 +1202,7 @@
 }
 
 object Flink20 extends FlinkClientProjects {
-  val flinkVersion = "2.0.0"
+  val flinkVersion = "2.0.1"
 
   // note that SBT does not allow using the period symbol (.) in project names.
   val flinkClientProjectPath = "client-flink/flink-2.0"
@@ -1212,7 +1212,7 @@
 }
 
 object Flink21 extends FlinkClientProjects {
-  val flinkVersion = "2.1.0"
+  val flinkVersion = "2.1.1"
 
   // note that SBT does not allow using the period symbol (.) in project names.
   val flinkClientProjectPath = "client-flink/flink-2.1"
@@ -1242,6 +1242,8 @@
     .aggregate(flinkCommon, flinkClient, flinkIt)
 
   // get flink major version. e.g:
+  //   2.0.1 -> 2.0
+  //   2.1.1 -> 2.1
   //   1.20.3 -> 1.20
   //   1.19.3 -> 1.19
   //   1.18.1 -> 1.18
diff --git a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
index 29bd3ce..1bfa247 100644
--- a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
+++ b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
@@ -93,6 +93,9 @@
       "execution.batch-shuffle-mode",
       "ALL_EXCHANGES_HYBRID_FULL")
     configuration.setString("taskmanager.memory.network.min", "1024m")
+    configuration.setString(
+      "execution.batch.adaptive.auto-parallelism.min-parallelism",
+      "" + parallelism)
     configuration.setString("restart-strategy.type", "fixed-delay")
     configuration.setString("restart-strategy.fixed-delay.attempts", "50")
     configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -105,8 +108,7 @@
     env.getConfig.setParallelism(parallelism)
     env.disableOperatorChaining()
     // make parameters available in the web interface
-    // TODO: WordCountHelper should execute with parallelism for [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
-    WordCountHelper.execute(env, 1)
+    WordCountHelper.execute(env, parallelism)
 
     val graph = env.getStreamGraph
     env.execute(graph)
@@ -129,6 +131,9 @@
       "execution.batch-shuffle-mode",
       "ALL_EXCHANGES_HYBRID_FULL")
     configuration.setString("taskmanager.memory.network.min", "256m")
+    configuration.setString(
+      "execution.batch.adaptive.auto-parallelism.min-parallelism",
+      "" + parallelism)
     configuration.setString("restart-strategy.type", "fixed-delay")
     configuration.setString("restart-strategy.fixed-delay.attempts", "50")
     configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -140,8 +145,7 @@
     env.getConfig.setParallelism(parallelism)
     env.disableOperatorChaining()
     // make parameters available in the web interface
-    // TODO: WordCountHelper should execute with parallelism for [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
-    WordCountHelper.execute(env, 1)
+    WordCountHelper.execute(env, parallelism)
 
     val graph = env.getStreamGraph
     graph.setJobType(JobType.BATCH)