[FLINK-30518] Pass JM Pod IP to Flink when starting up with Kubernetes HA enabled
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index d50a9bc..3ecea77 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -20,6 +20,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.decorators.AbstractKubernetesStepDecorator;
import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
@@ -36,6 +37,8 @@
public static final String JOBMANAGER_ENTRYPOINT_ARG = "jobmanager";
public static final String APPLICATION_MODE_ARG = "standalone-job";
+ public static final String POD_IP_ARG =
+ String.format("$(%s)", Constants.ENV_FLINK_POD_IP_ADDRESS);
private final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters;
@@ -56,10 +59,16 @@
}
private Container decorateSessionContainer(Container mainContainer) {
- return new ContainerBuilder(mainContainer)
- .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
- .withArgs(JOBMANAGER_ENTRYPOINT_ARG)
- .build();
+ ContainerBuilder containerBuilder =
+ new ContainerBuilder(mainContainer)
+ .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
+ .addToArgs(JOBMANAGER_ENTRYPOINT_ARG);
+
+ if (kubernetesJobManagerParameters.isKubernetesHA()) {
+ containerBuilder.addToArgs(POD_IP_ARG);
+ }
+
+ return containerBuilder.build();
}
private Container decorateApplicationContainer(Container mainContainer) {
@@ -95,6 +104,11 @@
args.addAll(kubernetesJobManagerParameters.getJobSpecArgs());
}
+ if (kubernetesJobManagerParameters.isKubernetesHA()) {
+ args.add("--host");
+ args.add(POD_IP_ARG);
+ }
+
return args;
}
}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index 15a164c..3b887c0 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -20,6 +20,7 @@
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
@@ -38,6 +39,10 @@
*/
public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManagerParameters {
+ private static final String KUBERNETES_HA_FQN_FACTORY_CLASS =
+ "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";
+ private static final String KUBERNETES_HA_MODE = "KUBERNETES";
+
public StandaloneKubernetesJobManagerParameters(
Configuration flinkConfig, ClusterSpecification clusterSpecification) {
super(flinkConfig, clusterSpecification);
@@ -107,4 +112,10 @@
}
return null;
}
+
+ public boolean isKubernetesHA() {
+ String haMode = flinkConfig.getValue(HighAvailabilityOptions.HA_MODE);
+ return haMode.equals(KUBERNETES_HA_FQN_FACTORY_CLASS)
+ || haMode.equalsIgnoreCase(KUBERNETES_HA_MODE);
+ }
}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
index 6ffcd80..599893b 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
@@ -20,6 +20,7 @@
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
@@ -105,4 +106,43 @@
decoratedPod.getMainContainer().getArgs(),
containsInAnyOrder(CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG));
}
+
+ @Test
+ public void testSessionKubernetesHAArgsAdded() {
+ configuration.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+ configuration.set(
+ HighAvailabilityOptions.HA_MODE,
+ "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
+
+ FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build());
+
+ assertThat(
+ decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH));
+ assertThat(
+ decoratedPod.getMainContainer().getArgs(),
+ containsInAnyOrder(
+ CmdStandaloneJobManagerDecorator.JOBMANAGER_ENTRYPOINT_ARG,
+ CmdStandaloneJobManagerDecorator.POD_IP_ARG));
+ }
+
+ @Test
+ public void testApplicationKubernetesHAArgsAdded() {
+ configuration.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+
+ configuration.set(HighAvailabilityOptions.HA_MODE, "KUBERNETES");
+
+ FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build());
+
+ assertThat(
+ decoratedPod.getMainContainer().getArgs(),
+ containsInAnyOrder(
+ CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
+ "--host",
+ CmdStandaloneJobManagerDecorator.POD_IP_ARG));
+ }
}