blob: fd1e30754bec74680b7951ed6db7d3c6695cc2e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.JobListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
*
* @param <T> type of the cluster id
*/
public abstract class ClusterClient<T> {
/** Logger of this client, named after the runtime class returned by {@code getClass()}. */
protected final Logger log = LoggerFactory.getLogger(getClass());
/** The optimizer used in the optimization of batch programs. */
final Optimizer compiler;
/** Loader for the actor system used to communicate with the JobManager. The actor system is lazily initialized upon first use. */
protected final LazyActorSystemLoader actorSystemLoader;
/** Configuration of the client. */
protected final Configuration flinkConfig;
/** Timeout for futures. */
protected final FiniteDuration timeout;
/** Lookup timeout for the job manager retrieval service. */
private final FiniteDuration lookupTimeout;
/** High availability services, used to obtain the JobManager leader retriever. */
protected final HighAvailabilityServices highAvailabilityServices;
/** True if the high availability services are shared; {@link #shutdown()} then leaves them open. */
private final boolean sharedHaServices;
/** Flag indicating whether to sysout print execution updates. */
private boolean printStatusDuringExecution = true;
/**
 * For interactive invocations, the job results are only available after the ContextEnvironment has
 * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
 * which lets us access the execution result here.
 */
protected JobExecutionResult lastJobExecutionResult;
/** Switch for blocking/detached job submission of the client. */
private boolean detachedJobSubmission = false;
/**
 * Job listeners registered through {@link #setJobListeners(List)}; {@code null} until that setter is called.
 * NOTE(review): where the listeners are invoked is not visible in this part of the file.
 */
protected List<JobListener> jobListeners;
/**
 * Value returned by {@link #getMaxSlots()} if the number of maximum slots is unknown.
 */
public static final int MAX_SLOTS_UNKNOWN = -1;
// ------------------------------------------------------------------------
// Construction
// ------------------------------------------------------------------------
/**
 * Creates an instance that submits the programs to the JobManager defined in the
 * configuration. This method will try to resolve the JobManager hostname and throw an exception
 * if that is not possible.
 *
 * <p>The high availability services are created here from the given configuration and are
 * passed on as non-shared ({@code sharedHaServices == false}).
 *
 * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
 *
 * @throws Exception if the high availability services cannot be created
 */
public ClusterClient(Configuration flinkConfig) throws Exception {
this(
flinkConfig,
HighAvailabilityServicesUtils.createHighAvailabilityServices(
flinkConfig,
Executors.directExecutor(),
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
false);
}
/**
* Creates a instance that submits the programs to the JobManager defined in the
* configuration. This method will try to resolve the JobManager hostname and throw an exception
* if that is not possible.
*
* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
* @param sharedHaServices true if the HighAvailabilityServices are shared and must not be shut down
*/
public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices, boolean sharedHaServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
this.actorSystemLoader = new LazyActorSystemLoader(
highAvailabilityServices,
Time.milliseconds(lookupTimeout.toMillis()),
flinkConfig,
log);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.sharedHaServices = sharedHaServices;
}
// ------------------------------------------------------------------------
// Startup & Shutdown
// ------------------------------------------------------------------------
/**
 * Holder that creates the client's {@link ActorSystem} on the first call to {@link #get()}
 * and hands out the same instance afterwards, until {@link #shutdown()} discards it.
 */
protected static class LazyActorSystemLoader {
	private final Logger log;
	private final HighAvailabilityServices highAvailabilityServices;
	private final Time timeout;
	private final Configuration configuration;
	/** The lazily created actor system; {@code null} while not loaded. */
	private ActorSystem actorSystem;

	private LazyActorSystemLoader(
			HighAvailabilityServices highAvailabilityServices,
			Time timeout,
			Configuration configuration,
			Logger log) {
		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
		this.timeout = Preconditions.checkNotNull(timeout);
		this.configuration = Preconditions.checkNotNull(configuration);
		this.log = Preconditions.checkNotNull(log);
	}

	/**
	 * Indicates whether the ActorSystem has already been instantiated.
	 *
	 * @return true if an actor system is currently held, false otherwise
	 */
	public boolean isLoaded() {
		return actorSystem != null;
	}

	/**
	 * Shuts the actor system down and waits for its termination; does nothing if none is loaded.
	 */
	public void shutdown() {
		if (!isLoaded()) {
			return;
		}
		actorSystem.shutdown();
		actorSystem.awaitTermination();
		actorSystem = null;
	}

	/**
	 * Creates a new ActorSystem or returns an existing one.
	 *
	 * @return the actor system held by this loader
	 * @throws FlinkException if the connecting address could not be determined or the ActorSystem could not be started
	 */
	public ActorSystem get() throws FlinkException {
		if (isLoaded()) {
			return actorSystem;
		}

		// start actor system
		log.info("Starting client actor system.");
		final InetAddress connectingAddress = findConnectingAddress();

		try {
			actorSystem = BootstrapTools.startActorSystem(
				configuration,
				connectingAddress.getCanonicalHostName(),
				0,
				log);
		} catch (Exception startFailure) {
			throw new FlinkException("Could not start the ActorSystem lazily.", startFailure);
		}
		return actorSystem;
	}

