| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.client.program.rest; |
| |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.common.JobSubmissionResult; |
| import org.apache.flink.api.common.accumulators.AccumulatorHelper; |
| import org.apache.flink.api.common.cache.DistributedCache; |
| import org.apache.flink.api.common.time.Time; |
| import org.apache.flink.api.java.JobListener; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.client.program.ClusterClient; |
| import org.apache.flink.client.program.NewClusterClient; |
| import org.apache.flink.client.program.ProgramInvocationException; |
| import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy; |
| import org.apache.flink.client.program.rest.retry.WaitStrategy; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.CoreOptions; |
| import org.apache.flink.configuration.RestOptions; |
| import org.apache.flink.core.fs.Path; |
| import org.apache.flink.runtime.client.JobStatusMessage; |
| import org.apache.flink.runtime.client.JobSubmissionException; |
| import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; |
| import org.apache.flink.runtime.concurrent.FutureUtils; |
| import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; |
| import org.apache.flink.runtime.jobgraph.JobGraph; |
| import org.apache.flink.runtime.jobgraph.JobStatus; |
| import org.apache.flink.runtime.jobmaster.JobResult; |
| import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; |
| import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; |
| import org.apache.flink.runtime.messages.Acknowledge; |
| import org.apache.flink.runtime.rest.FileUpload; |
| import org.apache.flink.runtime.rest.RestClient; |
| import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; |
| import org.apache.flink.runtime.rest.handler.async.TriggerResponse; |
| import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; |
| import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters; |
| import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; |
| import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters; |
| import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; |
| import org.apache.flink.runtime.rest.messages.EmptyRequestBody; |
| import org.apache.flink.runtime.rest.messages.EmptyResponseBody; |
| import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; |
| import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; |
| import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; |
| import org.apache.flink.runtime.rest.messages.JobMessageParameters; |
| import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; |
| import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; |
| import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; |
| import org.apache.flink.runtime.rest.messages.MessageHeaders; |
| import org.apache.flink.runtime.rest.messages.MessageParameters; |
| import org.apache.flink.runtime.rest.messages.RequestBody; |
| import org.apache.flink.runtime.rest.messages.ResponseBody; |
| import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; |
| import org.apache.flink.runtime.rest.messages.TriggerId; |
| import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders; |
| import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; |
| import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; |
| import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; |
| import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; |
| import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; |
| import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; |
| import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; |
| import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource; |
| import org.apache.flink.runtime.rest.messages.queue.QueueStatus; |
| import org.apache.flink.runtime.rest.util.RestClientException; |
| import org.apache.flink.runtime.rest.util.RestConstants; |
| import org.apache.flink.runtime.util.ExecutorThreadFactory; |
| import org.apache.flink.runtime.util.LeaderConnectionInfo; |
| import org.apache.flink.runtime.util.LeaderRetrievalUtils; |
| import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.ExecutorUtils; |
| import org.apache.flink.util.FlinkException; |
| import org.apache.flink.util.OptionalFailure; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.function.CheckedSupplier; |
| |
| import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; |
| import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
| |
| /** |
| * A {@link ClusterClient} implementation that communicates via HTTP REST requests. |
| */ |
| public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient { |
| |
| private final RestClusterClientConfiguration restClusterClientConfiguration; |
| |
| private final RestClient restClient; |
| |
| private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO")); |
| |
| private final WaitStrategy waitStrategy; |
| |
| private final T clusterId; |
| |
| private final LeaderRetrievalService webMonitorRetrievalService; |
| |
| private final LeaderRetrievalService dispatcherRetrievalService; |
| |
| private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever(); |
| |
| private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever(); |
| |
| /** ExecutorService to run operations that can be retried on exceptions. */ |
| private ScheduledExecutorService retryExecutorService; |
| |
| public RestClusterClient(Configuration config, T clusterId) throws Exception { |
| this( |
| config, |
| null, |
| clusterId, |
| new ExponentialWaitStrategy(10L, config.getLong(RestOptions.POLL_MAX_INTERVAL)), |
| null); |
| } |
| |
| public RestClusterClient( |
| Configuration config, |
| T clusterId, |
| LeaderRetrievalService webMonitorRetrievalService) throws Exception { |
| this( |
| config, |
| null, |
| clusterId, |
| new ExponentialWaitStrategy(10L, config.getLong(RestOptions.POLL_MAX_INTERVAL)), |
| webMonitorRetrievalService); |
| } |
| |
| @VisibleForTesting |
| RestClusterClient( |
| Configuration configuration, |
| @Nullable RestClient restClient, |
| T clusterId, |
| WaitStrategy waitStrategy, |
| @Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception { |
| super(configuration); |
| this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration); |
| |
| if (restClient != null) { |
| this.restClient = restClient; |
| } else { |
| this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService); |
| } |
| |
| this.waitStrategy = Preconditions.checkNotNull(waitStrategy); |
| this.clusterId = Preconditions.checkNotNull(clusterId); |
| |
| if (webMonitorRetrievalService == null) { |
| this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever(); |
| } else { |
| this.webMonitorRetrievalService = webMonitorRetrievalService; |
| } |
| this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); |
| this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry")); |
| startLeaderRetrievers(); |
| } |
| |
| private void startLeaderRetrievers() throws Exception { |
| this.webMonitorRetrievalService.start(webMonitorLeaderRetriever); |
| this.dispatcherRetrievalService.start(dispatcherLeaderRetriever); |
| } |
| |
| @Override |
| public void shutdown() { |
| ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService); |
| |
| this.restClient.shutdown(Time.seconds(5)); |
| ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); |
| |
| try { |
| webMonitorRetrievalService.stop(); |
| } catch (Exception e) { |
| log.error("An error occurred during stopping the webMonitorRetrievalService", e); |
| } |
| |
| try { |
| dispatcherRetrievalService.stop(); |
| } catch (Exception e) { |
| log.error("An error occurred during stopping the dispatcherLeaderRetriever", e); |
| } |
| |
| try { |
| // we only call this for legacy reasons to shutdown components that are started in the ClusterClient constructor |
| super.shutdown(); |
| } catch (Exception e) { |
| log.error("An error occurred during the client shutdown.", e); |
| } |
| } |
| |
| @Override |
| public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader, boolean detached) throws ProgramInvocationException { |
| log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached()); |
| |
| final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph); |
| |
| try { |
| JobSubmissionResult submissionResult = jobSubmissionFuture.get(); |
| // jobListeners is null when using bin/flink run |
| if (this.jobListeners != null){ |
| for (JobListener jobListener : this.jobListeners) { |
| jobListener.onJobSubmitted(submissionResult.getJobID()); |
| } |
| } |
| if (isDetached() || detached) { |
| return submissionResult; |
| } |
| } catch (Exception e) { |
| throw new ProgramInvocationException("Could not submit job", |
| jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e)); |
| } |
| |
| final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose( |
| ignored -> requestJobResult(jobGraph.getJobID())); |
| final JobResult jobResult; |
| try { |
| jobResult = jobResultFuture.get(); |
| this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader); |
| if (this.jobListeners != null) { |
| for (JobListener jobListener : this.jobListeners) { |
| jobListener.onJobExecuted(lastJobExecutionResult); |
| } |
| } |
| return lastJobExecutionResult; |
| } catch (JobResult.WrappedJobException e) { |
| throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e.getCause()); |
| } catch (IOException | ClassNotFoundException e) { |
| throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e); |
| } catch (Exception e) { |
| throw new ProgramInvocationException("Could not retrieve the execution result.", |
| jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e)); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { |
| JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance(); |
| final JobMessageParameters params = new JobMessageParameters(); |
| params.jobPathParameter.resolve(jobId); |
| |
| CompletableFuture<JobDetailsInfo> responseFuture = sendRequest( |
| detailsHeaders, |
| params); |
| |
| return responseFuture.thenApply(JobDetailsInfo::getJobStatus); |
| } |
| |
| /** |
| * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple |
| * times to poll the {@link JobResult} before giving up. |
| * |
| * @param jobId specifying the job for which to retrieve the {@link JobResult} |
| * @return Future which is completed with the {@link JobResult} once the job has completed or |
| * with a failure if the {@link JobResult} could not be retrieved. |
| */ |
| @Override |
| public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) { |
| return pollResourceAsync( |
| () -> { |
| final JobMessageParameters messageParameters = new JobMessageParameters(); |
| messageParameters.jobPathParameter.resolve(jobId); |
| return sendRequest( |
| JobExecutionResultHeaders.getInstance(), |
| messageParameters); |
| }); |
| } |
| |
| /** |
| * Submits the given {@link JobGraph} to the dispatcher. |
| * |
| * @param jobGraph to submit |
| * @return Future which is completed with the submission response |
| */ |
| @Override |
| public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) { |
| // we have to enable queued scheduling because slot will be allocated lazily |
| jobGraph.setAllowQueuedScheduling(true); |
| |
| CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { |
| try { |
| final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); |
| try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { |
| objectOut.writeObject(jobGraph); |
| } |
| return jobGraphFile; |
| } catch (IOException e) { |
| throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e)); |
| } |
| }, executorService); |
| |
| CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { |
| List<String> jarFileNames = new ArrayList<>(8); |
| List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8); |
| Collection<FileUpload> filesToUpload = new ArrayList<>(8); |
| |
| filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); |
| |
| if (flinkConfig.getBoolean(CoreOptions.DISABLE_UPLOAD_USER_JARS)) { |
| log.info("Uploading user-jars is disabled"); |
| } else { |
| for (Path jar : jobGraph.getUserJars()) { |
| try { |
| if (!jar.getFileSystem().isDistributedFS()) { |
| jarFileNames.add(jar.getName()); |
| filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); |
| } |
| } catch (IOException e) { |
| throw new CompletionException(new FlinkException("Failed to upload jars.", e)); |
| } |
| } |
| } |
| for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) { |
| try { |
| Path file = new Path(artifacts.getValue().filePath); |
| if (!file.getFileSystem().isDistributedFS()) { |
| artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); |
| filesToUpload.add(new FileUpload(Paths.get(file.toUri()), RestConstants.CONTENT_TYPE_BINARY)); |
| } |
| } catch (IOException e) { |
| throw new CompletionException(new FlinkException("Failed to upload artifacts.", e)); |
| } |
| } |
| |
| final JobSubmitRequestBody requestBody = new JobSubmitRequestBody( |
| jobGraphFile.getFileName().toString(), |
| jarFileNames, |
| artifactFileNames); |
| |
| return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload)); |
| }); |
| |
| final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose( |
| requestAndFileUploads -> sendRetriableRequest( |
| JobSubmitHeaders.getInstance(), |
| EmptyMessageParameters.getInstance(), |
| requestAndFileUploads.f0, |
| requestAndFileUploads.f1, |
| isConnectionProblemOrServiceUnavailable()) |
| ); |
| |
| submissionFuture |
| .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile) |
| .thenAccept(jobGraphFile -> { |
| try { |
| Files.delete(jobGraphFile); |
| } catch (IOException e) { |
| log.warn("Could not delete temporary file {}.", jobGraphFile, e); |
| } |
| }); |
| |
| return submissionFuture |
| .thenApply( |
| (JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID())) |
| .exceptionally( |
| (Throwable throwable) -> { |
| throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", throwable)); |
| }); |
| } |
| |
| @Override |
| public void stop(JobID jobID) throws Exception { |
| JobTerminationMessageParameters params = new JobTerminationMessageParameters(); |
| params.jobPathParameter.resolve(jobID); |
| params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP)); |
| CompletableFuture<EmptyResponseBody> responseFuture = sendRequest( |
| JobTerminationHeaders.getInstance(), |
| params); |
| responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public void cancel(JobID jobID) throws Exception { |
| JobTerminationMessageParameters params = new JobTerminationMessageParameters(); |
| params.jobPathParameter.resolve(jobID); |
| params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); |
| CompletableFuture<EmptyResponseBody> responseFuture = sendRequest( |
| JobTerminationHeaders.getInstance(), |
| params); |
| responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception { |
| return triggerSavepoint(jobId, savepointDirectory, true).get(); |
| } |
| |
| @Override |
| public CompletableFuture<String> triggerSavepoint( |
| final JobID jobId, |
| final @Nullable String savepointDirectory) { |
| return triggerSavepoint(jobId, savepointDirectory, false); |
| } |
| |
| private CompletableFuture<String> triggerSavepoint( |
| final JobID jobId, |
| final @Nullable String savepointDirectory, |
| final boolean cancelJob) { |
| final SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance(); |
| final SavepointTriggerMessageParameters savepointTriggerMessageParameters = |
| savepointTriggerHeaders.getUnresolvedMessageParameters(); |
| savepointTriggerMessageParameters.jobID.resolve(jobId); |
| |
| final CompletableFuture<TriggerResponse> responseFuture = sendRequest( |
| savepointTriggerHeaders, |
| savepointTriggerMessageParameters, |
| new SavepointTriggerRequestBody(savepointDirectory, cancelJob)); |
| |
| return responseFuture.thenCompose(savepointTriggerResponseBody -> { |
| final TriggerId savepointTriggerId = savepointTriggerResponseBody.getTriggerId(); |
| return pollSavepointAsync(jobId, savepointTriggerId); |
| }).thenApply(savepointInfo -> { |
| if (savepointInfo.getFailureCause() != null) { |
| throw new CompletionException(savepointInfo.getFailureCause()); |
| } |
| return savepointInfo.getLocation(); |
| }); |
| } |
| |
| @Override |
| public Map<String, OptionalFailure<Object>> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception { |
| final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance(); |
| final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters(); |
| accMsgParams.jobPathParameter.resolve(jobID); |
| accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true)); |
| |
| CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest( |
| accumulatorsHeaders, |
| accMsgParams); |
| |
| Map<String, OptionalFailure<Object>> result = Collections.emptyMap(); |
| |
| try { |
| result = responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> { |
| try { |
| return AccumulatorHelper.deserializeAccumulators( |
| accumulatorsInfo.getSerializedUserAccumulators(), |
| loader); |
| } catch (Exception e) { |
| throw new CompletionException( |
| new FlinkException( |
| String.format("Deserialization of accumulators for job %s failed.", jobID), |
| e)); |
| } |
| }).get(timeout.toMillis(), TimeUnit.MILLISECONDS); |
| } catch (ExecutionException ee) { |
| ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(ee)); |
| } |
| |
| return result; |
| } |
| |
| private CompletableFuture<SavepointInfo> pollSavepointAsync( |
| final JobID jobId, |
| final TriggerId triggerID) { |
| return pollResourceAsync(() -> { |
| final SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance(); |
| final SavepointStatusMessageParameters savepointStatusMessageParameters = |
| savepointStatusHeaders.getUnresolvedMessageParameters(); |
| savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId); |
| savepointStatusMessageParameters.triggerIdPathParameter.resolve(triggerID); |
| return sendRequest( |
| savepointStatusHeaders, |
| savepointStatusMessageParameters); |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Collection<JobStatusMessage>> listJobs() { |
| return sendRequest(JobsOverviewHeaders.getInstance()) |
| .thenApply( |
| (multipleJobsDetails) -> multipleJobsDetails |
| .getJobs() |
| .stream() |
| .map(detail -> new JobStatusMessage( |
| detail.getJobId(), |
| detail.getJobName(), |
| detail.getStatus(), |
| detail.getStartTime())) |
| .collect(Collectors.toList())); |
| } |
| |
| @Override |
| public T getClusterId() { |
| return clusterId; |
| } |
| |
| @Override |
| public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException { |
| return LeaderRetrievalUtils.retrieveLeaderConnectionInfo( |
| highAvailabilityServices.getDispatcherLeaderRetriever(), |
| timeout); |
| } |
| |
| @Override |
| public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism) { |
| |
| final RescalingTriggerHeaders rescalingTriggerHeaders = RescalingTriggerHeaders.getInstance(); |
| final RescalingTriggerMessageParameters rescalingTriggerMessageParameters = rescalingTriggerHeaders.getUnresolvedMessageParameters(); |
| rescalingTriggerMessageParameters.jobPathParameter.resolve(jobId); |
| rescalingTriggerMessageParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(newParallelism)); |
| |
| final CompletableFuture<TriggerResponse> rescalingTriggerResponseFuture = sendRequest( |
| rescalingTriggerHeaders, |
| rescalingTriggerMessageParameters); |
| |
| final CompletableFuture<AsynchronousOperationInfo> rescalingOperationFuture = rescalingTriggerResponseFuture.thenCompose( |
| (TriggerResponse triggerResponse) -> { |
| final TriggerId triggerId = triggerResponse.getTriggerId(); |
| final RescalingStatusHeaders rescalingStatusHeaders = RescalingStatusHeaders.getInstance(); |
| final RescalingStatusMessageParameters rescalingStatusMessageParameters = rescalingStatusHeaders.getUnresolvedMessageParameters(); |
| |
| rescalingStatusMessageParameters.jobPathParameter.resolve(jobId); |
| rescalingStatusMessageParameters.triggerIdPathParameter.resolve(triggerId); |
| |
| return pollResourceAsync( |
| () -> sendRequest( |
| rescalingStatusHeaders, |
| rescalingStatusMessageParameters)); |
| }); |
| |
| return rescalingOperationFuture.thenApply( |
| (AsynchronousOperationInfo asynchronousOperationInfo) -> { |
| if (asynchronousOperationInfo.getFailureCause() == null) { |
| return Acknowledge.get(); |
| } else { |
| throw new CompletionException(asynchronousOperationInfo.getFailureCause()); |
| } |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { |
| final SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath); |
| |
| final CompletableFuture<TriggerResponse> savepointDisposalTriggerFuture = sendRequest( |
| SavepointDisposalTriggerHeaders.getInstance(), |
| savepointDisposalRequest); |
| |
| final CompletableFuture<AsynchronousOperationInfo> savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose( |
| (TriggerResponse triggerResponse) -> { |
| final TriggerId triggerId = triggerResponse.getTriggerId(); |
| final SavepointDisposalStatusHeaders savepointDisposalStatusHeaders = SavepointDisposalStatusHeaders.getInstance(); |
| final SavepointDisposalStatusMessageParameters savepointDisposalStatusMessageParameters = savepointDisposalStatusHeaders.getUnresolvedMessageParameters(); |
| savepointDisposalStatusMessageParameters.triggerIdPathParameter.resolve(triggerId); |
| |
| return pollResourceAsync( |
| () -> sendRequest( |
| savepointDisposalStatusHeaders, |
| savepointDisposalStatusMessageParameters)); |
| }); |
| |
| return savepointDisposalFuture.thenApply( |
| (AsynchronousOperationInfo asynchronousOperationInfo) -> { |
| if (asynchronousOperationInfo.getFailureCause() == null) { |
| return Acknowledge.get(); |
| } else { |
| throw new CompletionException(asynchronousOperationInfo.getFailureCause()); |
| } |
| }); |
| } |
| |
| @Override |
| public void shutDownCluster() { |
| try { |
| sendRequest(ShutdownHeaders.getInstance()).get(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } catch (ExecutionException e) { |
| log.error("Error while shutting down cluster", e); |
| } |
| } |
| |
| /** |
| * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until |
| * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes |
| * {@link QueueStatus.Id#COMPLETED COMPLETED}. The future completes with the result of |
| * {@link AsynchronouslyCreatedResource#resource()}. |
| * |
| * @param resourceFutureSupplier The operation which polls for the |
| * {@code AsynchronouslyCreatedResource}. |
| * @param <R> The type of the resource. |
| * @param <A> The type of the {@code AsynchronouslyCreatedResource}. |
| * @return A {@code CompletableFuture} delivering the resource. |
| */ |
| private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync( |
| final Supplier<CompletableFuture<A>> resourceFutureSupplier) { |
| return pollResourceAsync(resourceFutureSupplier, new CompletableFuture<>(), 0); |
| } |
| |
| private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync( |
| final Supplier<CompletableFuture<A>> resourceFutureSupplier, |
| final CompletableFuture<R> resultFuture, |
| final long attempt) { |
| |
| resourceFutureSupplier.get().whenComplete((asynchronouslyCreatedResource, throwable) -> { |
| if (throwable != null) { |
| resultFuture.completeExceptionally(throwable); |
| } else { |
| if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) { |
| resultFuture.complete(asynchronouslyCreatedResource.resource()); |
| } else { |
| retryExecutorService.schedule(() -> { |
| pollResourceAsync(resourceFutureSupplier, resultFuture, attempt + 1); |
| }, waitStrategy.sleepTime(attempt), TimeUnit.MILLISECONDS); |
| } |
| } |
| }); |
| |
| return resultFuture; |
| } |
| |
| // ====================================== |
| // Legacy stuff we actually implement |
| // ====================================== |
| |
| @Override |
| public boolean hasUserJarsInClassPath(List<URL> userJarFiles) { |
| return false; |
| } |
| |
| @Override |
| public void waitForClusterToBeReady() { |
| // no op |
| } |
| |
| @Override |
| public String getWebInterfaceURL() { |
| try { |
| return getWebMonitorBaseUrl().get().toString(); |
| } catch (InterruptedException | ExecutionException e) { |
| ExceptionUtils.checkInterrupted(e); |
| |
| log.warn("Could not retrieve the web interface URL for the cluster.", e); |
| return "Unknown address."; |
| } |
| } |
| |
| @Override |
| public GetClusterStatusResponse getClusterStatus() { |
| return null; |
| } |
| |
| @Override |
| public List<String> getNewMessages() { |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public int getMaxSlots() { |
| return MAX_SLOTS_UNKNOWN; |
| } |
| |
| //------------------------------------------------------------------------- |
| // RestClient Helper |
| //------------------------------------------------------------------------- |
| |
| private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> |
| sendRequest(M messageHeaders, U messageParameters) { |
| return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance()); |
| } |
| |
| private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> |
| sendRequest(M messageHeaders, R request) { |
| return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request); |
| } |
| |
| @VisibleForTesting |
| <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> |
| sendRequest(M messageHeaders) { |
| return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()); |
| } |
| |
| private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> |
| sendRequest(M messageHeaders, U messageParameters, R request) { |
| return sendRetriableRequest( |
| messageHeaders, messageParameters, request, isConnectionProblemOrServiceUnavailable()); |
| } |
| |
| private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> |
| sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) { |
| return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate); |
| } |
| |
| private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> |
| sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) { |
| return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> { |
| try { |
| return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload); |
| } catch (IOException e) { |
| throw new CompletionException(e); |
| } |
| }), retryPredicate); |
| } |
| |
| private <C> CompletableFuture<C> retry( |
| CheckedSupplier<CompletableFuture<C>> operation, |
| Predicate<Throwable> retryPredicate) { |
| return FutureUtils.retryWithDelay( |
| CheckedSupplier.unchecked(operation), |
| restClusterClientConfiguration.getRetryMaxAttempts(), |
| Time.milliseconds(restClusterClientConfiguration.getRetryDelay()), |
| retryPredicate, |
| new ScheduledExecutorServiceAdapter(retryExecutorService)); |
| } |
| |
| private static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() { |
| return isConnectionProblemException().or(isServiceUnavailable()); |
| } |
| |
| private static Predicate<Throwable> isConnectionProblemException() { |
| return (throwable) -> |
| ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() || |
| ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() || |
| ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() || |
| ExceptionUtils.findThrowable(throwable, IOException.class).isPresent(); |
| } |
| |
| private static Predicate<Throwable> isServiceUnavailable() { |
| return httpExceptionCodePredicate(code -> code == HttpResponseStatus.SERVICE_UNAVAILABLE.code()); |
| } |
| |
| private static Predicate<Throwable> httpExceptionCodePredicate(Predicate<Integer> statusCodePredicate) { |
| return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class) |
| .map(restClientException -> { |
| final int code = restClientException.getHttpResponseStatus().code(); |
| return statusCodePredicate.test(code); |
| }) |
| .orElse(false); |
| } |
| |
| @VisibleForTesting |
| CompletableFuture<URL> getWebMonitorBaseUrl() { |
| return FutureUtils.orTimeout( |
| webMonitorLeaderRetriever.getLeaderFuture(), |
| restClusterClientConfiguration.getAwaitLeaderTimeout(), |
| TimeUnit.MILLISECONDS) |
| .thenApplyAsync(leaderAddressSessionId -> { |
| final String url = leaderAddressSessionId.f0; |
| try { |
| return new URL(url); |
| } catch (MalformedURLException e) { |
| throw new IllegalArgumentException("Could not parse URL from " + url, e); |
| } |
| }, executorService); |
| } |
| } |