blob: 2b2e23a72c60d3bb6740c5efd2ee89f666952aa3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.api.model.PodStatusBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.mockwebserver.utils.ResponseProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import okhttp3.Headers;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.provider.Arguments;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.HttpURLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/** Testing utilities. */
public class TestUtils extends BaseTestUtils {
public static final int MAX_RECONCILE_TIMES = 3;
private static final String TEST_PLUGINS = "test-plugins";
private static final String PlUGINS_JAR = TEST_PLUGINS + "-test-jar.jar";
public static PodList createFailedPodList(String crashLoopMessage, String reason) {
ContainerStatus cs =
new ContainerStatusBuilder()
.withNewState()
.withNewWaiting()
.withReason(reason)
.withMessage(crashLoopMessage)
.endWaiting()
.endState()
.build();
Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
pod.setStatus(
new PodStatusBuilder()
.withContainerStatuses(Collections.singletonList(cs))
.build());
return new PodListBuilder().withItems(pod).build();
}
public static Deployment createDeployment(boolean ready) {
String nowTs = Instant.now().toString();
var status = new DeploymentStatus();
status.setAvailableReplicas(ready ? 1 : 0);
status.setReplicas(1);
var availableCondition = new DeploymentCondition();
availableCondition.setType("Available");
availableCondition.setStatus(ready ? "True" : "False");
availableCondition.setLastTransitionTime(nowTs);
status.setConditions(List.of(availableCondition));
DeploymentSpec spec = new DeploymentSpec();
spec.setReplicas(1);
var meta = new ObjectMeta();
meta.setCreationTimestamp(nowTs);
Deployment deployment = new Deployment();
deployment.setMetadata(meta);
deployment.setSpec(spec);
deployment.setStatus(status);
return deployment;
}
public static Map<String, String> generateTestOwnerReferenceMap(AbstractFlinkResource owner) {
return Map.of(
"apiVersion", owner.getApiVersion(),
"kind", owner.getKind(),
"name", owner.getMetadata().getName(),
"uid", owner.getMetadata().getUid(),
"blockOwnerDeletion", "true",
"controller", "false");
}
public static <T extends HasMetadata> Context<T> createContextWithDeployment(
@Nullable Deployment deployment, KubernetesClient client) {
return new TestingContext<>() {
@Override
public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) {
return (Optional<T>) Optional.ofNullable(deployment);
}
@Override
public KubernetesClient getClient() {
return client;
}
};
}
public static <T extends HasMetadata> Context<T> createEmptyContext() {
return createContextWithDeployment(null, null);
}
public static <T extends HasMetadata> Context<T> createEmptyContextWithClient(
KubernetesClient client) {
return createContextWithDeployment(null, client);
}
public static <T extends HasMetadata> Context<T> createContextWithReadyJobManagerDeployment(
KubernetesClient client) {
return createContextWithDeployment(createDeployment(true), client);
}
public static <T extends HasMetadata> Context<T> createContextWithInProgressDeployment(
KubernetesClient client) {
return createContextWithDeployment(createDeployment(false), client);
}
public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment() {
return createContextWithReadyFlinkDeployment(new HashMap<>(), null);
}
public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment(
KubernetesClient client) {
return createContextWithReadyFlinkDeployment(new HashMap<>(), client);
}
public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment(
Map<String, String> flinkDepConfig, KubernetesClient client) {
return createContextWithReadyFlinkDeployment(flinkDepConfig, client, FlinkVersion.v1_18);
}
public static <T extends HasMetadata> Context<T> createContextWithReadyFlinkDeployment(
Map<String, String> flinkDepConfig, KubernetesClient client, FlinkVersion version) {
return new TestingContext<>() {
@Override
public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) {
var session = buildSessionCluster(version);
session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
session.getSpec().getFlinkConfiguration().putAll(flinkDepConfig);
session.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(session.getSpec(), session);
return (Optional<T>) Optional.of(session);
}
@Override
public KubernetesClient getClient() {
return client;
}
};
}
public static <T extends HasMetadata> Context<T> createContextWithNotReadyFlinkDeployment() {
return new TestingContext<>() {
@Override
public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) {
var session = buildSessionCluster();
session.getStatus()
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
return (Optional<T>) Optional.of(session);
}
};
}
public static final String DEPLOYMENT_ERROR = "test deployment error message";
public static <T extends HasMetadata> Context<T> createContextWithFailedJobManagerDeployment(
KubernetesClient client) {
return new TestingContext<>() {
@Override
public Optional getSecondaryResource(Class expectedType, String eventSourceName) {
DeploymentStatus status = new DeploymentStatus();
status.setAvailableReplicas(0);
status.setReplicas(1);
List<DeploymentCondition> conditions =
Collections.singletonList(
new DeploymentCondition(
null,
null,
DEPLOYMENT_ERROR,
"FailedCreate",
"status",
"ReplicaFailure"));
status.setConditions(conditions);
DeploymentSpec spec = new DeploymentSpec();
spec.setReplicas(1);
Deployment deployment = new Deployment();
deployment.setSpec(spec);
deployment.setStatus(status);
return Optional.of(deployment);
}
@Override
public KubernetesClient getClient() {
return client;
}
};
}
public static String getTestPluginsRootDir(Path temporaryFolder) throws IOException {
File testValidatorFolder = new File(temporaryFolder.toFile(), TEST_PLUGINS);
assertTrue(testValidatorFolder.mkdirs());
File testValidatorJar = new File("target", PlUGINS_JAR);
assertTrue(testValidatorJar.exists());
Files.copy(
testValidatorJar.toPath(), Paths.get(testValidatorFolder.toString(), PlUGINS_JAR));
return temporaryFolder.toAbsolutePath().toString();
}
    // This code is taken slightly modified from: http://stackoverflow.com/a/7201825/568695
    // it changes the environment variables of this JVM. Use only for testing purposes!
    //
    // It reflectively reaches into the backing map of System.getenv() and replaces its
    // contents wholesale with newEnv (existing variables are cleared first).
    // NOTE(review): on JDK 16+ this kind of deep reflection into java.base internals
    // requires --add-opens flags on the test JVM — confirm against the surefire config.
    @SuppressWarnings("unchecked")
    public static void setEnv(Map<String, String> newEnv) {
        try {
            Map<String, String> env = System.getenv();
            Class<?> clazz = env.getClass();
            // "m" is the wrapped map inside the unmodifiable-map view returned by getenv().
            Field field = clazz.getDeclaredField("m");
            field.setAccessible(true);
            Map<String, String> map = (Map<String, String>) field.get(env);
            map.clear();
            map.putAll(newEnv);
            // only for Windows
            Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
            try {
                // Windows keeps a second, case-insensitive copy that must be kept in sync.
                Field theCaseInsensitiveEnvironmentField =
                        processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
                theCaseInsensitiveEnvironmentField.setAccessible(true);
                Map<String, String> ciEnv =
                        (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
                ciEnv.clear();
                ciEnv.putAll(newEnv);
            } catch (NoSuchFieldException ignored) {
                // Field only exists on Windows JVMs; absence is expected elsewhere.
            }
        } catch (Exception e1) {
            // Reflection failures are unrecoverable for the test -> fail fast.
            throw new RuntimeException(e1);
        }
    }
public static KubernetesOperatorMetricGroup createTestMetricGroup(Configuration conf) {
return createTestMetricGroup(createTestMetricRegistry(), conf);
}
public static KubernetesOperatorMetricGroup createTestMetricGroup(
MetricRegistry metricRegistry, Configuration conf) {
return KubernetesOperatorMetricGroup.create(
metricRegistry, conf, TEST_NAMESPACE, "testopname", "testhost");
}
public static TestingMetricRegistry createTestMetricRegistry() {
return TestingMetricRegistry.builder()
.setDelimiter(".".charAt(0))
.setRegisterConsumer((metric, name, group) -> {})
.build();
}
public static Stream<Arguments> flinkVersionsAndUpgradeModes() {
List<Arguments> args = new ArrayList<>();
for (FlinkVersion version : Set.of(FlinkVersion.v1_15, FlinkVersion.v1_18)) {
for (UpgradeMode upgradeMode : UpgradeMode.values()) {
args.add(arguments(version, upgradeMode));
}
}
return args.stream();
}
public static Stream<Arguments> flinkVersions() {
return Stream.of(arguments(FlinkVersion.v1_15), arguments(FlinkVersion.v1_18));
}
public static FlinkDeployment createCanaryDeployment() {
var cr = new FlinkDeployment();
cr.setSpec(cr.initSpec());
var meta = new ObjectMeta();
meta.setGeneration(0L);
meta.setLabels(Map.of(CanaryResourceManager.CANARY_LABEL, "true"));
meta.setName("canary");
meta.setNamespace("default");
cr.setMetadata(meta);
return cr;
}
public static FlinkSessionJob createCanaryJob() {
var cr = new FlinkSessionJob();
cr.setSpec(cr.initSpec());
var meta = new ObjectMeta();
meta.setGeneration(0L);
meta.setLabels(Map.of(CanaryResourceManager.CANARY_LABEL, "true"));
meta.setName("canary");
meta.setNamespace("default");
cr.setMetadata(meta);
return cr;
}
public static void reconcileSpec(FlinkDeployment deployment) {
deployment
.getStatus()
.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
}
/**
* Sets up an active cron trigger by ensuring that the latest successful snapshot happened
* earlier than the scheduled trigger.
*/
public static void setupCronTrigger(SnapshotType snapshotType, FlinkDeployment deployment) {
Calendar calendar = Calendar.getInstance();
calendar.set(2022, Calendar.JUNE, 5, 11, 0);
long lastCheckpointTimestamp = calendar.getTimeInMillis();
String cronOptionKey;
switch (snapshotType) {
case SAVEPOINT:
Savepoint lastSavepoint =
Savepoint.of("", lastCheckpointTimestamp, SnapshotTriggerType.PERIODIC);
deployment
.getStatus()
.getJobStatus()
.getSavepointInfo()
.updateLastSavepoint(lastSavepoint);
cronOptionKey = KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key();
break;
case CHECKPOINT:
Checkpoint lastCheckpoint =
Checkpoint.of(lastCheckpointTimestamp, SnapshotTriggerType.PERIODIC);
deployment
.getStatus()
.getJobStatus()
.getCheckpointInfo()
.updateLastCheckpoint(lastCheckpoint);
cronOptionKey = KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL.key();
break;
default:
throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
}
deployment.getSpec().getFlinkConfiguration().put(cronOptionKey, "0 0 12 5 6 ? 2022");
reconcileSpec(deployment);
}
    /**
     * Testing ResponseProvider for the mock Kubernetes web server: runs a caller-supplied
     * validator against each recorded request before returning a canned body, and remembers
     * whether at least one request was validated.
     *
     * @param <T> type of the canned response body
     */
    public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> {
        // Flipped to true only after the validator has accepted a request without throwing.
        private final AtomicBoolean validated = new AtomicBoolean(false);
        private final Consumer<RecordedRequest> validator;
        private final T returnValue;

        public ValidatingResponseProvider(T returnValue, Consumer<RecordedRequest> validator) {
            this.validator = validator;
            this.returnValue = returnValue;
        }

        /** Asserts that at least one request reached {@link #getBody} and passed validation. */
        public void assertValidated() {
            Assertions.assertTrue(validated.get());
        }

        @Override
        public int getStatusCode(RecordedRequest recordedRequest) {
            // Always responds 201 Created, as for a successful resource creation.
            return HttpURLConnection.HTTP_CREATED;
        }

        @Override
        public Headers getHeaders() {
            return new Headers.Builder().build();
        }

        @Override
        public void setHeaders(Headers headers) {}

        @Override
        public Object getBody(RecordedRequest recordedRequest) {
            // Validator runs first: if it throws, 'validated' stays false on purpose.
            validator.accept(recordedRequest);
            validated.set(true);
            return returnValue;
        }
    }
    /**
     * Base context implementation for tests.
     *
     * <p>A stub {@link Context}: every method returns an empty/null placeholder or throws, and
     * the factory methods above override just the pieces a given test needs (typically
     * {@code getSecondaryResource} and {@code getClient}).
     *
     * @param <T> Resource type
     */
    public static class TestingContext<T extends HasMetadata> implements Context<T> {
        @Override
        public Optional<RetryInfo> getRetryInfo() {
            // No retry in progress by default.
            return Optional.empty();
        }

        @Override
        public <T1> Set<T1> getSecondaryResources(Class<T1> aClass) {
            // Intentionally unimplemented; tests relying on this must override it.
            return null;
        }

        @Override
        public <T1> Optional<T1> getSecondaryResource(Class<T1> aClass, String s) {
            // No secondary resource by default; overridden by most factory methods above.
            return Optional.empty();
        }

        @Override
        public <R> Optional<R> getSecondaryResource(
                Class<R> aClass, ResourceDiscriminator<R, T> resourceDiscriminator) {
            return Optional.empty();
        }

        @Override
        public ControllerConfiguration<T> getControllerConfiguration() {
            return null;
        }

        @Override
        public ManagedDependentResourceContext managedDependentResourceContext() {
            return null;
        }

        @Override
        public EventSourceRetriever<T> eventSourceRetriever() {
            return null;
        }

        @Override
        public KubernetesClient getClient() {
            // Fails loudly rather than returning null: tests needing a client must override.
            throw new UnsupportedOperationException("Not implemented");
        }

        @Override
        public ExecutorService getWorkflowExecutorService() {
            throw new UnsupportedOperationException("Not implemented");
        }
    }
}