	/** Determines the local address to bind to by connecting to the leading JobManager. */
	private InetAddress findConnectingAddress() throws FlinkException {
		try {
			return LeaderRetrievalUtils.findConnectingAddress(
				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
				timeout);
		} catch (LeaderRetrievalException lre) {
			throw new FlinkException("Could not find out our own hostname by connecting to the " +
				"leading JobManager. Please make sure that the Flink cluster has been started.", lre);
		}
	}
}
/**
 * Shuts down the client. This stops the internal actor system (if it was ever started) and,
 * unless the high availability services are shared, closes them as well.
 *
 * <p>The high availability services are closed even if stopping the actor system fails, so
 * that a failure in the first step does not leak them.
 *
 * @throws Exception if stopping the actor system or closing the high availability services fails
 */
public void shutdown() throws Exception {
	synchronized (this) {
		try {
			actorSystemLoader.shutdown();
		} finally {
			// must run even when the actor system shutdown throws, otherwise the services leak
			if (!sharedHaServices && highAvailabilityServices != null) {
				highAvailabilityServices.close();
			}
		}
	}
}
/**
 * Hook for killing the cluster. The body here is empty, so calling it on this class performs no action.
 * NOTE(review): presumably meant to be overridden by cluster-specific subclasses — confirm.
 *
 * @throws Exception declared for overriding implementations; the empty body here never throws
 */
public void killCluster() throws Exception {
}
// ------------------------------------------------------------------------
// Configuration
// ------------------------------------------------------------------------
/**
 * Switches the printing of progress updates to {@code System.out} during execution on or off.
 * All updates are logged via the SLF4J loggers regardless of this setting.
 *
 * @param print True to print updates to standard out during execution, false to not print them.
 */
public void setPrintStatusDuringExecution(boolean print) {
	printStatusDuringExecution = print;
}
/**
 * Replaces the job listeners held by this client. The given list is stored by reference, not copied.
 *
 * @param jobListeners the listeners to hold; may be {@code null}, no check is performed here
 */
public void setJobListeners(List<JobListener> jobListeners) {
this.jobListeners = jobListeners;
}
/**
 * Returns the current print setting.
 *
 * @return whether the client will print progress updates during the execution to {@code System.out}
 */
public boolean getPrintStatusDuringExecution() {
	return printStatusDuringExecution;
}
/**
 * Gets the current cluster connection info (may change in case of a HA setup).
 *
 * @return The connection info to the leader component of the cluster
 * @throws LeaderRetrievalException if the leader could not be retrieved
 */
public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException {
	final LeaderRetrievalService leaderRetriever =
		highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
	return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(leaderRetriever, timeout);
}
// ------------------------------------------------------------------------
// Access to the Program's Plan
// ------------------------------------------------------------------------
/**
 * Optimizes the given program and renders the resulting plan as JSON.
 *
 * @param compiler the optimizer to compile the program with
 * @param prog the packaged program
 * @param parallelism the parallelism handed to the plan optimization
 * @return the JSON dump of the optimized plan
 * @throws CompilerException if the optimization fails
 * @throws ProgramInvocationException if the plan cannot be obtained from the program
 */
public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
		throws CompilerException, ProgramInvocationException {
	final PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
	final FlinkPlan plan = getOptimizedPlan(compiler, prog, parallelism);
	// the JSON dumper only understands batch plans, hence the cast
	return dumper.getOptimizerPlanAsJSON((OptimizedPlan) plan);
}
/**
 * Produces the optimized plan of a packaged program.
 *
 * <p>The context class loader of the calling thread is set to the program's user code class
 * loader and is left that way when the method returns.
 *
 * @param compiler the optimizer to compile the program with
 * @param prog the packaged program
 * @param parallelism the parallelism to apply; values that are not positive are ignored in interactive mode
 * @return the optimized plan
 * @throws CompilerException if the optimization fails
 * @throws ProgramInvocationException if the plan cannot be obtained from the program
 * @throws RuntimeException if the program uses neither the program entry point nor interactive mode
 */
public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
		throws CompilerException, ProgramInvocationException {
	Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());

	if (prog.isUsingProgramEntryPoint()) {
		return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
	}
	if (!prog.isUsingInteractiveMode()) {
		throw new RuntimeException("Couldn't determine program mode.");
	}

	// temporary hack to support the optimizer plan preview
	final OptimizerPlanEnvironment previewEnvironment = new OptimizerPlanEnvironment(compiler);
	if (parallelism > 0) {
		previewEnvironment.setParallelism(parallelism);
	}
	return previewEnvironment.getOptimizedPlan(prog);
}
/**
 * Compiles the given plan with the given optimizer.
 *
 * <p>If {@code parallelism} is positive and the plan has no positive default parallelism yet,
 * the plan's default parallelism is set to {@code parallelism} before compiling.
 *
 * @param compiler the optimizer to compile the plan with
 * @param p the plan to compile; its default parallelism may be modified
 * @param parallelism the fallback default parallelism
 * @return the optimized plan
 * @throws CompilerException if the optimization fails
 */
