/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.flink.kubernetes.operator;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.util.SerializedThrowable;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import lombok.Getter;
import lombok.Setter;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
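
/*
 * A minimal usage sketch (illustrative only; jobSpec and conf stand for whatever
 * the individual test builds, e.g. via the TestUtils helpers):
 *
 *   TestingFlinkService flinkService = new TestingFlinkService();
 *   flinkService.submitApplicationCluster(jobSpec, conf, false);
 *   // the mock now tracks one RUNNING job
 *   assert flinkService.getRunningCount() == 1;
 */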
/** Flink service mock for tests. */
public class TestingFlinkService extends AbstractFlinkService {
public static final Map<String, String> CLUSTER_INFO =
Map.of(
DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
"15.0.0",
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
"1234567 @ 1970-01-01T00:00:00+00:00");
private int savepointCounter = 0;
private int triggerCounter = 0;
private final List<Tuple3<String, JobStatusMessage, Configuration>> jobs = new ArrayList<>();
private final Map<JobID, String> jobErrors = new HashMap<>();
@Getter private final Set<String> sessions = new HashSet<>();
@Setter private boolean isFlinkJobNotFound = false;
@Setter private boolean isFlinkJobTerminatedWithoutCancellation = false;
@Setter private boolean isPortReady = true;
@Setter private boolean haDataAvailable = true;
@Setter private boolean jobManagerReady = true;
@Setter private boolean deployFailure = false;
@Setter private Runnable sessionJobSubmittedCallback;
@Setter private PodList podList = new PodList();
@Setter private Consumer<Configuration> listJobConsumer = conf -> {};
private final List<String> disposedSavepoints = new ArrayList<>();
private final Map<String, Boolean> savepointTriggers = new HashMap<>();
@Getter private int desiredReplicas = 0;
@Getter private int cancelJobCallCount = 0;
@Setter
private Tuple2<
Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
checkpointInfo;
    private final Map<String, String> metricsValues = new HashMap<>();
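
    /** Creates a service without a Kubernetes client, for tests that never touch the API server. */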
public TestingFlinkService() {
super(null, new FlinkConfigManager(new Configuration()));
}
public TestingFlinkService(KubernetesClient kubernetesClient) {
super(kubernetesClient, new FlinkConfigManager(new Configuration()));
}
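
    /**
     * Returns a testing reconciliation context whose secondary resource is present only when
     * something has been deployed through this service.
     */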
public <T extends HasMetadata> Context<T> getContext() {
return new TestUtils.TestingContext<>() {
@Override
public Optional<T> getSecondaryResource(Class aClass, String s) {
if (jobs.isEmpty() && sessions.isEmpty()) {
return Optional.empty();
}
return (Optional<T>) Optional.of(TestUtils.createDeployment(jobManagerReady));
}
};
}
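
    /** Resets all tracked jobs, sessions and the savepoint/trigger counters. */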
public void clear() {
jobs.clear();
sessions.clear();
triggerCounter = 0;
savepointCounter = 0;
}
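
    /** Drops every tracked job that has reached a terminal state. */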
public void clearJobsInTerminalState() {
jobs.removeIf(job -> job.f1.getJobState().isTerminalState());
}
@Override
public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
if (requireHaMetadata) {
validateHaMetadataExists(conf);
}
deployApplicationCluster(jobSpec, removeOperatorConfigs(conf));
}
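
    /**
     * Records a new application "deployment": at most one application job may be tracked at a
     * time, and a fixed JobID from the configuration is honored if present.
     */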
protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
if (deployFailure) {
throw new Exception("Deployment failure");
}
if (!jobs.isEmpty()) {
throw new Exception("Cannot submit 2 application clusters at the same time");
}
JobID jobID = new JobID();
if (conf.contains(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)) {
jobID = JobID.fromHexString(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
}
JobStatusMessage jobStatusMessage =
new JobStatusMessage(
jobID,
conf.getString(KubernetesConfigOptions.CLUSTER_ID),
JobStatus.RUNNING,
System.currentTimeMillis());
jobs.add(
Tuple3.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), jobStatusMessage, conf));
}
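
    /** Fails with a RecoveryFailureException when HA metadata is required but unavailable. */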
protected void validateHaMetadataExists(Configuration conf) {
if (!isHaMetadataAvailable(conf)) {
throw new RecoveryFailureException(
"HA metadata not available to restore from last state. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ "Manual restore required.",
"RestoreFailed");
}
}
@Override
public boolean isHaMetadataAvailable(Configuration conf) {
return HighAvailabilityMode.isHighAvailabilityModeActivated(conf) && haDataAvailable;
}
@Override
public void submitSessionCluster(Configuration conf) throws Exception {
if (deployFailure) {
throw new Exception("Deployment failure");
}
sessions.add(conf.get(KubernetesConfigOptions.CLUSTER_ID));
}
@Override
public JobID submitJobToSessionCluster(
ObjectMeta meta,
FlinkSessionJobSpec spec,
Configuration conf,
@Nullable String savepoint)
throws Exception {
if (deployFailure) {
throw new Exception("Deployment failure");
}
JobID jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
JobStatusMessage jobStatusMessage =
new JobStatusMessage(
jobID,
conf.getString(KubernetesConfigOptions.CLUSTER_ID),
JobStatus.RUNNING,
System.currentTimeMillis());
jobs.add(Tuple3.of(savepoint, jobStatusMessage, conf));
if (sessionJobSubmittedCallback != null) {
sessionJobSubmittedCallback.run();
}
return jobID;
}
@Override
public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
if (!isPortReady) {
throw new TimeoutException("JM port is unavailable");
}
return super.listJobs(conf);
}
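
    /** Exposes the raw (savepoint path, job status, config) tuples tracked by this mock. */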
public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() {
return jobs;
}
public long getRunningCount() {
return jobs.stream().filter(t -> !t.f1.getJobState().isTerminalState()).count();
}
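
    /** Registers a synthetic savepoint trigger ("trigger_N") that fetchSavepointInfo resolves later. */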
@Override
public void triggerSavepoint(
String jobId,
SavepointTriggerType triggerType,
SavepointInfo savepointInfo,
Configuration conf) {
var triggerId = "trigger_" + triggerCounter++;
var savepointFormatType = SavepointUtils.getSavepointFormatType(conf);
savepointInfo.setTrigger(
triggerId, triggerType, SavepointFormatType.valueOf(savepointFormatType.name()));
savepointTriggers.put(triggerId, false);
}
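
    /**
     * Reports a known trigger as pending on the first poll and completed on the second,
     * mimicking an asynchronous savepoint; unknown triggers yield an error result.
     */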
@Override
public SavepointFetchResult fetchSavepointInfo(
String triggerId, String jobId, Configuration conf) {
if (savepointTriggers.containsKey(triggerId)) {
if (savepointTriggers.get(triggerId)) {
return SavepointFetchResult.completed("savepoint_" + savepointCounter++);
}
savepointTriggers.put(triggerId, true);
return SavepointFetchResult.pending();
}
return SavepointFetchResult.error("Failed");
}
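
    /**
     * Builds a TestingClusterClient whose list/cancel/stop-with-savepoint/result calls are all
     * answered from the in-memory job list instead of a real REST endpoint.
     */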
@Override
public ClusterClient<String> getClusterClient(Configuration config) throws Exception {
TestingClusterClient<String> clusterClient = new TestingClusterClient<>(config);
FlinkVersion flinkVersion = config.get(FlinkConfigBuilder.FLINK_VERSION);
clusterClient.setListJobsFunction(
() -> {
listJobConsumer.accept(config);
if (jobs.isEmpty()
&& !sessions.isEmpty()
&& config.get(DeploymentOptions.TARGET)
.equals(KubernetesDeploymentTarget.APPLICATION.getName())) {
throw new RuntimeException("Trying to list a job without submitting it");
}
return CompletableFuture.completedFuture(
jobs.stream().map(t -> t.f1).collect(Collectors.toList()));
});
clusterClient.setStopWithSavepointFunction(
(jobID, advanceEventTime, savepointDir) -> {
try {
return CompletableFuture.completedFuture(
cancelJob(flinkVersion, jobID, true));
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
});
clusterClient.setCancelFunction(
jobID -> {
try {
cancelJob(flinkVersion, jobID, false);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
return CompletableFuture.completedFuture(Acknowledge.get());
});
clusterClient.setRequestResultFunction(
jobID -> {
var builder = new JobResult.Builder().jobId(jobID).netRuntime(1);
if (jobErrors.containsKey(jobID)) {
builder.serializedThrowable(
new SerializedThrowable(
new RuntimeException(jobErrors.get(jobID))));
}
return CompletableFuture.completedFuture(builder.build());
});
clusterClient.setRequestProcessor(
(messageHeaders, messageParameters, requestBody) -> {
if (messageHeaders instanceof JobsOverviewHeaders) {
return CompletableFuture.completedFuture(getMultipleJobsDetails());
}
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
});
return clusterClient;
}
private MultipleJobsDetails getMultipleJobsDetails() {
return new MultipleJobsDetails(
jobs.stream()
.map(tuple -> tuple.f1)
.map(TestingFlinkService::toJobDetails)
.collect(Collectors.toList()));
}
private static JobDetails toJobDetails(JobStatusMessage jobStatus) {
return new JobDetails(
jobStatus.getJobId(),
jobStatus.getJobName(),
jobStatus.getStartTime(),
-1,
System.currentTimeMillis() - jobStatus.getStartTime(),
jobStatus.getJobState(),
System.currentTimeMillis(),
new int[ExecutionState.values().length],
0);
}
@Override
public void cancelJob(
FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
throws Exception {
cancelJob(deployment, upgradeMode, configuration, false);
}
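
    /**
     * Simulates cancellation: for Flink versions newer than 1.14 the job transitions to FINISHED
     * (keeping an optional savepoint path), while for older versions the job is removed entirely.
     */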
private String cancelJob(FlinkVersion flinkVersion, JobID jobID, boolean savepoint)
throws Exception {
cancelJobCallCount++;
if (!isPortReady) {
throw new TimeoutException("JM port is unavailable");
}
if (isFlinkJobNotFound) {
throw new FlinkJobNotFoundException(jobID);
}
var jobOpt = jobs.stream().filter(js -> js.f1.getJobId().equals(jobID)).findAny();
if (isFlinkJobTerminatedWithoutCancellation) {
JobStatusMessage oldStatus = jobOpt.get().f1;
jobOpt.get().f1 =
new JobStatusMessage(
oldStatus.getJobId(),
oldStatus.getJobName(),
JobStatus.FAILED,
oldStatus.getStartTime());
throw new FlinkJobTerminatedWithoutCancellationException(jobID, JobStatus.FAILED);
}
if (jobOpt.isEmpty()) {
throw new Exception("Job not found");
}
var sp = savepoint ? "savepoint_" + savepointCounter++ : null;
if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
JobStatusMessage oldStatus = jobOpt.get().f1;
jobOpt.get().f1 =
new JobStatusMessage(
oldStatus.getJobId(),
oldStatus.getJobName(),
JobStatus.FINISHED,
oldStatus.getStartTime());
jobOpt.get().f0 = sp;
} else {
jobs.removeIf(js -> js.f1.getJobId().equals(jobID));
}
return sp;
}
@Override
protected void deleteClusterInternal(
ObjectMeta meta,
Configuration conf,
boolean deleteHaMeta,
DeletionPropagation deletionPropagation) {
jobs.clear();
sessions.remove(meta.getName());
}
@Override
public void waitForClusterShutdown(Configuration conf) {}
@Override
public void disposeSavepoint(String savepointPath, Configuration conf) {
disposedSavepoints.add(savepointPath);
}
public List<String> getDisposedSavepoints() {
return disposedSavepoints;
}
@Override
public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
jobs.stream()
.filter(js -> js.f1.getJobId().equals(jobId))
.findAny()
.ifPresent(
t -> {
if (!t.f1.getJobState().isGloballyTerminalState()) {
throw new RuntimeException(
"Checkpoint should not be queried if job is not in terminal state");
}
});
return super.getLastCheckpoint(jobId, conf);
}
@Override
public Tuple2<
Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
if (checkpointInfo != null) {
return checkpointInfo;
}
var jobOpt = jobs.stream().filter(js -> js.f1.getJobId().equals(jobId)).findAny();
if (jobOpt.isEmpty()) {
throw new Exception("Job not found");
}
var t = jobOpt.get();
if (t.f0 != null) {
return Tuple2.of(
Optional.of(
new CheckpointHistoryWrapper.CompletedCheckpointInfo(
0L, t.f0, System.currentTimeMillis())),
Optional.empty());
} else {
return Tuple2.of(Optional.empty(), Optional.empty());
}
}
@Override
public boolean isJobManagerPortReady(Configuration config) {
return isPortReady;
}
@Override
public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
return podList;
}
@Override
protected PodList getJmPodList(String namespace, String clusterId) {
return podList;
}
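
    /** Transitions the given job to FAILED and records the error that the job result will carry. */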
public void markApplicationJobFailedWithError(JobID jobID, String error) throws Exception {
var job = jobs.stream().filter(tuple -> tuple.f1.getJobId().equals(jobID)).findFirst();
if (job.isEmpty()) {
throw new Exception("The target job missed");
}
var oldStatus = job.get().f1;
job.get().f1 =
new JobStatusMessage(
oldStatus.getJobId(),
oldStatus.getJobName(),
JobStatus.FAILED,
oldStatus.getStartTime());
jobErrors.put(jobID, error);
}
    /** Information about a submitted job, as captured by this mock. */
public static class SubmittedJobInfo {
public final String savepointPath;
public final JobStatusMessage jobStatusMessage;
public final Configuration effectiveConfig;
public SubmittedJobInfo(
String savepointPath,
JobStatusMessage jobStatusMessage,
Configuration effectiveConfig) {
this.savepointPath = savepointPath;
this.jobStatusMessage = jobStatusMessage;
this.effectiveConfig = effectiveConfig;
}
}
@Override
public Map<String, String> getClusterInfo(Configuration conf) {
return CLUSTER_INFO;
}
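
    /**
     * Mimics reactive scaling: succeeds only in reactive scheduler mode (or when there is no
     * job spec), recording the desired TaskManager replica count.
     */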
@Override
public boolean scale(ObjectMeta meta, JobSpec jobSpec, Configuration conf) {
if (conf.get(JobManagerOptions.SCHEDULER_MODE) != SchedulerExecutionMode.REACTIVE
&& jobSpec != null) {
return false;
}
desiredReplicas =
conf.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
return true;
}
public void setMetricValue(String name, String value) {
metricsValues.put(name, value);
}
@Override
public Map<String, String> getMetrics(
Configuration conf, String jobId, List<String> metricNames) {
return metricsValues;
}
}