[FLINK-20866][yarn] Set high-availability.cluster-id to application id if not configured

In order to easily connect to an HA enabled Yarn cluster, this commit sets the
high-availability.cluster-id to the application id if not configured. This is
symmetric to how we deploy HA enabled clusters and makes it unnecessary to
explicitly set the high-availability.cluster-id if one tries to reconnect
to the cluster.

This closes #14577.
diff --git a/docs/deployment/resource-providers/yarn.md b/docs/deployment/resource-providers/yarn.md
index 759d203..3d8837d 100644
--- a/docs/deployment/resource-providers/yarn.md
+++ b/docs/deployment/resource-providers/yarn.md
@@ -194,7 +194,11 @@
 
 YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.
 
-Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
+Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN.
+Flink sets it per default to the YARN application id.
+**You should not overwrite this parameter when deploying an HA cluster on YARN**. 
+The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). 
+Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
 
 #### Container Shutdown Behaviour
 
diff --git a/docs/deployment/resource-providers/yarn.zh.md b/docs/deployment/resource-providers/yarn.zh.md
index 0cb589b..72e4505 100644
--- a/docs/deployment/resource-providers/yarn.zh.md
+++ b/docs/deployment/resource-providers/yarn.zh.md
@@ -194,7 +194,11 @@
 
 YARN is taking care of restarting failed JobManagers. The maximum number of JobManager restarts is defined through two configuration parameters. First Flink's [yarn.application-attempts]({% link deployment/config.zh.md %}#yarn-application-attempts) configuration will default 2. This value is limited by YARN's [yarn.resourcemanager.am.max-attempts](https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml), which also defaults to 2.
 
-Note that Flink is managing the `high-availability.cluster-id` configuration parameter when running on YARN. **You should not overwrite this parameter when running an HA cluster on YARN**. The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper). Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
+Note that Flink is managing the `high-availability.cluster-id` configuration parameter when deploying on YARN.
+Flink sets it per default to the YARN application id.
+**You should not overwrite this parameter when deploying an HA cluster on YARN**.
+The cluster ID is used to distinguish multiple HA clusters in the HA backend (for example Zookeeper).
+Overwriting this configuration parameter can lead to multiple YARN clusters affecting each other.
 
 #### Container Shutdown Behaviour
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 23a8aa2..eeed605 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -87,6 +87,7 @@
 import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -209,6 +210,41 @@
                 });
     }
 
+    /**
+     * Tests that we can retrieve an HA enabled cluster by only specifying the application id if no
+     * other high-availability.cluster-id has been configured. See FLINK-20866.
+     */
+    @Test
+    public void testClusterClientRetrieval() throws Exception {
+        runTest(
+                () -> {
+                    final YarnClusterDescriptor yarnClusterDescriptor =
+                            setupYarnClusterDescriptor();
+                    final RestClusterClient<ApplicationId> restClusterClient =
+                            deploySessionCluster(yarnClusterDescriptor);
+
+                    ClusterClient<ApplicationId> newClusterClient = null;
+                    try {
+                        final ApplicationId clusterId = restClusterClient.getClusterId();
+
+                        final YarnClusterDescriptor newClusterDescriptor =
+                                setupYarnClusterDescriptor();
+                        newClusterClient =
+                                newClusterDescriptor.retrieve(clusterId).getClusterClient();
+
+                        assertThat(newClusterClient.listJobs().join(), is(empty()));
+
+                        newClusterClient.shutDownCluster();
+                    } finally {
+                        restClusterClient.close();
+
+                        if (newClusterClient != null) {
+                            newClusterClient.close();
+                        }
+                    }
+                });
+    }
+
     private void waitForApplicationAttempt(final ApplicationId applicationId, final int attemptId)
             throws Exception {
         final YarnClient yarnClient = getYarnClient();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index f07f8e1..e2b373a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1767,11 +1767,11 @@
     private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
         checkNotNull(report);
 
-        final ApplicationId clusterId = report.getApplicationId();
+        final ApplicationId appId = report.getApplicationId();
         final String host = report.getHost();
         final int port = report.getRpcPort();
 
-        LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, clusterId);
+        LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId);
 
         flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
         flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
@@ -1779,8 +1779,13 @@
         flinkConfiguration.setString(RestOptions.ADDRESS, host);
         flinkConfiguration.setInteger(RestOptions.PORT, port);
 
-        flinkConfiguration.set(
-                YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(clusterId));
+        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));
+
+        // set cluster-id to app id if not specified
+        if (!flinkConfiguration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
+            flinkConfiguration.set(
+                    HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
+        }
     }
 
     public static void logDetachedClusterInformation(