public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
	final Logger logger = LoggerFactory.getLogger(ClusterClient.class);

	final boolean planLacksParallelism = parallelism > 0 && p.getDefaultParallelism() <= 0;
	if (planLacksParallelism) {
		logger.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
		p.setDefaultParallelism(parallelism);
	}
	logger.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

	return compiler.compile(p);
}
// ------------------------------------------------------------------------
// Program submission / execution
// ------------------------------------------------------------------------
/**
 * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending
 * on whether {@code setDetached(true)} or {@code setDetached(false)}.
 *
 * <p>The context class loader of the calling thread is set to the program's user code class
 * loader and is left that way when the method returns.
 *
 * @param prog the packaged program
 * @param parallelism the parallelism to execute the contained Flink job
 * @return The result of the execution
 * @throws ProgramMissingJobException if the interactive program did not contain a Flink job
 * @throws ProgramInvocationException if the program has no valid invocation mode or its execution fails
 */
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
		throws ProgramInvocationException, ProgramMissingJobException {
	Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
	if (prog.isUsingProgramEntryPoint()) {
		final JobWithJars jobWithJars;
		if (hasUserJarsInClassPath(prog.getAllLibraries())) {
			jobWithJars = prog.getPlanWithoutJars();
		} else {
			jobWithJars = prog.getPlanWithJars();
		}
		return run(jobWithJars, parallelism, prog.getSavepointSettings(), false);
	}
	else if (prog.isUsingInteractiveMode()) {
		log.info("Starting program in interactive mode (detached: {})", isDetached());

		final List<URL> libraries;
		if (hasUserJarsInClassPath(prog.getAllLibraries())) {
			libraries = Collections.emptyList();
		} else {
			libraries = prog.getAllLibraries();
		}

		ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
				prog.getClasspaths(), prog.getLibjars(), prog.getFiles(),
				prog.getUserCodeClassLoader(), parallelism, isDetached(),
				prog.getSavepointSettings());
		ContextEnvironment.setAsContext(factory);

		try {
			// invoke main method
			prog.invokeInteractiveModeForExecution();
			if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
				throw new ProgramMissingJobException("The program didn't contain a Flink job.");
			}
			if (isDetached()) {
				// A result left over from an earlier run of this client lets the check above pass even
				// though this invocation created no environment; report the missing job instead of
				// failing with a NullPointerException on the dereference below.
				if (factory.getLastEnvCreated() == null) {
					throw new ProgramMissingJobException("The program didn't contain a Flink job.");
				}
				// in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
				return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
			}
			else {
				// in blocking mode, we execute all Flink jobs contained in the user code and then return here
				return this.lastJobExecutionResult;
			}
		}
		finally {
			// always remove the factory again so that later environments are not created through it
			ContextEnvironment.unsetContext();
		}
	}
	else {
		throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
	}
}
/**
 * Runs the given program without restoring from a savepoint.
 *
 * @param program the program to be executed
 * @param parallelism the default parallelism to use when running the program
 * @param detached forwarded unchanged to the savepoint-aware {@code run} overload
 * @return the result of the submission
 * @throws ProgramInvocationException if the submission or execution fails
 */
public JobSubmissionResult run(JobWithJars program, int parallelism, boolean detached) throws ProgramInvocationException {
	final SavepointRestoreSettings noSavepoint = SavepointRestoreSettings.none();
	return run(program, parallelism, noSavepoint, detached);
}
/**
 * Optimizes the given program with this client's compiler and hands the resulting plan,
 * together with the program's jar files, classpaths, libjars and files, to the plan-based
 * {@code run} overload.
 *
 * @param jobWithJars The program to be executed.
 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
 *                    when the program does not set a parallelism by itself.
 * @param savepointSettings The savepoint restore settings passed on with the plan.
 * @param detached Passed on unchanged to the plan-based overload.
 * @return The result of the submission.
 *
 * @throws IllegalArgumentException Thrown, if the program does not provide a user code class loader.
 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
 *                                    or if the submission failed. That might be either due to an I/O problem,
 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 *                                    parallel execution failed.
 */
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings, boolean detached)
		throws CompilerException, ProgramInvocationException {
	final ClassLoader userCodeClassLoader = jobWithJars.getUserCodeClassLoader();
	if (userCodeClassLoader == null) {
		throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
	}

	final OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);

	return run(
		optimizedPlan,
		jobWithJars.getJarFiles(),
		jobWithJars.getClasspaths(),
		jobWithJars.getLibjars(),
		jobWithJars.getFiles(),
		userCodeClassLoader,
		savepointSettings,
		detached);
}
/**
 * Runs a compiled plan that has no additional libjars or files; both lists are passed on as empty.
 *
 * @param compiledPlan the compiled plan to run
 * @param libraries the jar files to pass on with the job
 * @param classpaths the classpath URLs to pass on with the job
 * @param classLoader the user code class loader
 * @param savepointSettings the savepoint restore settings
 * @param detached passed on unchanged to the full {@code run} overload
 * @return the result of the submission
 * @throws ProgramInvocationException if the submission or execution fails
 */
