blob: 0b842d1974ece72bdcc979e85a897e976c8d4cd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.compute.task;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
import static org.apache.ignite.internal.util.ArrayUtils.concat;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.task.ComputeJobRunner;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;
/**
 * Internal map reduce task execution object. Runs the {@link MapReduceTask#split(TaskExecutionContext, Object...)} method of the task as a
 * compute job, then submits the resulting list of jobs. Waits for completion of all compute jobs, then submits the
 * {@link MapReduceTask#reduce(Map)} method as a compute job. The result of the task is the result of the reduce method.
 *
 * @param <R> Task result type.
 */
@SuppressWarnings("unchecked")
public class TaskExecutionInternal<R> implements JobExecution<R> {
    private static final IgniteLogger LOG = Loggers.forClass(TaskExecutionInternal.class);

    /** Execution of the job which instantiates the task and calls its split method. */
    private final QueueExecution<SplitResult<R>> splitExecution;

    /** Completes with the executions of the jobs produced by the split method once they are submitted. */
    private final CompletableFuture<List<JobExecution<Object>>> executionsFuture;

    /** Completes with the results of all submitted jobs keyed by the job id. */
    private final CompletableFuture<Map<UUID, Object>> resultsFuture;

    /** Completes with the execution of the reduce job once it is submitted. */
    private final CompletableFuture<QueueExecution<R>> reduceExecutionFuture;

    /** Status captured when the reduce job could not be submitted (failure or cancellation of a preceding stage). */
    private final AtomicReference<JobStatus> reduceFailedStatus = new AtomicReference<>();

    /**
     * Construct an execution object and starts executing.
     *
     * @param executorService Compute jobs executor.
     * @param jobSubmitter Compute jobs submitter.
     * @param taskClass Map reduce task class.
     * @param context Task execution context.
     * @param args Task arguments.
     */
    public TaskExecutionInternal(
            PriorityQueueExecutor executorService,
            JobSubmitter jobSubmitter,
            Class<? extends MapReduceTask<R>> taskClass,
            TaskExecutionContext context,
            Object... args
    ) {
        LOG.debug("Executing task {}", taskClass.getName());

        // Stage 1: instantiate the task and run its split method as a queued job.
        splitExecution = executorService.submit(
                () -> {
                    MapReduceTask<R> task = instantiateTask(taskClass);

                    return new SplitResult<>(task, task.split(context, args));
                },
                Integer.MAX_VALUE,
                0
        );

        // Stage 2: submit every job runner returned by the split method.
        executionsFuture = splitExecution.resultAsync().thenApply(splitResult -> {
            List<ComputeJobRunner> runners = splitResult.runners();

            LOG.debug("Submitting {} jobs for {}", runners.size(), taskClass.getName());

            return submit(runners, jobSubmitter);
        });

        // Stage 3: wait for all job results.
        resultsFuture = executionsFuture.thenCompose(TaskExecutionInternal::resultsAsync);

        // Stage 4: run the reduce method as a queued job on the same task instance which produced the split.
        reduceExecutionFuture = resultsFuture.thenApply(results -> {
            LOG.debug("Running reduce job for {}", taskClass.getName());

            // The split future is already finished at this point, so join() does not block.
            MapReduceTask<R> task = splitExecution.resultAsync().join().task();

            return executorService.submit(
                    () -> task.reduce(results),
                    Integer.MAX_VALUE,
                    0
            );
        }).whenComplete(this::captureReduceFailure);
    }

    /**
     * Records the terminal status of the task when the reduce job could not be submitted.
     *
     * @param reduceExecution Reduce execution, unused; present to match the {@code whenComplete} callback shape.
     * @param throwable Failure of a preceding stage or {@code null} if the reduce job was submitted.
     */
    private void captureReduceFailure(QueueExecution<R> reduceExecution, Throwable throwable) {
        if (throwable == null) {
            return;
        }

        // The split status is nullable (see statusAsync); without it there is nothing to derive the failed status from.
        JobStatus splitStatus = splitExecution.status();
        if (splitStatus == null) {
            return;
        }

        // Capture the reduce execution failure reason and time.
        JobState state = isCancellation(throwable) ? CANCELED : FAILED;

        reduceFailedStatus.set(
                splitStatus.toBuilder()
                        .state(state)
                        .finishTime(Instant.now())
                        .build()
        );
    }

