blob: 0b842d1974ece72bdcc979e85a897e976c8d4cd1 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.internal.compute.task;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
import static org.apache.ignite.internal.util.ArrayUtils.concat;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.task.ComputeJobRunner;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;
* Internal map reduce task execution object. Runs the {@link MapReduceTask#split(TaskExecutionContext, Object...)} method of the task as a
* compute job, then submits the resulting list of jobs. Waits for completion of all compute jobs, then submits the
* {@link MapReduceTask#reduce(Map)} method as a compute job. The result of the task is the result of the split method.
* @param <R> Task result type.
public class TaskExecutionInternal<R> implements JobExecution<R> {
private static final IgniteLogger LOG = Loggers.forClass(TaskExecutionInternal.class);
private final QueueExecution<SplitResult<R>> splitExecution;
private final CompletableFuture<List<JobExecution<Object>>> executionsFuture;
private final CompletableFuture<Map<UUID, Object>> resultsFuture;
private final CompletableFuture<QueueExecution<R>> reduceExecutionFuture;
private final AtomicReference<JobStatus> reduceFailedStatus = new AtomicReference<>();
* Construct an execution object and starts executing.
* @param executorService Compute jobs executor.
* @param jobSubmitter Compute jobs submitter.
* @param taskClass Map reduce task class.
* @param context Task execution context.
* @param args Task arguments.
public TaskExecutionInternal(
PriorityQueueExecutor executorService,
JobSubmitter jobSubmitter,
Class<? extends MapReduceTask<R>> taskClass,
TaskExecutionContext context,
Object... args
) {
LOG.debug("Executing task {}", taskClass.getName());
splitExecution = executorService.submit(
() -> {
MapReduceTask<R> task = instantiateTask(taskClass);
return new SplitResult<>(task, task.split(context, args));
executionsFuture = splitExecution.resultAsync().thenApply(splitResult -> {
List<ComputeJobRunner> runners = splitResult.runners();
LOG.debug("Submitting {} jobs for {}", runners.size(), taskClass.getName());
return submit(runners, jobSubmitter);
resultsFuture = executionsFuture.thenCompose(TaskExecutionInternal::resultsAsync);
reduceExecutionFuture = resultsFuture.thenApply(results -> {
LOG.debug("Running reduce job for {}", taskClass.getName());
// This future is already finished
MapReduceTask<R> task = splitExecution.resultAsync().thenApply(SplitResult::task).join();
return executorService.submit(
() -> task.reduce(results),
private void captureReduceFailure(QueueExecution<R> reduceExecution, Throwable throwable) {
if (throwable != null) {
// Capture the reduce execution failure reason and time.
JobState state = throwable instanceof CancellationException ? CANCELED : FAILED;
public CompletableFuture<R> resultAsync() {
return reduceExecutionFuture.thenCompose(QueueExecution::resultAsync);
public CompletableFuture<@Nullable JobStatus> statusAsync() {
JobStatus splitStatus = splitExecution.status();
if (splitStatus == null) {
// Return null even if the reduce execution can still be retained.
return nullCompletedFuture();
if (splitStatus.state() != COMPLETED) {
return completedFuture(splitStatus);
// This future is complete when reduce execution job is submitted, return status from it.
if (reduceExecutionFuture.isDone()) {
return reduceExecutionFuture.handle((reduceExecution, throwable) -> {
if (throwable == null) {
JobStatus reduceStatus = reduceExecution.status();
if (reduceStatus == null) {
return null;
return reduceStatus.toBuilder()
return reduceFailedStatus.get();
// At this point split is complete but reduce job is not submitted yet.
return completedFuture(splitStatus.toBuilder()
public CompletableFuture<@Nullable Boolean> cancelAsync() {
// If the split job is not complete, this will cancel the executions future.
// This means we didn't submit any jobs yet.
if (executionsFuture.cancel(true)) {
return trueCompletedFuture();
// Split job was complete, results future was running, but not complete yet.
if (resultsFuture.cancel(true)) {
return executionsFuture.thenCompose(executions -> {
CompletableFuture<Boolean>[] cancelFutures =
return allOf(cancelFutures);
}).thenApply(unused -> true);
// Results arrived but reduce is not yet submitted
if (reduceExecutionFuture.cancel(true)) {
return trueCompletedFuture();
return reduceExecutionFuture.thenApply(QueueExecution::cancel);
public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
// If the split job is not started
if (splitExecution.changePriority(newPriority)) {
return trueCompletedFuture();
// This future is complete when reduce execution job is submitted, try to change its priority.
if (reduceExecutionFuture.isDone()) {
return reduceExecutionFuture.thenApply(reduceExecution -> reduceExecution.changePriority(newPriority));
return executionsFuture.thenCompose(executions -> {
CompletableFuture<Boolean>[] changePriorityFutures =
.map(execution -> execution.changePriorityAsync(newPriority))
return allOf(changePriorityFutures).thenApply(unused -> {
List<@Nullable Boolean> results =;
if ( -> b == Boolean.TRUE)) {
return true;
if ( {
//noinspection RedundantCast this cast is to satisfy spotbugs
return (Boolean) null;
return false;
CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
return executionsFuture.thenCompose(executions -> {
CompletableFuture<JobStatus>[] statusFutures =
return CompletableFutures.allOf(statusFutures);
private static CompletableFuture<Map<UUID, Object>> resultsAsync(List<JobExecution<Object>> executions) {
CompletableFuture<?>[] resultFutures =
CompletableFuture<UUID>[] idFutures =
return allOf(concat(resultFutures, idFutures)).thenApply(unused -> {
Map<UUID, Object> results = new HashMap<>();
for (int i = 0; i < resultFutures.length; i++) {
results.put(idFutures[i].join(), resultFutures[i].join());
return results;
private static <R> List<JobExecution<Object>> submit(List<ComputeJobRunner> runners, JobSubmitter jobSubmitter) {
private static class SplitResult<R> {
private final MapReduceTask<R> task;
private final List<ComputeJobRunner> runners;
private SplitResult(MapReduceTask<R> task, List<ComputeJobRunner> runners) {
this.task = task;
this.runners = runners;
private List<ComputeJobRunner> runners() {
return runners;
private MapReduceTask<R> task() {
return task;