public JobSubmissionResult run(
		FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings, boolean detached) throws ProgramInvocationException {
	final List<URI> noLibjars = Collections.<URI>emptyList();
	final List<URI> noFiles = Collections.<URI>emptyList();
	return run(compiledPlan, libraries, classpaths, noLibjars, noFiles, classLoader, savepointSettings, detached);
}
/**
 * Translates the compiled plan into a {@link JobGraph} using this client's configuration and submits it.
 *
 * @param compiledPlan the compiled plan to run
 * @param libraries the jar files to pass on with the job
 * @param classpaths the classpath URLs to pass on with the job
 * @param libjars the libjar URIs to pass on with the job
 * @param files the file URIs to pass on with the job
 * @param classLoader the user code class loader handed to the submission
 * @param savepointSettings the savepoint restore settings
 * @param detached passed on unchanged to {@code submitJob}
 * @return the result of the submission
 * @throws ProgramInvocationException if the job graph cannot be created or the submission fails
 */
public JobSubmissionResult run(FlinkPlan compiledPlan,
		List<URL> libraries, List<URL> classpaths, List<URI> libjars, List<URI> files,
		ClassLoader classLoader, SavepointRestoreSettings savepointSettings, boolean detached)
		throws ProgramInvocationException {
	return submitJob(
		getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, libjars, files, savepointSettings),
		classLoader,
		detached);
}
/**
 * Submits a JobGraph and waits for its result. The result is also remembered in
 * {@code lastJobExecutionResult}.
 *
 * @param jobGraph The JobGraph
 * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
 * @return the result of the job execution
 * @throws ProgramInvocationException if the actor system cannot be started or the job execution fails
 */
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
	waitForClusterToBeReady();

	final ActorSystem clientActorSystem;
	try {
		clientActorSystem = actorSystemLoader.get();
	} catch (FlinkException startFailure) {
		throw new ProgramInvocationException(
			"Could not start the ActorSystem needed to talk to the JobManager.", startFailure);
	}

	try {
		logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
		final JobExecutionResult executionResult = JobClient.submitJobAndWait(
			clientActorSystem,
			flinkConfig,
			highAvailabilityServices,
			jobGraph,
			timeout,
			printStatusDuringExecution,
			classLoader);
		this.lastJobExecutionResult = executionResult;
		return executionResult;
	} catch (JobExecutionException executionFailure) {
		throw new ProgramInvocationException(
			"The program execution failed: " + executionFailure.getMessage(), executionFailure);
	}
}
/**
 * Submits a JobGraph detached: the call returns once the submission has gone through,
 * carrying only the job's ID.
 *
 * @param jobGraph The JobGraph
 * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
 * @return a submission result holding the ID of the submitted job
 * @throws ProgramInvocationException if the JobManager gateway cannot be retrieved or the submission fails
 */
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
	waitForClusterToBeReady();

	final ActorGateway gateway;
	try {
		gateway = getJobManagerGateway();
	} catch (Exception gatewayFailure) {
		throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", gatewayFailure);
	}

	try {
		logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
		final AkkaJobManagerGateway akkaGateway = new AkkaJobManagerGateway(gateway);
		final Time submissionTimeout = Time.milliseconds(timeout.toMillis());
		JobClient.submitJobDetached(akkaGateway, flinkConfig, jobGraph, submissionTimeout, classLoader);
		return new JobSubmissionResult(jobGraph.getJobID());
	} catch (JobExecutionException submissionFailure) {
		throw new ProgramInvocationException(
			"The program execution failed: " + submissionFailure.getMessage(), submissionFailure);
	}
}
/**
 * Reattaches to a running job identified by the supplied job id and waits for its result.
 *
 * @param jobID The job id of the job to attach to
 * @return The JobExecutionResult for the jobID
 * @throws JobExecutionException if the actor system cannot be started or an error occurs during monitoring the job execution
 */
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
	final ActorSystem clientActorSystem;
	try {
		clientActorSystem = actorSystemLoader.get();
	} catch (FlinkException startFailure) {
		throw new JobExecutionException(
			jobID, "Could not start the ActorSystem needed to talk to the JobManager.", startFailure);
	}

	return JobClient.awaitJobResult(
		JobClient.attachToRunningJob(
			jobID,
			flinkConfig,
			clientActorSystem,
			highAvailabilityServices,
			timeout,
			printStatusDuringExecution));
}
/**
 * Reattaches to a running job with the given job id, without waiting for its result.
 *
 * @param jobID The job id of the job to attach to
 * @return The listening context obtained from attaching to the job
 * @throws JobExecutionException if the actor system needed to talk to the JobManager cannot be started
 */
public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
	final ActorSystem clientActorSystem;
	try {
		clientActorSystem = actorSystemLoader.get();
	} catch (FlinkException startFailure) {
		throw new JobExecutionException(
			jobID, "Could not start the ActorSystem needed to talk to the JobManager.", startFailure);
	}

	final JobListeningContext listeningContext = JobClient.attachToRunningJob(
		jobID,
		flinkConfig,
		clientActorSystem,
		highAvailabilityServices,
		timeout,
		printStatusDuringExecution);
	return listeningContext;
}
/**
 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
 *
 * @param jobId the ID of the job whose status is requested
 * @return future of the job status; it completes exceptionally with an {@link IllegalStateException}
 *         if the JobManager does not know the job or answers with an unexpected message
 * @throws RuntimeException if the JobManager gateway cannot be retrieved
 */
