blob: 99fccde7c8504397e2c265a53fbd7a6e725cae69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.Toleration;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** General tests for the {@link InitJobManagerDecorator}. */
class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
private static final String SERVICE_ACCOUNT_NAME = "service-test";
private static final List<String> IMAGE_PULL_SECRETS = Arrays.asList("s1", "s2", "s3");
private static final Map<String, String> ANNOTATIONS =
new HashMap<String, String>() {
{
put("a1", "v1");
put("a2", "v2");
}
};
private static final String TOLERATION_STRING =
"key:key1,operator:Equal,value:value1,effect:NoSchedule;"
+ "KEY:key2,operator:Exists,Effect:NoExecute,tolerationSeconds:6000";
private static final List<Toleration> TOLERATION =
Arrays.asList(
new Toleration("NoSchedule", "key1", "Equal", null, "value1"),
new Toleration("NoExecute", "key2", "Exists", 6000L, null));
private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
private Pod resultPod;
private Container resultMainContainer;
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
this.flinkConfig.set(
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
this.flinkConfig.set(
KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, ANNOTATIONS);
this.flinkConfig.setString(
KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
}
@Override
protected void onSetup() throws Exception {
super.onSetup();
final InitJobManagerDecorator initJobManagerDecorator =
new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
final FlinkPod resultFlinkPod = initJobManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
this.resultPod = resultFlinkPod.getPodWithoutMainContainer();
this.resultMainContainer = resultFlinkPod.getMainContainer();
}
@Test
void testApiVersion() {
assertThat(this.resultPod.getApiVersion()).isEqualTo(Constants.API_VERSION);
}
@Test
void testMainContainerName() {
assertThat(this.resultMainContainer.getName()).isEqualTo(Constants.MAIN_CONTAINER_NAME);
}
@Test
void testMainContainerImage() {
assertThat(this.resultMainContainer.getImage()).isEqualTo(CONTAINER_IMAGE);
assertThat(this.resultMainContainer.getImagePullPolicy())
.isEqualTo(CONTAINER_IMAGE_PULL_POLICY.name());
}
@Test
void testMainContainerResourceRequirements() {
final ResourceRequirements resourceRequirements = this.resultMainContainer.getResources();
final Map<String, Quantity> requests = resourceRequirements.getRequests();
assertThat(requests.get("cpu").getAmount()).isEqualTo(Double.toString(JOB_MANAGER_CPU));
assertThat(requests.get("memory").getAmount())
.isEqualTo(String.valueOf(JOB_MANAGER_MEMORY));
final Map<String, Quantity> limits = resourceRequirements.getLimits();
assertThat(limits.get("cpu").getAmount())
.isEqualTo(Double.toString(JOB_MANAGER_CPU * JOB_MANAGER_CPU_LIMIT_FACTOR));
assertThat(limits.get("memory").getAmount())
.isEqualTo(
Integer.toString(
(int) (JOB_MANAGER_MEMORY * JOB_MANAGER_MEMORY_LIMIT_FACTOR)));
}
@Test
void testMainContainerPorts() {
final List<ContainerPort> expectedContainerPorts =
Arrays.asList(
new ContainerPortBuilder()
.withName(Constants.REST_PORT_NAME)
.withContainerPort(REST_PORT)
.build(),
new ContainerPortBuilder()
.withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
.withContainerPort(RPC_PORT)
.build(),
new ContainerPortBuilder()
.withName(Constants.BLOB_SERVER_PORT_NAME)
.withContainerPort(BLOB_SERVER_PORT)
.build());
assertThat(this.resultMainContainer.getPorts()).isEqualTo(expectedContainerPorts);
}
@Test
void testMainContainerEnv() {
final List<EnvVar> envVars = this.resultMainContainer.getEnv();
final Map<String, String> envs = new HashMap<>();
envVars.forEach(env -> envs.put(env.getName(), env.getValue()));
this.customizedEnvs.forEach((k, v) -> assertThat(v).isEqualTo(envs.get(k)));
assertThat(envVars)
.anyMatch(
env ->
env.getName().equals(Constants.ENV_FLINK_POD_IP_ADDRESS)
&& env.getValueFrom()
.getFieldRef()
.getApiVersion()
.equals(Constants.API_VERSION)
&& env.getValueFrom()
.getFieldRef()
.getFieldPath()
.equals(Constants.POD_IP_FIELD_PATH));
}
@Test
void testPodLabels() {
final Map<String, String> expectedLabels = new HashMap<>(getCommonLabels());
expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
expectedLabels.putAll(userLabels);
assertThat(this.resultPod.getMetadata().getLabels()).isEqualTo(expectedLabels);
}
@Test
void testPodAnnotations() {
final Map<String, String> resultAnnotations =
kubernetesJobManagerParameters.getAnnotations();
assertThat(resultAnnotations).isEqualTo(ANNOTATIONS);
}
@Test
void testPodServiceAccountName() {
assertThat(this.resultPod.getSpec().getServiceAccountName())
.isEqualTo(SERVICE_ACCOUNT_NAME);
}
@Test
void testImagePullSecrets() {
final List<String> resultSecrets =
this.resultPod.getSpec().getImagePullSecrets().stream()
.map(LocalObjectReference::getName)
.collect(Collectors.toList());
assertThat(resultSecrets).isEqualTo(IMAGE_PULL_SECRETS);
}
@Test
void testNodeSelector() {
assertThat(this.resultPod.getSpec().getNodeSelector()).isEqualTo(nodeSelector);
}
@Test
void testPodTolerations() {
assertThat(this.resultPod.getSpec().getTolerations())
.containsExactlyInAnyOrderElementsOf(TOLERATION);
}
@Test
void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
final List<EnvVar> envVars = this.resultMainContainer.getEnv();
assertThat(envVars)
.anyMatch(
envVar ->
envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR)
&& envVar.getValue().equals(USER_DEFINED_FLINK_LOG_DIR));
}
@Test
void testDNSPolicyDefaultValue() {
assertThat(this.resultPod.getSpec().getDnsPolicy()).isEqualTo(Constants.DNS_POLICY_DEFAULT);
}
}