[FLINK-30518] Pass JM Pod IP to Flink when starting up with Kubernetes HA enabled
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index d50a9bc..3ecea77 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -20,6 +20,7 @@
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
 import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerBuilder;
@@ -36,6 +37,8 @@
 
     public static final String JOBMANAGER_ENTRYPOINT_ARG = "jobmanager";
     public static final String APPLICATION_MODE_ARG = "standalone-job";
+    public static final String POD_IP_ARG =
+            String.format("$(%s)", Constants.ENV_FLINK_POD_IP_ADDRESS);
 
     private final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters;
 
@@ -56,10 +59,16 @@
     }
 
     private Container decorateSessionContainer(Container mainContainer) {
-        return new ContainerBuilder(mainContainer)
-                .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
-                .withArgs(JOBMANAGER_ENTRYPOINT_ARG)
-                .build();
+        ContainerBuilder containerBuilder =
+                new ContainerBuilder(mainContainer)
+                        .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+                        .addToArgs(JOBMANAGER_ENTRYPOINT_ARG);
+
+        if (kubernetesJobManagerParameters.isKubernetesHA()) {
+            containerBuilder.addToArgs(POD_IP_ARG);
+        }
+
+        return containerBuilder.build();
     }
 
     private Container decorateApplicationContainer(Container mainContainer) {
@@ -95,6 +104,11 @@
             args.addAll(kubernetesJobManagerParameters.getJobSpecArgs());
         }
 
+        if (kubernetesJobManagerParameters.isKubernetesHA()) {
+            args.add("--host");
+            args.add(POD_IP_ARG);
+        }
+
         return args;
     }
 }
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index 15a164c..3b887c0 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -20,6 +20,7 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
@@ -38,6 +39,10 @@
  */
 public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManagerParameters {
 
+    private static final String KUBERNETES_HA_FQN_FACTORY_CLASS =
+            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";
+    private static final String KUBERNETES_HA_MODE = "KUBERNETES";
+
     public StandaloneKubernetesJobManagerParameters(
             Configuration flinkConfig, ClusterSpecification clusterSpecification) {
         super(flinkConfig, clusterSpecification);
@@ -107,4 +112,10 @@
         }
         return null;
     }
+
+    public boolean isKubernetesHA() {
+        String haMode = flinkConfig.getValue(HighAvailabilityOptions.HA_MODE);
+        return haMode.equals(KUBERNETES_HA_FQN_FACTORY_CLASS)
+                || haMode.equalsIgnoreCase(KUBERNETES_HA_MODE);
+    }
 }
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
index 6ffcd80..599893b 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
@@ -20,6 +20,7 @@
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
@@ -105,4 +106,43 @@
                 decoratedPod.getMainContainer().getArgs(),
                 containsInAnyOrder(CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG));
     }
+
+    @Test
+    public void testSessionKubernetesHAArgsAdded() {
+        configuration.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+        configuration.set(
+                HighAvailabilityOptions.HA_MODE,
+                "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build());
+
+        assertThat(
+                decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH));
+        assertThat(
+                decoratedPod.getMainContainer().getArgs(),
+                containsInAnyOrder(
+                        CmdStandaloneJobManagerDecorator.JOBMANAGER_ENTRYPOINT_ARG,
+                        CmdStandaloneJobManagerDecorator.POD_IP_ARG));
+    }
+
+    @Test
+    public void testApplicationKubernetesHAArgsAdded() {
+        configuration.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+
+        configuration.set(HighAvailabilityOptions.HA_MODE, "KUBERNETES");
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build());
+
+        assertThat(
+                decoratedPod.getMainContainer().getArgs(),
+                containsInAnyOrder(
+                        CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
+                        "--host",
+                        CmdStandaloneJobManagerDecorator.POD_IP_ARG));
+    }
 }