public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
	final ActorGateway jobManager;
	try {
		jobManager = getJobManagerGateway();
	} catch (FlinkException e) {
		throw new RuntimeException("Could not retrieve JobManager gateway.", e);
	}

	Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
	CompletableFuture<Object> javaFuture = FutureUtils.<Object>toJava(response);

	return javaFuture.thenApply((responseMessage) -> {
		if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) {
			return ((JobManagerMessages.CurrentJobStatus) responseMessage).status();
		} else if (responseMessage instanceof JobManagerMessages.JobNotFound) {
			throw new CompletionException(
				new IllegalStateException("Could not find job with JobId " + jobId));
		} else {
			throw new CompletionException(
				new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
		}
	});
}
/**
 * Cancels a job identified by the job id and waits for the JobManager's answer.
 *
 * @param jobId the job id
 * @throws Exception In case an error occurred, in particular if the JobManager reports a cancellation failure.
 */
public void cancel(JobID jobId) throws Exception {
	final ActorGateway jobManager = getJobManagerGateway();
	final Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), timeout);
	final Object rc = Await.result(response, timeout);

	if (rc instanceof JobManagerMessages.CancellationSuccess) {
		// no further action required
		return;
	}
	if (rc instanceof JobManagerMessages.CancellationFailure) {
		throw new Exception("Canceling the job with ID " + jobId + " failed.",
			((JobManagerMessages.CancellationFailure) rc).cause());
	}
	throw new IllegalStateException("Unexpected response: " + rc);
}
/**
 * Cancels a job identified by the job id and triggers a savepoint.
 *
 * @param jobId the job id
 * @param savepointDirectory directory the savepoint should be written to; may be {@code null}
 * @return path where the savepoint is located
 * @throws Exception In case an error occurred, in particular if the JobManager reports a cancellation failure.
 */
public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
	final ActorGateway jobManager = getJobManagerGateway();
	final Future<Object> response = jobManager.ask(
		new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointDirectory), timeout);
	final Object rc = Await.result(response, timeout);

	if (rc instanceof JobManagerMessages.CancellationSuccess) {
		return ((JobManagerMessages.CancellationSuccess) rc).savepointPath();
	}
	if (rc instanceof JobManagerMessages.CancellationFailure) {
		throw new Exception("Cancel & savepoint for the job with ID " + jobId + " failed.",
			((JobManagerMessages.CancellationFailure) rc).cause());
	}
	throw new IllegalStateException("Unexpected response: " + rc);
}
/**
 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
 * a while after sending the stop command, because after sources stopped to emit data all operators
 * need to finish processing.
 *
 * @param jobId
 *            the job ID of the streaming program to stop
 * @throws Exception
 *             If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
 *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
 */
public void stop(final JobID jobId) throws Exception {
	final ActorGateway jobManager = getJobManagerGateway();
	final Object stopRequest = new JobManagerMessages.StopJob(jobId);
	final Object rc = Await.result(jobManager.ask(stopRequest, timeout), timeout);

	if (rc instanceof JobManagerMessages.StoppingSuccess) {
		// no further action required
		return;
	}
	if (rc instanceof JobManagerMessages.StoppingFailure) {
		throw new Exception("Stopping the job with ID " + jobId + " failed.",
			((JobManagerMessages.StoppingFailure) rc).cause());
	}
	throw new IllegalStateException("Unexpected response: " + rc);
}
/**
 * Triggers a savepoint for the job identified by the job id. The savepoint will be written to the given savepoint
 * directory, or {@link org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
 *
 * <p>The request uses a fixed ask timeout of one hour instead of the client timeout.
 *
 * @param jobId job id
 * @param savepointDirectory directory the savepoint should be written to
 * @return path future where the savepoint is located; completes exceptionally if the JobManager
 *         reports a failure or answers with an unexpected message
 * @throws FlinkException if no connection to the cluster could be established
 */
public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) throws FlinkException {
	final ActorGateway jobManager = getJobManagerGateway();

	final Object savepointRequest =
		new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>apply(savepointDirectory));
	final FiniteDuration savepointTimeout = new FiniteDuration(1, TimeUnit.HOURS);
	final Future<Object> response = jobManager.ask(savepointRequest, savepointTimeout);

	return FutureUtils.<Object>toJava(response).thenApply((responseMessage) -> {
		if (responseMessage instanceof JobManagerMessages.TriggerSavepointSuccess) {
			return ((JobManagerMessages.TriggerSavepointSuccess) responseMessage).savepointPath();
		}
		if (responseMessage instanceof JobManagerMessages.TriggerSavepointFailure) {
			throw new CompletionException(
				((JobManagerMessages.TriggerSavepointFailure) responseMessage).cause());
		}
		throw new CompletionException(
			new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
	});
}
/**
 * Asks the JobManager to dispose the savepoint stored at the given path.
 *
 * @param savepointPath path of the savepoint to dispose
 * @return future that completes with an acknowledgement on success; it completes exceptionally
 *         if the JobManager reports a failure or answers with an unexpected message
 * @throws FlinkException if the JobManager gateway cannot be retrieved
 */
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException {
	final ActorGateway jobManager = getJobManagerGateway();

	final Object disposeRequest = new JobManagerMessages.DisposeSavepoint(savepointPath);
	final Future<Object> scalaResponse = jobManager.ask(disposeRequest, timeout);
	final CompletableFuture<Object> responseFuture = FutureUtils.<Object>toJava(scalaResponse);

	return responseFuture.thenApply(
		(Object response) -> {
			if (response instanceof JobManagerMessages.DisposeSavepointSuccess$) {
				return Acknowledge.get();
			}
			if (!(response instanceof JobManagerMessages.DisposeSavepointFailure)) {
				throw new CompletionException(new FlinkRuntimeException("Unknown response type " + response.getClass().getSimpleName() + '.'));
			}

			final Throwable failureCause = ((JobManagerMessages.DisposeSavepointFailure) response).cause();
			if (!(failureCause instanceof ClassNotFoundException)) {
				throw new CompletionException(failureCause);
			}

			// a missing class gets a more helpful message that points to the -j option
			throw new CompletionException(
				new ClassNotFoundException("Savepoint disposal failed, because of a " +
					"missing class. This is most likely caused by a custom state " +
					"instance, which cannot be disposed without the user code class " +
					"loader. Please provide the program jar with which you have created " +
					"the savepoint via -j <JAR> for disposal.",
					failureCause.getCause()));
		});
}
/**
 * Lists the currently running and finished jobs on the cluster.
 *
 * @return future collection of running and finished jobs; completes exceptionally with an
 *         {@link IllegalStateException} if the JobManager answers with an unexpected message
 * @throws Exception if no connection to the cluster could be established
 */
