/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.kubernetes.operator.service;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;

/**
* An abstract {@link FlinkService} containing some common implementations for the native and
* standalone Flink Services.
*/
public abstract class AbstractFlinkService implements FlinkService {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
private static final String EMPTY_JAR_FILENAME = "empty.jar";
protected final KubernetesClient kubernetesClient;
protected final FlinkConfigManager configManager;
private final ExecutorService executorService;
protected final ArtifactManager artifactManager;
private final String emptyJar;
public AbstractFlinkService(
KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
this.kubernetesClient = kubernetesClient;
this.configManager = configManager;
this.artifactManager = new ArtifactManager(configManager);
this.executorService =
Executors.newFixedThreadPool(
4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
this.emptyJar = createEmptyJar();
}
protected abstract PodList getJmPodList(String namespace, String clusterId);
protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf)
throws Exception;
@Override
public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
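/**
* Deploys an application cluster. If Kubernetes HA is enabled, the stale job graph is first
* removed from the HA ConfigMaps so that changed job settings (e.g. parallelism) take effect.
* When {@code requireHaMetadata} is set, the presence of HA metadata is validated before
* deploying.
*/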
@Override
public void submitApplicationCluster(
JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
LOG.info(
"Deploying application cluster{}",
requireHaMetadata ? " requiring last-state from HA metadata" : "");
if (FlinkUtils.isKubernetesHAActivated(conf)) {
final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
// Delete the job graph in the HA ConfigMaps so that newly changed job settings (e.g.
// parallelism) can take effect
FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
}
if (requireHaMetadata) {
validateHaMetadataExists(conf);
}
deployApplicationCluster(jobSpec, removeOperatorConfigs(conf));
}
@Override
public boolean isHaMetadataAvailable(Configuration conf) {
return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
}
@Override
public JobID submitJobToSessionCluster(
ObjectMeta meta,
FlinkSessionJobSpec spec,
Configuration conf,
@Nullable String savepoint)
throws Exception {
// We generate the jobID in advance to help deduplicate job submissions.
var jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf, savepoint);
LOG.info("Submitted job: {} to session cluster.", jobID);
return jobID;
}
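/**
* Checks whether the JobManager REST endpoint accepts TCP connections. A plain socket is opened
* against the host and port of the cluster client's web interface URL with a one second connect
* timeout; an {@link IOException} is interpreted as "not ready".
*/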
@Override
public boolean isJobManagerPortReady(Configuration config) {
final URI uri;
try (ClusterClient<String> clusterClient = getClusterClient(config)) {
uri = URI.create(clusterClient.getWebInterfaceURL());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
Socket socket = new Socket();
try {
socket.connect(socketAddress, 1000);
socket.close();
return true;
} catch (IOException e) {
return false;
}
}
@Override
public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
return clusterClient
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.thenApply(AbstractFlinkService::toJobStatusMessage)
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.toSeconds(),
TimeUnit.SECONDS);
}
}
@Override
public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
return clusterClient
.requestJobResult(jobID)
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.getSeconds(),
TimeUnit.SECONDS);
}
}
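/**
* Cancels or suspends the deployment's job according to the upgrade mode: STATELESS cancels the
* running job (best effort) and deletes the cluster including HA metadata; SAVEPOINT stops the
* job with a savepoint, records its location in the status and optionally deletes the cluster;
* LAST_STATE deletes the cluster but keeps the HA metadata so the job can be restored from it.
*/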
protected void cancelJob(
FlinkDeployment deployment,
UpgradeMode upgradeMode,
Configuration conf,
boolean deleteClusterAfterSavepoint)
throws Exception {
var deploymentStatus = deployment.getStatus();
var jobIdString = deploymentStatus.getJobStatus().getJobId();
var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
Optional<String> savepointOpt = Optional.empty();
var savepointFormatType = SavepointUtils.getSavepointFormatType(conf);
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
var clusterId = clusterClient.getClusterId();
switch (upgradeMode) {
case STATELESS:
if (ReconciliationUtils.isJobRunning(deployment.getStatus())) {
LOG.info("Job is running, cancelling job.");
try {
clusterClient
.cancel(Preconditions.checkNotNull(jobId))
.get(
configManager
.getOperatorConfiguration()
.getFlinkCancelJobTimeout()
.toSeconds(),
TimeUnit.SECONDS);
LOG.info("Job successfully cancelled.");
} catch (Exception e) {
LOG.error("Could not shut down cluster gracefully, deleting...", e);
}
}
deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
break;
case SAVEPOINT:
final String savepointDirectory =
Preconditions.checkNotNull(
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
final long timeout =
conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.getSeconds();
if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
try {
LOG.info("Suspending job with savepoint.");
String savepoint =
clusterClient
.stopWithSavepoint(
Preconditions.checkNotNull(jobId),
false,
savepointDirectory,
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
? savepointFormatType
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
LOG.info("Job successfully suspended with savepoint {}.", savepoint);
} catch (TimeoutException exception) {
throw new FlinkException(
String.format(
"Timed out stopping the job %s in Flink cluster %s with savepoint, "
+ "please configure a larger timeout via '%s'",
jobId,
clusterId,
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
.key()),
exception);
}
} else if (ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
LOG.info(
"Job is already in terminal state skipping cancel-with-savepoint operation.");
} else {
throw new RuntimeException(
"Unexpected non-terminal status: " + deploymentStatus);
}
if (deleteClusterAfterSavepoint) {
LOG.info("Cleaning up deployment after stop-with-savepoint");
deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
}
break;
case LAST_STATE:
deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false);
break;
default:
throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
}
}
deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
savepointOpt.ifPresent(
location -> {
Savepoint sp =
Savepoint.of(
location,
SavepointTriggerType.UPGRADE,
SavepointFormatType.valueOf(savepointFormatType.name()));
deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp);
});
var shutdownDisabled =
upgradeMode != UpgradeMode.LAST_STATE
&& FlinkUtils.clusterShutdownDisabled(
ReconciliationUtils.getDeployedSpec(deployment));
if (!shutdownDisabled) {
waitForClusterShutdown(conf);
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
}
}
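/**
* Cancels or suspends a session job. Unlike application deployments, LAST_STATE upgrades are
* not supported for session jobs.
*/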
@Override
public void cancelSessionJob(
FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
throws Exception {
var sessionJobStatus = sessionJob.getStatus();
var jobStatus = sessionJobStatus.getJobStatus();
var jobIdString = jobStatus.getJobId();
Preconditions.checkNotNull(jobIdString, "The job to be suspended must not be null");
var jobId = JobID.fromHexString(jobIdString);
Optional<String> savepointOpt = Optional.empty();
LOG.debug("Current job state: {}", jobStatus.getState());
if (!ReconciliationUtils.isJobInTerminalState(sessionJobStatus)) {
LOG.debug("Job is not in terminal state, cancelling it");
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
final String clusterId = clusterClient.getClusterId();
switch (upgradeMode) {
case STATELESS:
LOG.info("Cancelling job.");
clusterClient
.cancel(jobId)
.get(
configManager
.getOperatorConfiguration()
.getFlinkCancelJobTimeout()
.toSeconds(),
TimeUnit.SECONDS);
LOG.info("Job successfully cancelled.");
break;
case SAVEPOINT:
if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {
LOG.info("Suspending job with savepoint.");
final String savepointDirectory =
Preconditions.checkNotNull(
conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
final long timeout =
conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
.getSeconds();
try {
String savepoint =
clusterClient
.stopWithSavepoint(
jobId,
false,
savepointDirectory,
conf.get(FLINK_VERSION)
.isNewerVersionThan(
FlinkVersion.v1_14)
? conf.get(
KubernetesOperatorConfigOptions
.OPERATOR_SAVEPOINT_FORMAT_TYPE)
: null)
.get(timeout, TimeUnit.SECONDS);
savepointOpt = Optional.of(savepoint);
LOG.info(
"Job successfully suspended with savepoint {}.", savepoint);
} catch (TimeoutException exception) {
throw new FlinkException(
String.format(
"Timed out stopping the job %s in Flink cluster %s with savepoint, "
+ "please configure a larger timeout via '%s'",
jobId,
clusterId,
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT
.key()),
exception);
}
} else {
throw new RuntimeException(
"Unexpected non-terminal status: " + jobStatus.getState());
}
break;
case LAST_STATE:
default:
throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
}
}
} else {
LOG.debug("Job is in terminal state, skipping cancel");
}
jobStatus.setState(JobStatus.FINISHED.name());
savepointOpt.ifPresent(
location -> {
Savepoint sp = Savepoint.of(location, SavepointTriggerType.UPGRADE);
jobStatus.getSavepointInfo().updateLastSavepoint(sp);
});
}
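/**
* Triggers a savepoint through the REST API and records the returned trigger id in the
* savepoint info. The savepoint format type is only sent to clusters running Flink newer than
* 1.14, where the REST API supports it.
*/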
@Override
public void triggerSavepoint(
String jobId,
SavepointTriggerType triggerType,
org.apache.flink.kubernetes.operator.api.status.SavepointInfo savepointInfo,
Configuration conf)
throws Exception {
LOG.info("Triggering new savepoint");
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
var savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
var savepointTriggerMessageParameters =
savepointTriggerHeaders.getUnresolvedMessageParameters();
savepointTriggerMessageParameters.jobID.resolve(JobID.fromHexString(jobId));
var savepointDirectory =
Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
var timeout =
configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds();
var savepointFormatType = SavepointUtils.getSavepointFormatType(conf);
var response =
clusterClient
.sendRequest(
savepointTriggerHeaders,
savepointTriggerMessageParameters,
new SavepointTriggerRequestBody(
savepointDirectory,
false,
conf.get(FLINK_VERSION)
.isNewerVersionThan(FlinkVersion.v1_14)
? savepointFormatType
: null,
null))
.get(timeout, TimeUnit.SECONDS);
LOG.info("Savepoint successfully triggered: " + response.getTriggerId().toHexString());
savepointInfo.setTrigger(
response.getTriggerId().toHexString(),
triggerType,
SavepointFormatType.valueOf(savepointFormatType.name()));
}
}
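/**
* Fetches the path of the job's latest checkpoint. Throws a {@link RecoveryFailureException} if
* the latest checkpoint is not externally addressable (in-memory metadata only), as such a
* checkpoint cannot be used for recovery.
*/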
@Override
public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
var headers = CustomCheckpointingStatisticsHeaders.getInstance();
var params = headers.getUnresolvedMessageParameters();
params.jobPathParameter.resolve(jobId);
CompletableFuture<CheckpointHistoryWrapper> response =
clusterClient.sendRequest(headers, params, EmptyRequestBody.getInstance());
var checkpoints =
response.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.getSeconds(),
TimeUnit.SECONDS);
var latestCheckpointOpt = checkpoints.getLatestCheckpointPath();
if (latestCheckpointOpt.isPresent()
&& latestCheckpointOpt
.get()
.equals(
NonPersistentMetadataCheckpointStorageLocation
.EXTERNAL_POINTER)) {
throw new RecoveryFailureException(
"Latest checkpoint not externally addressable, manual recovery required.",
"CheckpointNotFound");
}
return latestCheckpointOpt.map(
pointer -> Savepoint.of(pointer, SavepointTriggerType.UNKNOWN));
}
}
@Override
public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
clusterClient
.sendRequest(
SavepointDisposalTriggerHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
new SavepointDisposalRequest(savepointPath))
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.getSeconds(),
TimeUnit.SECONDS);
}
}
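/**
* Polls the status of a previously triggered savepoint: pending while the operation is still in
* progress, error if it failed, completed with the savepoint location once finished. Any
* exception during polling is converted into an error result.
*/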
@Override
public SavepointFetchResult fetchSavepointInfo(
String triggerId, String jobId, Configuration conf) {
LOG.info("Fetching savepoint result with triggerId: " + triggerId);
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
SavepointStatusMessageParameters savepointStatusMessageParameters =
savepointStatusHeaders.getUnresolvedMessageParameters();
savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
savepointStatusMessageParameters.triggerIdPathParameter.resolve(
TriggerId.fromHexString(triggerId));
CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
clusterClient.sendRequest(
savepointStatusHeaders,
savepointStatusMessageParameters,
EmptyRequestBody.getInstance());
if (response.get() == null || response.get().resource() == null) {
return SavepointFetchResult.pending();
}
if (response.get().resource().getLocation() == null) {
if (response.get().resource().getFailureCause() != null) {
LOG.error(
"Failure occurred while fetching the savepoint result",
response.get().resource().getFailureCause());
return SavepointFetchResult.error(
response.get().resource().getFailureCause().toString());
} else {
return SavepointFetchResult.pending();
}
}
String location = response.get().resource().getLocation();
LOG.info("Savepoint result: {}", location);
return SavepointFetchResult.completed(location);
} catch (Exception e) {
LOG.error("Exception while fetching the savepoint result", e);
return SavepointFetchResult.error(e.getMessage());
}
}
@Override
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
Map<String, String> runtimeVersion = new HashMap<>();
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
CustomDashboardConfiguration dashboardConfiguration =
clusterClient
.sendRequest(
CustomDashboardConfigurationHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.toSeconds(),
TimeUnit.SECONDS);
runtimeVersion.put(
DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
dashboardConfiguration.getFlinkVersion());
runtimeVersion.put(
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
dashboardConfiguration.getFlinkRevision());
}
return runtimeVersion;
}
@Override
public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
final String clusterId;
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
clusterId = clusterClient.getClusterId();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return getJmPodList(namespace, clusterId);
}
@Override
public void waitForClusterShutdown(Configuration conf) {
waitForClusterShutdown(
conf.getString(KubernetesConfigOptions.NAMESPACE),
conf.getString(KubernetesConfigOptions.CLUSTER_ID),
configManager
.getOperatorConfiguration()
.getFlinkShutdownClusterTimeout()
.toSeconds());
}
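/**
* Creates a {@link RestClusterClient} for the deployment. The REST address is derived from the
* operator's host override if configured, otherwise from the namespaced external service name,
* typically of the form {@code http://<cluster-id>-rest.<namespace>:<port>}.
*/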
@VisibleForTesting
protected ClusterClient<String> getClusterClient(Configuration conf) throws Exception {
final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
final int port = conf.getInteger(RestOptions.PORT);
final String host =
ObjectUtils.firstNonNull(
configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
ExternalServiceDecorator.getNamespacedExternalServiceName(
clusterId, namespace));
final String restServerAddress = String.format("http://%s:%s", host, port);
LOG.debug("Creating RestClusterClient({})", restServerAddress);
return new RestClusterClient<>(
conf, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
}
private JarRunResponseBody runJar(
JobSpec job,
JobID jobID,
JarUploadResponseBody response,
Configuration conf,
String savepoint) {
String jarId =
response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
JarRunHeaders headers = JarRunHeaders.getInstance();
JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
parameters.jarIdPathParameter.resolve(jarId);
JarRunRequestBody runRequestBody =
new JarRunRequestBody(
job.getEntryClass(),
null,
job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
job.getParallelism() > 0 ? job.getParallelism() : null,
jobID,
job.getAllowNonRestoredState(),
savepoint,
conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
? RestoreMode.DEFAULT
: null);
LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
return clusterClient
.sendRequest(headers, parameters, runRequestBody)
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.toSeconds(),
TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Failed to submit job to session cluster.", e);
throw new FlinkRuntimeException(e);
} finally {
deleteJar(conf, jarId);
}
}
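/**
* Uploads the session job jar through the REST API. A raw {@link RestClient} is used here
* because {@link RestClusterClient} does not expose a jar upload method (see the TODO below).
* The locally fetched jar file is always deleted afterwards.
*/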
private JarUploadResponseBody uploadJar(
ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
String targetDir = artifactManager.generateJarDir(objectMeta, spec);
File jarFile = artifactManager.fetch(findJarURI(spec.getJob()), conf, targetDir);
Preconditions.checkArgument(
jarFile.exists(),
String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
JarUploadHeaders headers = JarUploadHeaders.getInstance();
String clusterId = spec.getDeploymentName();
String namespace = objectMeta.getNamespace();
int port = conf.getInteger(RestOptions.PORT);
String host =
ObjectUtils.firstNonNull(
configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
ExternalServiceDecorator.getNamespacedExternalServiceName(
clusterId, namespace));
try (RestClient restClient = new RestClient(conf, executorService)) {
// TODO: add a method to flink#RestClusterClient to support jar upload.
return restClient
.sendRequest(
host,
port,
headers,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
Collections.singletonList(
new FileUpload(
jarFile.toPath(), RestConstants.CONTENT_TYPE_JAR)))
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.toSeconds(),
TimeUnit.SECONDS);
} finally {
LOG.debug("Deleting the jar file {}", jarFile);
FileUtils.deleteFileOrDirectory(jarFile);
}
}
private String findJarURI(JobSpec jobSpec) {
if (jobSpec.getJarURI() != null) {
return jobSpec.getJarURI();
} else {
return emptyJar;
}
}
private void deleteJar(Configuration conf, String jarId) {
LOG.debug("Deleting the jar: {}", jarId);
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
parameters.jarIdPathParameter.resolve(jarId);
clusterClient
.sendRequest(headers, parameters, EmptyRequestBody.getInstance())
.get(
configManager
.getOperatorConfiguration()
.getFlinkClientTimeout()
.toSeconds(),
TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Failed to delete the jar: {}.", jarId, e);
}
}
/** Waits until the Flink cluster has completely shut down, polling once per second. */
@VisibleForTesting
void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
boolean jobManagerRunning = true;
boolean serviceRunning = true;
for (int i = 0; i < shutdownTimeout; i++) {
if (jobManagerRunning) {
PodList jmPodList = getJmPodList(namespace, clusterId);
if (jmPodList == null || jmPodList.getItems().isEmpty()) {
jobManagerRunning = false;
}
}
if (serviceRunning) {
Service service =
kubernetesClient
.services()
.inNamespace(namespace)
.withName(
ExternalServiceDecorator.getExternalServiceName(clusterId))
.fromServer()
.get();
if (service == null) {
serviceRunning = false;
}
}
if (!jobManagerRunning && !serviceRunning) {
break;
}
// Log a progress message every 5 seconds while waiting for the cluster to shut down.
if ((i + 1) % 5 == 0) {
LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
LOG.info("Cluster shutdown completed.");
}
private static List<JobStatusMessage> toJobStatusMessage(
MultipleJobsDetails multipleJobsDetails) {
return multipleJobsDetails.getJobs().stream()
.map(
details ->
new JobStatusMessage(
details.getJobId(),
details.getJobName(),
getEffectiveStatus(details),
details.getStartTime()))
.collect(Collectors.toList());
}
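/**
* Returns a copy of the configuration with all operator-specific settings (keys starting with
* {@code K8S_OP_CONF_PREFIX}) removed, so they are not passed on to the Flink cluster.
*/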
@VisibleForTesting
protected static Configuration removeOperatorConfigs(Configuration config) {
Configuration newConfig = new Configuration();
config.toMap()
.forEach(
(k, v) -> {
if (!k.startsWith(K8S_OP_CONF_PREFIX)) {
newConfig.setString(k, v);
}
});
return newConfig;
}
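/**
* Derives the effective job status from the task-level details: Flink may report a job as
* RUNNING while some of its tasks are still scheduling or deploying, in which case the status
* is downgraded to CREATED so the job is not treated as fully running yet.
*/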
@VisibleForTesting
protected static JobStatus getEffectiveStatus(JobDetails details) {
int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
JobStatus effectiveStatus = details.getStatus();
if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
effectiveStatus = JobStatus.CREATED;
LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
}
return effectiveStatus;
}
private void validateHaMetadataExists(Configuration conf) {
if (!isHaMetadataAvailable(conf)) {
throw new RecoveryFailureException(
"HA metadata not available to restore from last state. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ "Manual restore required.",
"RestoreFailed");
}
}
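/**
* Creates an empty jar file in a temporary directory. It is uploaded as a placeholder artifact
* when a session job spec does not define a jarURI (see {@link #findJarURI}).
*/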
private String createEmptyJar() {
try {
String emptyJarPath =
Files.createTempDirectory("flink").toString() + "/" + EMPTY_JAR_FILENAME;
LOG.debug("Creating empty jar to {}", emptyJarPath);
JarOutputStream target =
new JarOutputStream(new FileOutputStream(emptyJarPath), new Manifest());
target.close();
return emptyJarPath;
} catch (Exception e) {
throw new RuntimeException("Failed to create empty jar", e);
}
}
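/**
* Queries the given metrics for a job through the REST API and returns them as a metric name to
* value map.
*/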
public Map<String, String> getMetrics(
Configuration conf, String jobId, List<String> metricNames) throws Exception {
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
var jobMetricsMessageParameters =
JobMetricsHeaders.getInstance().getUnresolvedMessageParameters();
jobMetricsMessageParameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
jobMetricsMessageParameters.metricsFilterParameter.resolve(metricNames);
var responseBody =
clusterClient
.sendRequest(
JobMetricsHeaders.getInstance(),
jobMetricsMessageParameters,
EmptyRequestBody.getInstance())
.get();
return responseBody.getMetrics().stream()
.map(metric -> Tuple2.of(metric.getId(), metric.getValue()))
.collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1));
}
}
@Override
public final void deleteClusterDeployment(
ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
deleteClusterInternal(meta, deleteHaData);
updateStatusAfterClusterDeletion(status);
}
/**
* Deletes the Flink Kubernetes cluster by deleting the Kubernetes resources directly.
* Optionally also deletes the native Kubernetes HA resources.
*
* @param meta ObjectMeta of the deployment
* @param deleteHaConfigmaps Flag to indicate whether the Kubernetes HA metadata should be removed as well
*/
protected abstract void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps);
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
var currentJobState = status.getJobStatus().getState();
if (currentJobState == null
|| !JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
status.getJobStatus().setState(JobStatus.FINISHED.name());
}
}
}