    /**
     * Checks whether the throwable or any of its causes is a {@link CancellationException}. Dependent {@link CompletableFuture} stages
     * receive the exception of a cancelled upstream stage wrapped into a completion exception, so checking only the top-level type
     * would miss the cancellation.
     *
     * @param throwable Throwable to inspect.
     * @return {@code true} if a {@link CancellationException} is found in the cause chain.
     */
    private static boolean isCancellation(Throwable throwable) {
        for (Throwable cause = throwable; cause != null; cause = cause.getCause()) {
            if (cause instanceof CancellationException) {
                return true;
            }
        }

        return false;
    }

    /**
     * Returns the result of the reduce job.
     *
     * @return Future which completes with the reduce job result, or exceptionally if any preceding stage failed.
     */
    @Override
    public CompletableFuture<R> resultAsync() {
        return reduceExecutionFuture.thenCompose(QueueExecution::resultAsync);
    }

    /**
     * Returns the status of the task as a whole, combined from the split and reduce executions.
     *
     * @return Future with the current status, or with {@code null} if no status is available.
     */
    @Override
    public CompletableFuture<@Nullable JobStatus> statusAsync() {
        JobStatus splitStatus = splitExecution.status();
        if (splitStatus == null) {
            // Return null even if the reduce execution can still be retained.
            return nullCompletedFuture();
        }

        // Until the split job completes successfully its status is the status of the task.
        if (splitStatus.state() != COMPLETED) {
            return completedFuture(splitStatus);
        }

        // This future is complete when reduce execution job is submitted, return status from it.
        if (reduceExecutionFuture.isDone()) {
            return reduceExecutionFuture.handle((reduceExecution, throwable) -> {
                if (throwable != null) {
                    // May be null if no failed status was recorded.
                    return reduceFailedStatus.get();
                }

                JobStatus reduceStatus = reduceExecution.status();
                if (reduceStatus == null) {
                    return null;
                }

                // Present the task as one job: identity and start times come from the split, the rest from the reduce.
                return reduceStatus.toBuilder()
                        .id(splitStatus.id())
                        .createTime(splitStatus.createTime())
                        .startTime(splitStatus.startTime())
                        .build();
            });
        }

        // At this point split is complete but reduce job is not submitted yet.
        return completedFuture(splitStatus.toBuilder()
                .state(EXECUTING)
                .finishTime(null)
                .build());
    }

    /**
     * Cancels the task at whichever stage it currently is.
     *
     * @return Future with the cancellation result.
     */
    @Override
    public CompletableFuture<@Nullable Boolean> cancelAsync() {
        // If the split job is not complete, this will cancel the executions future.
        splitExecution.cancel();

        // This means we didn't submit any jobs yet.
        if (executionsFuture.cancel(true)) {
            return trueCompletedFuture();
        }

        // Split job was complete, results future was running, but not complete yet.
        if (resultsFuture.cancel(true)) {
            return executionsFuture.thenCompose(executions -> {
                CompletableFuture<Boolean>[] cancelFutures = executions.stream()
                        .map(JobExecution::cancelAsync)
                        .toArray(CompletableFuture[]::new);

                return allOf(cancelFutures);
            }).thenApply(unused -> true);
        }

        // Results arrived but reduce is not yet submitted
        if (reduceExecutionFuture.cancel(true)) {
            return trueCompletedFuture();
        }

        // Reduce job is submitted, delegate the cancellation to it.
        return reduceExecutionFuture.thenApply(QueueExecution::cancel);
    }