public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
	final ActorGateway jobManager = getJobManagerGateway();
	final Future<Object> response = jobManager.ask(new RequestJobDetails(true, false), timeout);

	return FutureUtils.<Object>toJava(response).thenApply((responseMessage) -> {
		if (!(responseMessage instanceof MultipleJobsDetails)) {
			throw new CompletionException(
				new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
		}

		final Collection<JobDetails> jobDetails = ((MultipleJobsDetails) responseMessage).getJobs();
		final Collection<JobStatusMessage> statusMessages = new ArrayList<>(jobDetails.size());
		for (JobDetails detail : jobDetails) {
			statusMessages.add(
				new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime()));
		}
		return statusMessages;
	});
}
/**
 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 * requested while a job is running or after it has finished. The system class loader is used
 * to deserialize the incoming accumulator results.
 *
 * @param jobID The job identifier of a job.
 * @return A Map containing the accumulator's name and its value.
 * @throws Exception if the accumulators cannot be fetched or deserialized
 */
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID) throws Exception {
	final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
	return getAccumulators(jobID, systemClassLoader);
}
/**
 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 * requested while a job is running or after it has finished.
 *
 * @param jobID The job identifier of a job.
 * @param loader The class loader for deserializing the accumulator results.
 * @return A Map containing the accumulator's name and its value.
 * @throws Exception if the request fails, the JobManager reports an error (its cause is rethrown),
 *                   or the answer is of an unexpected type
 */
public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
	final ActorGateway gateway = getJobManagerGateway();

	final Future<Object> response;
	try {
		response = gateway.ask(new RequestAccumulatorResults(jobID), timeout);
	} catch (Exception askFailure) {
		throw new Exception("Failed to query the job manager gateway for accumulators.", askFailure);
	}

	final Object result = Await.result(response, timeout);

	if (result instanceof AccumulatorResultsFound) {
		final Map<String, SerializedValue<OptionalFailure<Object>>> serialized =
			((AccumulatorResultsFound) result).result();
		return AccumulatorHelper.deserializeAccumulators(serialized, loader);
	}
	if (result instanceof AccumulatorResultsErroneous) {
		throw ((AccumulatorResultsErroneous) result).cause();
	}
	throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
}
// ------------------------------------------------------------------------
// Sessions
// ------------------------------------------------------------------------
/**
 * Asks the JobManager to finish the single session (job) identified by the given ID.
 * Delegates to {@link #endSessions(List)} with a one-element list.
 *
 * @param jobId The ID that identifies the session.
 * @throws IllegalArgumentException If {@code jobId} is {@code null}.
 * @throws Exception Propagated from {@link #endSessions(List)}.
 */
public void endSession(JobID jobId) throws Exception {
	if (jobId == null) {
		throw new IllegalArgumentException("The JobID must not be null.");
	}

	final List<JobID> singleSession = Collections.singletonList(jobId);
	endSessions(singleSession);
}
/**
 * Asks the JobManager to finish the sessions (jobs) identified by the given IDs by
 * sending one {@code RemoveCachedJob} message per ID. {@code null} entries in the
 * list are skipped.
 *
 * @param jobIds The IDs that identify the sessions.
 * @throws IllegalArgumentException If {@code jobIds} is {@code null}.
 * @throws Exception If the JobManager gateway cannot be obtained.
 */
public void endSessions(List<JobID> jobIds) throws Exception {
	if (jobIds == null) {
		throw new IllegalArgumentException("The JobIDs must not be null");
	}

	// The gateway is resolved once, before iterating, even if the list is empty.
	final ActorGateway leader = getJobManagerGateway();

	for (JobID sessionId : jobIds) {
		if (sessionId == null) {
			continue;
		}
		log.info("Telling job manager to end the session {}.", sessionId);
		leader.tell(new JobManagerMessages.RemoveCachedJob(sessionId));
	}
}
// ------------------------------------------------------------------------
// Internal translation methods
// ------------------------------------------------------------------------
/**
 * Creates the optimized plan for a given program by extracting its {@link Plan} and
 * handing it to the given compiler.
 *
 * @param compiler The optimizer used to compile the plan.
 * @param prog The program to be compiled.
 * @param parallelism The parallelism forwarded to the plan-based overload.
 * @return The compiled and optimized plan, as returned by the compiler.
 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
 */
