blob: b60551a92a7f0d0ba1050d0170b100efc33b5b8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import io.fabric8.kubernetes.client.Config;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadPoolExecutor;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* @link FlinkOperator unit tests. Since at the time of writing this the JOSDK does not support
* overriding the configuration multiple times (has a singleton @link
* ConfigurationServiceProvider) we write multiple tests as a single function, please provide
* ample comments.
*/
public class FlinkOperatorTest {
@BeforeAll
public static void setAutoTryKubeConfig() {
System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
}
@Test
public void testConfigurationPassedToJOSDK() {
var testParallelism = 42;
var testSelector = "flink=enabled";
var testLeaseName = "test-lease";
var operatorConfig = new Configuration();
// We need to set this property so the operator can configure the lease namespace
System.setProperty(Config.KUBERNETES_NAMESPACE_SYSTEM_PROPERTY, "test_namespace");
operatorConfig.setInteger(
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM, testParallelism);
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LABEL_SELECTOR, testSelector);
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR, false);
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_ENABLED, true);
operatorConfig.set(
KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_NAME, testLeaseName);
var testOperator = new FlinkOperator(operatorConfig);
testOperator.registerDeploymentController();
testOperator.registerSessionJobController();
var configService = testOperator.getOperator().getConfigurationService();
// Test parallelism being passed
var executorService = configService.getExecutorService();
Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(), testParallelism);
// Test label selector being passed
// We have a label selector for each controller
var labelSelectors =
testOperator.registeredControllers.stream()
.map(RegisteredController::getConfiguration)
.map(ControllerConfiguration::getLabelSelector);
labelSelectors.forEach(selector -> Assertions.assertEquals(testSelector, selector));
Assertions.assertFalse(configService.stopOnInformerErrorDuringStartup());
testOperator.registeredControllers.stream()
.map(RegisteredController::getConfiguration)
.map(ControllerConfiguration::getRateLimiter)
.forEach(rl -> Assertions.assertTrue(((LinearRateLimiter) rl).isActivated()));
var leaderElectionConfiguration = configService.getLeaderElectionConfiguration().get();
Assertions.assertEquals(testLeaseName, leaderElectionConfiguration.getLeaseName());
Assertions.assertFalse(leaderElectionConfiguration.getLeaseNamespace().isPresent());
Assertions.assertFalse(leaderElectionConfiguration.getIdentity().isPresent());
}
@Test
public void testLeaderElectionConfig() {
var operatorConfig = new Configuration();
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_ENABLED, true);
try {
new FlinkOperator(operatorConfig);
} catch (IllegalConfigurationException ice) {
assertTrue(
ice.getMessage()
.startsWith(
"kubernetes.operator.leader-election.lease-name must be defined"));
}
var flinkOperator = new FlinkOperator(new Configuration());
assertTrue(
flinkOperator
.getOperator()
.getConfigurationService()
.getLeaderElectionConfiguration()
.isEmpty());
}
}