    /**
     * Changes the priority of the stage which is currently in progress.
     *
     * @param newPriority New priority.
     * @return Future with {@code true} if the priority was changed, {@code false} if it was not, and {@code null} if any of the
     *         submitted jobs reported {@code null}.
     */
    @Override
    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
        // If the split job is not started
        if (splitExecution.changePriority(newPriority)) {
            return trueCompletedFuture();
        }

        // This future is complete when reduce execution job is submitted, try to change its priority.
        if (reduceExecutionFuture.isDone()) {
            return reduceExecutionFuture.thenApply(reduceExecution -> reduceExecution.changePriority(newPriority));
        }

        // Otherwise apply the new priority to every submitted job and combine the answers.
        return executionsFuture.thenCompose(executions -> {
            CompletableFuture<Boolean>[] changePriorityFutures = executions.stream()
                    .map(execution -> execution.changePriorityAsync(newPriority))
                    .toArray(CompletableFuture[]::new);

            return allOf(changePriorityFutures).thenApply(unused -> {
                List<@Nullable Boolean> results = Arrays.stream(changePriorityFutures).map(CompletableFuture::join).collect(toList());

                if (results.stream().allMatch(b -> b == Boolean.TRUE)) {
                    return true;
                }

                if (results.stream().anyMatch(Objects::isNull)) {
                    //noinspection RedundantCast this cast is to satisfy spotbugs
                    return (Boolean) null;
                }

                return false;
            });
        });
    }

    /**
     * Returns the statuses of the jobs produced by the split method.
     *
     * @return Future with the list of job statuses, individual statuses may be {@code null}.
     */
    CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
        return executionsFuture.thenCompose(executions -> {
            CompletableFuture<JobStatus>[] statusFutures = executions.stream()
                    .map(JobExecution::statusAsync)
                    .toArray(CompletableFuture[]::new);

            return CompletableFutures.allOf(statusFutures);
        });
    }

    /**
     * Collects the results of the given executions into a map keyed by the job id.
     *
     * @param executions Job executions.
     * @return Future which completes with the map of results when all results and ids are available.
     */
    private static CompletableFuture<Map<UUID, Object>> resultsAsync(List<JobExecution<Object>> executions) {
        CompletableFuture<?>[] resultFutures = executions.stream()
                .map(JobExecution::resultAsync)
                .toArray(CompletableFuture[]::new);

        CompletableFuture<UUID>[] idFutures = executions.stream()
                .map(JobExecution::idAsync)
                .toArray(CompletableFuture[]::new);

        return allOf(concat(resultFutures, idFutures)).thenApply(unused -> {
            Map<UUID, Object> results = new HashMap<>();

            // Both arrays are built from the same list, so the same index refers to the same execution.
            for (int i = 0; i < resultFutures.length; i++) {
                results.put(idFutures[i].join(), resultFutures[i].join());
            }

            return results;
        });
    }

    /**
     * Submits all job runners using the given submitter.
     *
     * @param runners Job runners returned by the split method.
     * @param jobSubmitter Compute jobs submitter.
     * @return List of job executions in the order of the runners.
     */
    private static List<JobExecution<Object>> submit(List<ComputeJobRunner> runners, JobSubmitter jobSubmitter) {
        return runners.stream()
                .map(jobSubmitter::submit)
                .collect(toList());
    }

    /**
     * Holder for the task instance together with the job runners returned by its split method, so that the reduce method is called on
     * the same instance.
     *
     * @param <R> Task result type.
     */
    private static class SplitResult<R> {
        private final MapReduceTask<R> task;

        private final List<ComputeJobRunner> runners;

        private SplitResult(MapReduceTask<R> task, List<ComputeJobRunner> runners) {
            this.task = task;
            this.runners = runners;
        }

        private List<ComputeJobRunner> runners() {
            return runners;
        }

        private MapReduceTask<R> task() {
            return task;
        }
    }
}