private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
		throws CompilerException, ProgramInvocationException {
	final Plan plan = prog.getPlan();
	return getOptimizedPlan(compiler, plan, parallelism);
}
/**
 * Builds the {@link JobGraph} for a packaged program by collecting the program's
 * libraries, classpaths, libjars and files and delegating to the overload that
 * takes them explicitly.
 *
 * @param flinkConfig The Flink configuration forwarded to the delegate.
 * @param prog The packaged program supplying jars, classpaths, libjars and files.
 * @param optPlan The plan to translate into a job graph.
 * @param savepointSettings The savepoint restore settings forwarded to the delegate.
 * @return The job graph produced by the delegate.
 * @throws ProgramInvocationException Declared by the signature; presumably raised while
 *                                    reading the program's artifacts (confirm against PackagedProgram).
 */
public static JobGraph getJobGraph(Configuration flinkConfig, PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
	final List<URL> jarFiles = prog.getAllLibraries();
	final List<URL> classpaths = prog.getClasspaths();
	final List<URI> libjars = prog.getLibjars();
	final List<URI> files = prog.getFiles();
	return getJobGraph(flinkConfig, optPlan, jarFiles, classpaths, libjars, files, savepointSettings);
}
/**
 * Builds the {@link JobGraph} for the given plan without any libjars or extra files,
 * delegating to the overload that accepts them with two empty lists.
 *
 * @param flinkConfig The Flink configuration forwarded to the delegate.
 * @param optPlan The plan to translate into a job graph.
 * @param jarFiles The jar files to attach to the job graph.
 * @param classpaths The classpaths to set on the job graph.
 * @param savepointSettings The savepoint restore settings forwarded to the delegate.
 * @return The job graph produced by the delegate.
 */
public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
	final List<URI> noLibjars = Collections.emptyList();
	final List<URI> noFiles = Collections.emptyList();
	return getJobGraph(flinkConfig, optPlan, jarFiles, classpaths, noLibjars, noFiles, savepointSettings);
}
/**
 * Translates the given plan into a {@link JobGraph} and attaches jars, libjars,
 * user artifacts and classpaths to it.
 *
 * <p>A {@link StreamingPlan} supplies its own job graph, on which the savepoint
 * restore settings are then set. Any other plan is cast to {@link OptimizedPlan}
 * and compiled with a {@link JobGraphGenerator}; the savepoint settings are not
 * applied on that path.
 *
 * @param flinkConfig The Flink configuration handed to the {@link JobGraphGenerator}.
 * @param optPlan The plan to translate.
 * @param jarFiles The jar URLs added to the job graph.
 * @param classpaths The classpaths set on the job graph.
 * @param libjars Additional jar URIs added to the job graph.
 * @param files File URIs registered as user artifacts; a URI fragment, if present,
 *              is used as the artifact key, otherwise the file name is used.
 * @param savepointSettings The savepoint restore settings (used for streaming plans only).
 * @return The assembled job graph.
 */
public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, List<URI> libjars, List<URI> files, SavepointRestoreSettings savepointSettings) {
	final JobGraph jobGraph;
	if (!(optPlan instanceof StreamingPlan)) {
		jobGraph = new JobGraphGenerator(flinkConfig).compileJobGraph((OptimizedPlan) optPlan);
	} else {
		jobGraph = ((StreamingPlan) optPlan).getJobGraph();
		jobGraph.setSavepointRestoreSettings(savepointSettings);
	}

	for (URL jarUrl : jarFiles) {
		final URI jarUri;
		try {
			jarUri = jarUrl.toURI();
		} catch (URISyntaxException e) {
			throw new RuntimeException("URL is invalid. This should not happen.", e);
		}
		jobGraph.addJar(new Path(jarUri));
	}

	for (URI libjarUri : libjars) {
		jobGraph.addJar(new Path(libjarUri));
	}

	for (URI fileUri : files) {
		// NOTE(review): a URI ending in a bare '#' has an empty (non-null) fragment and
		// therefore yields an empty artifact key -- confirm whether that is intended.
		String artifactKey = fileUri.getFragment();
		if (artifactKey == null) {
			artifactKey = new Path(fileUri).getName();
		}
		// Remove the part after '#' in file path since this part has been already set to file key.
		final String pathWithoutFragment =
			org.apache.commons.lang3.StringUtils.substringBeforeLast(fileUri.toString(), "#");
		jobGraph.addUserArtifact(artifactKey, new DistributedCacheEntry(pathWithoutFragment, false, false));
	}

	jobGraph.setClasspaths(classpaths);
	return jobGraph;
}
// ------------------------------------------------------------------------
// Helper methods
// ------------------------------------------------------------------------
/**
 * Looks up the {@link ActorGateway} of the current job manager leader using
 * the {@link LeaderRetrievalService} obtained from the high availability services.
 *
 * @return ActorGateway of the current job manager leader
 * @throws FlinkException If the leader could not be retrieved; the underlying
 *                        {@link LeaderRetrievalException} is attached as the cause.
 */
