blob: fcc3947a0dfc4872c944b905c810a47ec8ceda87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.kubernetes.KubernetesTestUtils;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesPodTestBase;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParametersTest;
import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** General tests for the {@link KerberosMountDecoratorTest}. */
class KerberosMountDecoratorTest extends KubernetesPodTestBase {
private KerberosMountDecorator kerberosMountDecorator;
private AbstractKubernetesParametersTest.TestingKubernetesParameters
testingKubernetesParameters;
private static final String CUSTOM_KRB5_CONF_FILE = "mykrb5.conf";
@Override
protected void setupFlinkConfig() {
super.setupFlinkConfig();
flinkConfig.set(
SecurityOptions.KERBEROS_LOGIN_KEYTAB, kerberosDir.toString() + "/" + KEYTAB_FILE);
flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "test");
flinkConfig.set(
SecurityOptions.KERBEROS_KRB5_PATH,
kerberosDir.toString() + "/" + CUSTOM_KRB5_CONF_FILE);
}
@Override
protected void generateKerberosFileItems() throws IOException {
KubernetesTestUtils.createTemporyFile("some keytab", kerberosDir, KEYTAB_FILE);
KubernetesTestUtils.createTemporyFile("some conf", kerberosDir, CUSTOM_KRB5_CONF_FILE);
}
@Override
protected void onSetup() throws Exception {
super.onSetup();
generateKerberosFileItems();
this.testingKubernetesParameters =
new AbstractKubernetesParametersTest.TestingKubernetesParameters(flinkConfig);
this.kerberosMountDecorator = new KerberosMountDecorator(testingKubernetesParameters);
}
@Test
void testWhetherPodOrContainerIsDecorated() {
final FlinkPod resultFlinkPod = kerberosMountDecorator.decorateFlinkPod(baseFlinkPod);
assertThat(resultFlinkPod.getPodWithoutMainContainer())
.isNotEqualTo(baseFlinkPod.getPodWithoutMainContainer());
assertThat(resultFlinkPod.getMainContainer()).isNotEqualTo(baseFlinkPod.getMainContainer());
}
@Test
void testConfEditWhenBuildAccompanyingKubernetesResources() throws IOException {
kerberosMountDecorator.buildAccompanyingKubernetesResources();
assertThat(
this.testingKubernetesParameters
.getFlinkConfiguration()
.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB))
.isEqualTo(
String.format("%s/%s", Constants.KERBEROS_KEYTAB_MOUNT_POINT, KEYTAB_FILE));
}
@Test
void testDecoratedFlinkContainer() {
final Container resultMainContainer =
kerberosMountDecorator.decorateFlinkPod(baseFlinkPod).getMainContainer();
assertThat(resultMainContainer.getVolumeMounts()).hasSize(2);
final VolumeMount keytabVolumeMount =
resultMainContainer.getVolumeMounts().stream()
.filter(x -> x.getName().equals(Constants.KERBEROS_KEYTAB_VOLUME))
.collect(Collectors.toList())
.get(0);
final VolumeMount krb5ConfVolumeMount =
resultMainContainer.getVolumeMounts().stream()
.filter(x -> x.getName().equals(Constants.KERBEROS_KRB5CONF_VOLUME))
.collect(Collectors.toList())
.get(0);
assertThat(keytabVolumeMount).isNotNull();
assertThat(krb5ConfVolumeMount).isNotNull();
assertThat(keytabVolumeMount.getMountPath())
.isEqualTo(Constants.KERBEROS_KEYTAB_MOUNT_POINT);
assertThat(krb5ConfVolumeMount.getMountPath())
.isEqualTo(Constants.KERBEROS_KRB5CONF_MOUNT_DIR + "/krb5.conf");
}
@Test
void testDecoratedFlinkPodVolumes() {
final FlinkPod resultFlinkPod = kerberosMountDecorator.decorateFlinkPod(baseFlinkPod);
List<Volume> volumes = resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes();
assertThat(volumes).hasSize(2);
final Volume keytabVolume =
volumes.stream()
.filter(x -> x.getName().equals(Constants.KERBEROS_KEYTAB_VOLUME))
.collect(Collectors.toList())
.get(0);
final Volume krb5ConfVolume =
volumes.stream()
.filter(x -> x.getName().equals(Constants.KERBEROS_KRB5CONF_VOLUME))
.collect(Collectors.toList())
.get(0);
assertThat(keytabVolume.getSecret()).isNotNull();
assertThat(keytabVolume.getSecret().getSecretName())
.isEqualTo(
KerberosMountDecorator.getKerberosKeytabSecretName(
testingKubernetesParameters.getClusterId()));
assertThat(krb5ConfVolume.getConfigMap()).isNotNull();
assertThat(krb5ConfVolume.getConfigMap().getName())
.isEqualTo(
KerberosMountDecorator.getKerberosKrb5confConfigMapName(
testingKubernetesParameters.getClusterId()));
assertThat(krb5ConfVolume.getConfigMap().getItems()).hasSize(1);
assertThat(krb5ConfVolume.getConfigMap().getItems().get(0).getKey())
.isEqualTo(CUSTOM_KRB5_CONF_FILE);
assertThat(krb5ConfVolume.getConfigMap().getItems().get(0).getPath())
.isEqualTo(KRB5_CONF_FILE);
}
}