[FLINK-20866][yarn] Set high-availability.cluster-id to application id if not configured
In order to easily connect to an HA enabled Yarn cluster, this commit sets the
high-availability.cluster-id to the application id if not configured. This is
symmetric to how we deploy HA enabled clusters and makes it unnecessary to
explicitly set the high-availability.cluster-id if one tries to reconnect
to the cluster.
This closes #14577.
diff --git a/docs/deployment/resource-providers/yarn.md b/docs/deployment/resource-providers/yarn.md
index 759d203..3d8837d 100644
--- a/docs/deployment/resource-providers/yarn.md
+++ b/docs/deployment/resource-providers/yarn.md
@@ -194,7 +194,11 @@
YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.
-Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
+Note that Flink manages the `high-availability.cluster-id` configuration parameter when deploying on YARN.
+By default, Flink sets it to the YARN application id.
+**You should not overwrite this parameter when deploying an HA cluster on YARN**.
+The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example ZooKeeper).
+Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
#### Container Shutdown Behaviour
diff --git a/docs/deployment/resource-providers/yarn.zh.md b/docs/deployment/resource-providers/yarn.zh.md
index 0cb589b..72e4505 100644
--- a/docs/deployment/resource-providers/yarn.zh.md
+++ b/docs/deployment/resource-providers/yarn.zh.md
@@ -194,7 +194,11 @@
YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.zh.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.
-Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
+Note that Flink manages the `high-availability.cluster-id` configuration parameter when deploying on YARN.
+By default, Flink sets it to the YARN application id.
+**You should not overwrite this parameter when deploying an HA cluster on YARN**.
+The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example ZooKeeper).
+Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
#### Container Shutdown Behaviour
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 23a8aa2..eeed605 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -87,6 +87,7 @@
import java.util.function.Predicate;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -209,6 +210,41 @@
});
}
+ /**
+ * Tests that we can retrieve an HA enabled cluster by only specifying the application id if no
+ * other high-availability.cluster-id has been configured. See FLINK-20866.
+ *
+ * <p>The test deploys a session cluster through one descriptor, then obtains a second
+ * descriptor from {@code setupYarnClusterDescriptor()} and calls {@code retrieve} on it with
+ * nothing but the application id reported by the first client. The retrieved client must be
+ * able to list jobs (an empty result is expected) and is then used to shut the cluster down.
+ * Both clients are closed in the {@code finally} block.
+ *
+ * @throws Exception propagated from {@code runTest}
+ */
+ @Test
+ public void testClusterClientRetrieval() throws Exception {
+ runTest(
+ () -> {
+ final YarnClusterDescriptor yarnClusterDescriptor =
+ setupYarnClusterDescriptor();
+ final RestClusterClient<ApplicationId> restClusterClient =
+ deploySessionCluster(yarnClusterDescriptor);
+
+ ClusterClient<ApplicationId> newClusterClient = null; // stays null if retrieval below fails
+ try {
+ final ApplicationId clusterId = restClusterClient.getClusterId();
+
+ final YarnClusterDescriptor newClusterDescriptor =
+ setupYarnClusterDescriptor(); // NOTE(review): neither descriptor is closed in this method; confirm ownership
+ newClusterClient =
+ newClusterDescriptor.retrieve(clusterId).getClusterClient(); // only the application id is passed in
+
+ assertThat(newClusterClient.listJobs().join(), is(empty())); // this test submits no jobs itself
+
+ newClusterClient.shutDownCluster();
+ } finally {
+ restClusterClient.close(); // NOTE(review): if this throws, newClusterClient below is never closed
+
+ if (newClusterClient != null) {
+ newClusterClient.close();
+ }
+ }
+ });
+ }
+
private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId)
throws Exception {
final YarnClient yarnClient = getYarnClient();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f07f8e1..e2b373a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1767,11 +1767,11 @@
private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
checkNotNull(report);
- final ApplicationId clusterId = report.getApplicationId();
+ final ApplicationId appId = report.getApplicationId();
final String host = report.getHost();
final int port = report.getRpcPort();
- LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, clusterId);
+ LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId);
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
@@ -1779,8 +1779,13 @@
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
- flinkConfiguration.set(
- YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(clusterId));
+ flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));
+
+ // set cluster-id to app id if not specified
+ if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+ flinkConfiguration.set(
+ HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
+ }
}
public static void logDetachedClusterInformation(