public ActorGateway getJobManagerGateway() throws FlinkException {
	log.debug("Looking up JobManager");

	final ActorGateway leaderGateway;
	try {
		leaderGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
			actorSystemLoader.get(),
			lookupTimeout);
	} catch (LeaderRetrievalException cause) {
		final String message =
			"Could not connect to the leading JobManager. Please check that the JobManager is running.";
		throw new FlinkException(message, cause);
	}
	return leaderGateway;
}
/**
 * Logs the message at INFO level and, if printing status during execution is
 * enabled, also prints it to standard out.
 *
 * @param message The message to log/print
 */
protected void logAndSysout(String message) {
	log.info(message);
	if (!printStatusDuringExecution) {
		return;
	}
	System.out.println(message);
}
// ------------------------------------------------------------------------
// Abstract methods to be implemented by the cluster specific Client
// ------------------------------------------------------------------------
/**
 * Blocks until the client has determined that the cluster is ready for job submission.
 *
 * <p>This wait is deferred until right before job submission so that any other errors
 * (e.g. invalid job definitions or errors in the user jar) are reported first.
 */
public abstract void waitForClusterToBeReady();
/**
 * Returns a URL (as a string) to the JobManager web interface.
 *
 * @return the web interface URL in string form
 */
public abstract String getWebInterfaceURL();
/**
 * Returns the latest cluster status, with the number of TaskManagers and slots.
 *
 * @return the most recent cluster status response
 */
public abstract GetClusterStatusResponse getClusterStatus();
/**
 * May return new messages from the cluster.
 *
 * <p>Messages can be, for example, about failed containers or container launch requests.
 *
 * @return the list of new messages as provided by the implementation
 */
public abstract List<String> getNewMessages();
/**
 * Returns the cluster id identifying the cluster to which the client is connected.
 * The id's type is the client's type parameter {@code T}.
 *
 * @return cluster id of the connected cluster
 */
public abstract T getClusterId();
/**
 * Sets the mode of this client (detached or blocking job execution).
 *
 * @param isDetached If true, the client will submit programs detached via the {@code run} method
 */
public void setDetached(boolean isDetached) {
	detachedJobSubmission = isDetached;
}
/**
 * Indicates whether this client submits jobs detached.
 *
 * @return True if the client submits detached, false otherwise
 */
public boolean isDetached() {
	return this.detachedJobSubmission;
}
/**
 * Returns a clone of the Flink configuration object held by this client, not the
 * internal instance itself.
 *
 * @return A clone of the Flink configuration object
 */
public Configuration getFlinkConfiguration() {
	final Configuration copy = flinkConfig.clone();
	return copy;
}
/**
 * Returns the upper limit on the number of slots to use, if the client defines one.
 *
 * @return the maximum number of slots, or {@code -1} ({@link #MAX_SLOTS_UNKNOWN}) if unknown
 */
public abstract int getMaxSlots();
/**
 * Returns true if the client already has the user jar and providing it again would
 * result in duplicate uploading of the jar.
 *
 * @param userJarFiles the user jar files in question
 * @return true if the client already has the user jars, false otherwise
 */
public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
/**
 * Submits the given job graph using the subclass-specific submission logic. An
 * implementation may decide to simply call one of the run methods or it may perform
 * some custom job submission logic.
 *
 * @param jobGraph The JobGraph to be submitted
 * @param classLoader The class loader passed along with the submission
 *                    (presumably the user code class loader -- confirm against implementations)
 * @param detached Flag passed to the implementation; presumably selects detached
 *                 submission (confirm against implementations)
 * @return JobSubmissionResult
 * @throws ProgramInvocationException declared by the signature for failed submissions
 */
public abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader, boolean detached)
throws ProgramInvocationException;
/**
 * Submits the given job graph by delegating to
 * {@link #submitJob(JobGraph, ClassLoader, boolean)} with {@code detached} set to
 * {@code false}.
 *
 * @param jobGraph The JobGraph to be submitted
 * @param classLoader The class loader forwarded to the three-argument overload
 * @return JobSubmissionResult
 * @throws ProgramInvocationException Propagated from the three-argument overload.
 */
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
		throws ProgramInvocationException {
	final boolean detached = false;
	return submitJob(jobGraph, classLoader, detached);
}
/**
 * Rescales the specified job such that it will have the new parallelism.
 *
 * <p>This default implementation does not support rescaling and always throws.
 *
 * @param jobId specifying the job to modify
 * @param newParallelism specifying the new parallelism of the rescaled job
 * @return Future which is completed once the rescaling has been completed
 * @throws UnsupportedOperationException always, in this default implementation
 */
public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism) {
	final String clientName = getClass().getSimpleName();
	throw new UnsupportedOperationException("The " + clientName + " does not support rescaling.");
}
/**
 * Shuts down the cluster this client is connected to.
 *
 * <p>This default implementation does not support the operation and always throws.
 *
 * @throws UnsupportedOperationException always, in this default implementation
 */
public void shutDownCluster() {
	final String clientName = getClass().getSimpleName();
	throw new UnsupportedOperationException("The " + clientName + " does not support shutDownCluster.");
}
}