IGNITE-22064 General MapReduce API (#3665)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
index 0d8aa21..515e902 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
@@ -22,13 +22,12 @@
*
* @param <R> Job result type.
*/
-
public interface ComputeJob<R> {
/**
* Executes the job on an Ignite node.
*
- * @param context The execution context.
- * @param args Job arguments.
+ * @param context The execution context.
+ * @param args Job arguments.
* @return Job result.
*/
R execute(JobExecutionContext context, Object... args);
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 1cefe19..ad3397b 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -28,6 +28,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
@@ -46,7 +47,7 @@
* @param nodes Candidate nodes; the job will be executed on one of them.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Job execution object.
*/
@@ -86,7 +87,7 @@
* @param nodes Candidate nodes; the job will be executed on one of them.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Job result future.
*/
@@ -127,7 +128,7 @@
* @param nodes Candidate nodes; the job will be executed on one of them.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
@@ -169,7 +170,7 @@
* @param key Key that identifies the node to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @param <R> Job result type.
* @return Job execution object.
@@ -215,7 +216,7 @@
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param <R> Job result type.
* @return Job execution object.
*/
@@ -261,7 +262,7 @@
* @param key Key that identifies the node to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @param <R> Job result type.
* @return Job result future.
@@ -310,7 +311,7 @@
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
* @param args Arguments of the job.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param <R> Job result type.
* @return Job result future.
*/
@@ -359,7 +360,7 @@
* @param key Key that identifies the node to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
@@ -405,7 +406,7 @@
* @param keyMapper Mapper used to map the key to a binary representation.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
@@ -451,7 +452,7 @@
* @param nodes Nodes to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Map from node to job execution object.
*/
@@ -490,7 +491,7 @@
* @param nodes Nodes to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Map from node to job result.
*/
@@ -544,7 +545,7 @@
* @param nodes Nodes to execute the job on.
* @param units Deployment units. Can be empty.
* @param jobClassName Name of the job class to execute.
- * @param options job execution options (priority, max retries).
+ * @param options Job execution options (priority, max retries).
* @param args Arguments of the job.
* @return Map from node to job result.
* @throws ComputeException If there is any problem executing the job.
@@ -585,4 +586,40 @@
) {
return executeBroadcast(nodes, units, jobClassName, DEFAULT, args);
}
+
+ /**
+ * Submits a {@link MapReduceTask} of the given class for an execution.
+ *
+ * @param units Deployment units.
+ * @param taskClassName Map reduce task class name.
+ * @param args Task arguments.
+ * @param <R> Task result type.
+ * @return Task execution interface.
+ */
+ <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args);
+
+ /**
+ * Submits a {@link MapReduceTask} of the given class for an execution. A shortcut for {@code submitMapReduce(...).resultAsync()}.
+ *
+ * @param units Deployment units.
+ * @param taskClassName Map reduce task class name.
+ * @param args Task arguments.
+ * @param <R> Task result type.
+ * @return Task result future.
+ */
+ default <R> CompletableFuture<R> executeMapReduceAsync(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return this.<R>submitMapReduce(units, taskClassName, args).resultAsync();
+ }
+
+ /**
+ * Executes a {@link MapReduceTask} of the given class.
+ *
+ * @param units Deployment units.
+ * @param taskClassName Map reduce task class name.
+ * @param args Task arguments.
+ * @param <R> Task result type.
+ * @return Task result.
+ * @throws ComputeException If there is any problem executing the task.
+ */
+ <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args);
}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index 8c30a8f..58a0370 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -23,6 +23,11 @@
* Context of the {@link ComputeJob} execution.
*/
public interface JobExecutionContext {
+ /**
+ * Ignite API entry point.
+ *
+ * @return Ignite instance.
+ */
Ignite ignite();
/**
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java b/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
new file mode 100644
index 0000000..a12fd29
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compute task control object. Methods inherited from the {@link JobExecution} allows control of the task coordination job.
+ *
+ * @param <R> Task result type.
+ */
+public interface TaskExecution<R> extends JobExecution<R> {
+ /**
+ * Returns a collection of statuses of the jobs which are executing under this task. The resulting future is completed only after the
+ * jobs are submitted for execution. The list could contain {@code null} values if the time for retaining job status has been exceeded.
+ *
+ * @return A list of current statuses of the jobs.
+ */
+ CompletableFuture<List<@Nullable JobStatus>> statusesAsync();
+
+ /**
+ * Returns a collection of ids of the jobs which are executing under this task. The resulting future is completed only after the
+ * jobs are submitted for execution. The list could contain {@code null} values if the time for retaining job status has been exceeded.
+ *
+ * @return A list of ids of the jobs.
+ */
+ default CompletableFuture<List<@Nullable UUID>> idsAsync() {
+ return statusesAsync().thenApply(statuses -> statuses.stream()
+ .map(jobStatus -> jobStatus != null ? jobStatus.id() : null)
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java b/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
new file mode 100644
index 0000000..2445fd5
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.task;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A description of the job to be submitted as a result of the split step of the {@link MapReduceTask}. Reflects the parameters of the
+ * {@link org.apache.ignite.compute.IgniteCompute#submit(Set, List, String, JobExecutionOptions, Object...) IgniteCompute#submit} method.
+ */
+public class ComputeJobRunner {
+ private final Set<ClusterNode> nodes;
+
+ private final List<DeploymentUnit> units;
+
+ private final String jobClassName;
+
+ private final JobExecutionOptions options;
+
+ private final Object[] args;
+
+ private ComputeJobRunner(
+ Set<ClusterNode> nodes,
+ List<DeploymentUnit> units,
+ String jobClassName,
+ JobExecutionOptions options,
+ Object[] args
+ ) {
+ this.nodes = Collections.unmodifiableSet(nodes);
+ this.units = units;
+ this.jobClassName = jobClassName;
+ this.options = options;
+ this.args = args;
+ }
+
+ /**
+ * Candidate nodes; the job will be executed on one of them.
+ *
+ * @return A set of candidate nodes.
+ */
+ public Set<ClusterNode> nodes() {
+ return nodes;
+ }
+
+ /**
+ * Deployment units. Can be empty.
+ *
+ * @return Deployment units.
+ */
+ public List<DeploymentUnit> units() {
+ return units;
+ }
+
+ /**
+ * Name of the job class to execute.
+ *
+ * @return Name of the job class to execute.
+ */
+ public String jobClassName() {
+ return jobClassName;
+ }
+
+ /**
+ * Job execution options (priority, max retries).
+ *
+ * @return Job execution options.
+ */
+ public JobExecutionOptions options() {
+ return options;
+ }
+
+ /**
+ * Arguments of the job.
+ *
+ * @return Arguments of the job.
+ */
+ public Object[] args() {
+ return args;
+ }
+
+ /**
+ * Returns new builder using this definition.
+ *
+ * @return New builder.
+ */
+ public ComputeJobRunnerBuilder toBuilder() {
+ return builder().nodes(nodes).units(units).jobClassName(jobClassName).options(options).args(args);
+ }
+
+ /**
+ * Returns new builder.
+ *
+ * @return New builder.
+ */
+ public static ComputeJobRunnerBuilder builder() {
+ return new ComputeJobRunnerBuilder();
+ }
+
+ /**
+ * Job submit parameters builder.
+ */
+ public static class ComputeJobRunnerBuilder {
+ private final Set<ClusterNode> nodes = new HashSet<>();
+
+ private final List<DeploymentUnit> units = new ArrayList<>();
+
+ private String jobClassName;
+
+ private JobExecutionOptions options = JobExecutionOptions.DEFAULT;
+
+ private Object[] args;
+
+ /**
+ * Adds nodes to the set of candidate nodes.
+ *
+ * @param nodes A collection of candidate nodes.
+ * @return Builder instance.
+ */
+ public ComputeJobRunnerBuilder nodes(Collection<ClusterNode> nodes) {
+ this.nodes.addAll(nodes);
+ return this;
+ }
+
+ /**
+ * Adds a node to the set of candidate nodes.
+ *
+ * @param node Candidate node.
+ * @return Builder instance.
+ */
+ public ComputeJobRunnerBuilder node(ClusterNode node) {
+ nodes.add(node);
+ return this;
+ }
+
+ /**
+ * Adds deployment units.
+ *
+ * @param units A collection of deployment units.
+ * @return Builder instance.
+ */
+ public ComputeJobRunnerBuilder units(Collection<DeploymentUnit> units) {
+ this.units.addAll(units);
+ return this;
+ }
+
+ /**
+ * Sets the name of the job class to execute.
+ *
+ * @param jobClassName A job class name.
+ * @return Builder instance.
+ */
+ public ComputeJobRunnerBuilder jobClassName(String jobClassName) {
+ this.jobClassName = jobClassName;
+ return this;
+ }
+
+ /**
+ * Sets job execution options (priority, max retries).
+ *
+ * @param options Job execution options.
+ * @return Builder instance.
+ */
+ public ComputeJobRunnerBuilder options(JobExecutionOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ /**
+ * Sets arguments of the job.
+ *
+ * @param args Arguments of the job.
+ * @return Builder instance.
+ */
+ public ComputeJobRunnerBuilder args(Object... args) {
+ this.args = args;
+ return this;
+ }
+
+ /**
+ * Constructs a compute job description object.
+ *
+ * @return Description object.
+ */
+ public ComputeJobRunner build() {
+ if (nodes.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
+
+ return new ComputeJobRunner(nodes, units, jobClassName, options, args);
+ }
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
new file mode 100644
index 0000000..09e88ce
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.task;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * A map reduce task interface. Implement this interface and pass a name of the implemented class to the
+ * {@link org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, String, Object...) IgniteCompute#submitMapReduce} method to run this
+ * task.
+ *
+ * @param <R> Result type.
+ */
+public interface MapReduceTask<R> {
+ /**
+ * This method should return a list of compute job execution parameters which will be used to submit compute jobs.
+ *
+ * @param taskContext Task execution context.
+ * @param args Map reduce task arguments.
+ * @return A list of compute job execution parameters.
+ */
+ List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args);
+
+ /**
+ * This is a finishing step in the task execution. This method will be called with the map from identifiers of compute jobs submitted as
+ * a result of the {@link #split(TaskExecutionContext, Object...)} method call to the results of the execution of the corresponding
+ * job. The return value of this method will be returned as a result of this task.
+ *
+ * @param results Map from compute job ids to their results.
+ * @return Final task result.
+ */
+ R reduce(Map<UUID, ?> results);
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
new file mode 100644
index 0000000..2a45023
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.task;
+
+import org.apache.ignite.Ignite;
+
+/** Context of the compute task execution. */
+public interface TaskExecutionContext {
+ /**
+ * Ignite API entry point.
+ *
+ * @return Ignite instance.
+ */
+ Ignite ignite();
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 03ae7c2..847a297 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -37,6 +37,7 @@
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
@@ -112,11 +113,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<R>submit(nodes, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
- }
+ return sync(executeAsync(nodes, units, jobClassName, options, args));
}
/** {@inheritDoc} */
@@ -208,11 +205,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<R>submitColocated(tableName, key, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
- }
+ return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
}
/** {@inheritDoc} */
@@ -226,11 +219,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<K, R>submitColocated(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
- }
+ return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
}
/** {@inheritDoc} */
@@ -261,6 +250,17 @@
return map;
}
+ @Override
+ public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22124
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return sync(executeMapReduceAsync(units, taskClassName, args));
+ }
+
private CompletableFuture<SubmitResult> executeOnNodesAsync(
Set<ClusterNode> nodes,
List<DeploymentUnit> units,
@@ -441,4 +441,12 @@
private static SubmitResult unpackSubmitResult(PayloadInputChannel ch) {
return new SubmitResult(ch.in().unpackUuid(), ch.notificationFuture());
}
+
+ private static <R> R sync(CompletableFuture<R> future) {
+ try {
+ return future.join();
+ } catch (CompletionException e) {
+ throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
+ }
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
new file mode 100644
index 0000000..dd1b766
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute.task;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.jetbrains.annotations.Nullable;
+
+// TODO https://issues.apache.org/jira/browse/IGNITE-22124
+/**
+ * Client compute task implementation.
+ *
+ * @param <R> Task result type.
+ */
+public class ClientTaskExecution<R> implements TaskExecution<R> {
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobStatus> statusAsync() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 94d7894..3d5c9d7 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -41,6 +41,7 @@
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -121,11 +122,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<R>submit(nodes, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.wrap(e);
- }
+ return sync(executeAsync(nodes, units, jobClassName, options, args));
}
@Override
@@ -163,11 +160,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<R>submitColocated(tableName, key, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.wrap(e);
- }
+ return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
}
/** {@inheritDoc} */
@@ -181,11 +174,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<K, R>submitColocated(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.wrap(e);
- }
+ return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
}
@Override
@@ -199,6 +188,16 @@
return null;
}
+ @Override
+ public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return null;
+ }
+
+ @Override
+ public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return sync(executeMapReduceAsync(units, taskClassName, args));
+ }
+
private <R> JobExecution<R> completedExecution(R result) {
return jobExecution(completedFuture(result));
}
@@ -261,4 +260,12 @@
public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID jobId, int newPriority) {
return trueCompletedFuture();
}
+
+ private static <R> R sync(CompletableFuture<R> future) {
+ try {
+ return future.join();
+ } catch (CompletionException e) {
+ throw ExceptionUtils.wrap(e);
+ }
+ }
}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 7006989..553267f 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -27,6 +27,7 @@
import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
@@ -46,6 +47,7 @@
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -64,6 +66,7 @@
* corresponding job class to the jobs source set. The integration tests depend on this source set so the job class will be visible and it
* will be automatically compiled and packed into the ignite-integration-test-jobs-1.0-SNAPSHOT.jar.
*/
+@SuppressWarnings("resource")
public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest {
protected abstract List<DeploymentUnit> units();
@@ -379,6 +382,43 @@
assertThat(execution.cancelAsync(), willBe(false));
}
+ @Test
+ void submitMapReduce() {
+ IgniteImpl entryNode = node(0);
+
+ TaskExecution<Integer> taskExecution = entryNode.compute().submitMapReduce(units(), mapReduceTaskClassName(), units());
+
+ int sumOfNodeNamesLengths = CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
+ assertThat(taskExecution.resultAsync(), willBe(sumOfNodeNamesLengths));
+
+ // Statuses list contains statuses for 3 running nodes
+ assertThat(taskExecution.statusesAsync(), willBe(contains(
+ jobStatusWithState(COMPLETED),
+ jobStatusWithState(COMPLETED),
+ jobStatusWithState(COMPLETED)
+ )));
+ }
+
+ @Test
+ void executeMapReduceAsync() {
+ IgniteImpl entryNode = node(0);
+
+ CompletableFuture<Integer> future = entryNode.compute().executeMapReduceAsync(units(), mapReduceTaskClassName(), units());
+
+ int sumOfNodeNamesLengths = CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
+ assertThat(future, willBe(sumOfNodeNamesLengths));
+ }
+
+ @Test
+ void executeMapReduce() {
+ IgniteImpl entryNode = node(0);
+
+ int result = entryNode.compute().executeMapReduce(units(), mapReduceTaskClassName(), units());
+
+ int sumOfNodeNamesLengths = CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
+ assertThat(result, is(sumOfNodeNamesLengths));
+ }
+
static IgniteImpl node(int i) {
return CLUSTER.node(i);
}
@@ -395,6 +435,10 @@
return FailingJob.class.getName();
}
+ private static String mapReduceTaskClassName() {
+ return MapReduce.class.getName();
+ }
+
static void assertComputeException(Exception ex, Throwable cause) {
assertComputeException(ex, cause.getClass().getName(), cause.getMessage());
}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index b764e5b..77fb323 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -326,9 +326,7 @@
return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
}
-
private static class CustomFailingJob implements ComputeJob<String> {
- /** {@inheritDoc} */
@Override
public String execute(JobExecutionContext context, Object... args) {
throw ExceptionUtils.sneakyThrow((Throwable) args[0]);
@@ -336,8 +334,6 @@
}
private static class WaitLatchJob implements ComputeJob<String> {
-
- /** {@inheritDoc} */
@Override
public String execute(JobExecutionContext context, Object... args) {
try {
@@ -350,10 +346,8 @@
}
private static class WaitLatchThrowExceptionOnFirstExecutionJob implements ComputeJob<String> {
-
static final AtomicInteger counter = new AtomicInteger(0);
- /** {@inheritDoc} */
@Override
public String execute(JobExecutionContext context, Object... args) {
try {
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
new file mode 100644
index 0000000..344d11c
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
+import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTimeFinishTime;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.time.Instant;
+import java.util.List;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.InteractiveTasks;
+import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.lang.IgniteException;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("resource")
+class ItMapReduceTest extends ClusterPerClassIntegrationTest {
+ @BeforeEach
+ void initChannels() {
+ InteractiveJobs.clearState();
+ InteractiveTasks.clearState();
+
+ List<String> allNodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).collect(toList());
+ InteractiveJobs.initChannels(allNodeNames);
+ }
+
+ @Test
+ void taskMaintainsStatus() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+ TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
+ testExecution.assertExecuting();
+ InteractiveTasks.GlobalApi.assertAlive();
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // And statuses list future is not complete yet.
+ assertThat(taskExecution.statusesAsync().isDone(), is(false));
+
+ // When finish the split job.
+ InteractiveTasks.GlobalApi.finishSplit();
+
+ // Then the task is still executing while waiting for the jobs to finish.
+ testExecution.assertExecuting();
+ assertTaskStatusIs(taskExecution, EXECUTING, statusBeforeSplit, nullValue(Instant.class));
+
+ // And statuses list contains statuses for 3 running nodes.
+ assertJobStates(taskExecution, EXECUTING);
+
+ // When finish the jobs.
+ InteractiveJobs.all().finishReturnWorkerNames();
+
+ // Then the task is still executing while waiting for the reduce to finish.
+ testExecution.assertExecuting();
+ assertTaskStatusIs(taskExecution, EXECUTING, statusBeforeSplit, nullValue(Instant.class));
+
+ // When finish the reduce job.
+ InteractiveTasks.GlobalApi.finishReduce();
+
+ // Then the task is complete and the result is the list of all node names.
+ String[] allNodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).toArray(String[]::new);
+ assertThat(taskExecution.resultAsync(), willBe(containsInAnyOrder(allNodeNames)));
+
+ // And task status is completed.
+ assertTaskStatusIs(taskExecution, COMPLETED, statusBeforeSplit, notNullValue(Instant.class));
+
+ // And statuses list contains statuses for 3 completed jobs.
+ assertJobStates(taskExecution, COMPLETED);
+ }
+
+ @Test
+ void splitThrowsException() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // When the split job throws an exception.
+ InteractiveTasks.GlobalApi.throwException();
+
+ // Then the task fails.
+ assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+
+ // And statuses list fails.
+ assertThat(taskExecution.statusesAsync(), willThrow(IgniteException.class));
+ }
+
+ @Test
+ void cancelSplit() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // When cancel the task.
+ assertThat(taskExecution.cancelAsync(), willBe(true));
+
+ // Then the task is cancelled.
+ assertTaskFailed(taskExecution, CANCELED, statusBeforeSplit);
+
+ // And statuses list will fail.
+ assertThat(taskExecution.statusesAsync(), willThrow(RuntimeException.class));
+ }
+
+ @Test
+ void jobThrowsException() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // And finish the split job.
+ finishSplit(taskExecution);
+
+ // When jobs throw an exception.
+ InteractiveJobs.all().throwException();
+
+ // Then the task fails.
+ assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+ }
+
+ @Test
+ void cancelJobs() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+ TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
+ testExecution.assertExecuting();
+ InteractiveTasks.GlobalApi.assertAlive();
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // And finish the split job.
+ finishSplit(taskExecution);
+
+ // When cancel the task.
+ assertThat(taskExecution.cancelAsync(), willBe(true));
+
+ // Then the task is cancelled.
+ assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+
+ // And statuses list contains canceled statuses.
+ assertJobStates(taskExecution, CANCELED);
+ }
+
+ @Test
+ void reduceThrowsException() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // And finish the split job.
+ finishSplit(taskExecution);
+
+ // And finish jobs.
+ InteractiveJobs.all().finishReturnWorkerNames();
+
+ // And reduce throws an exception.
+ InteractiveTasks.GlobalApi.throwException();
+
+ // Then the task fails.
+ assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+
+ // And statuses list contains completed statuses.
+ assertJobStates(taskExecution, COMPLETED);
+ }
+
+ @Test
+ void cancelReduce() throws Exception {
+ IgniteImpl entryNode = CLUSTER.node(0);
+
+ // Given running task.
+ TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+ TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
+ testExecution.assertExecuting();
+ InteractiveTasks.GlobalApi.assertAlive();
+
+ // Save status before split.
+ JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+ // And finish the split job.
+ finishSplit(taskExecution);
+
+ // And finish jobs.
+ InteractiveJobs.all().finishReturnWorkerNames();
+
+ // Wait for the reduce job to start.
+ InteractiveTasks.GlobalApi.assertAlive();
+
+ // When cancel the task.
+ assertThat(taskExecution.cancelAsync(), willBe(true));
+
+ // Then the task is cancelled.
+ assertTaskFailed(taskExecution, CANCELED, statusBeforeSplit);
+
+ // And statuses list contains completed statuses.
+ assertJobStates(taskExecution, COMPLETED);
+ }
+
+ private static TaskExecution<List<String>> startTask(IgniteImpl entryNode) throws InterruptedException {
+ TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+ new TestingJobExecution<>(taskExecution).assertExecuting();
+ InteractiveTasks.GlobalApi.assertAlive();
+ return taskExecution;
+ }
+
+ private static void finishSplit(TaskExecution<List<String>> taskExecution) {
+ // Finish the split job.
+ InteractiveTasks.GlobalApi.finishSplit();
+
+ // And wait for statuses list contains statuses for 3 running nodes.
+ assertJobStates(taskExecution, EXECUTING);
+ }
+
+ private static void assertTaskFailed(TaskExecution<List<String>> taskExecution, JobState jobState, JobStatus statusBeforeSplit) {
+ assertThat(taskExecution.resultAsync(), willThrow(IgniteException.class));
+ assertTaskStatusIs(taskExecution, jobState, statusBeforeSplit, notNullValue(Instant.class));
+ }
+
+ private static void assertTaskStatusIs(
+ TaskExecution<List<String>> taskExecution,
+ JobState jobState,
+ JobStatus statusBeforeSplit,
+ Matcher<Instant> finishTimeMatcher
+ ) {
+ assertThat(taskExecution.statusAsync(), willBe(jobStatusWithStateAndCreateTimeStartTimeFinishTime(
+ is(jobState),
+ is(statusBeforeSplit.createTime()),
+ is(statusBeforeSplit.startTime()),
+ is(finishTimeMatcher)
+ )));
+ assertThat(taskExecution.idAsync(), willBe(statusBeforeSplit.id()));
+ }
+
+ private static void assertJobStates(TaskExecution<List<String>> taskExecution, JobState state) {
+ await().until(taskExecution::statusesAsync, willBe(contains(
+ jobStatusWithState(state),
+ jobStatusWithState(state),
+ jobStatusWithState(state)
+ )));
+ }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index 4636348..1d53015 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -245,8 +245,10 @@
break;
case RETURN:
return "Done";
+ case RETURN_WORKER_NAME:
+ return workerNodeName;
case GET_WORKER_NAME:
- NODE_CHANNELS.get(workerNodeName).add(context.ignite().name());
+ NODE_CHANNELS.get(workerNodeName).add(workerNodeName);
break;
default:
throw new IllegalStateException("Unexpected value: " + receivedSignal);
@@ -321,15 +323,12 @@
INTERACTIVE_JOB_RUN_TIMES.forEach((nodeName, runTimes) -> assertThat(runTimes, equalTo(1)));
}
- /**
- * Finishes all {@link InteractiveJob}s.
- */
- public void finish() {
+ private static void sendTerminalSignal(Signal signal) {
NODE_SIGNALS.forEach((nodeName, channel) -> {
try {
- channel.offer(Signal.RETURN, WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ channel.offer(signal, WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- throw new RuntimeException("Can not send finish signal to he node", e);
+ throw new RuntimeException("Can not send signal to the node", e);
}
});
@@ -341,6 +340,27 @@
);
});
}
+
+ /**
+ * Finishes all {@link InteractiveJob}s.
+ */
+ public void finish() {
+ sendTerminalSignal(Signal.RETURN);
+ }
+
+ /**
+ * Finishes all {@link InteractiveJob}s by returning worker node names.
+ */
+ public void finishReturnWorkerNames() {
+ sendTerminalSignal(Signal.RETURN_WORKER_NAME);
+ }
+
+ /**
+ * Finishes all {@link InteractiveJob}s by returning worker node names.
+ */
+ public void throwException() {
+ sendTerminalSignal(Signal.THROW);
+ }
}
/**
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
new file mode 100644
index 0000000..c19e7c6
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.utils;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/**
+ * Tests DSL for interactive tasks. "Interactive" means that you can send messages and get responses to/from running tasks.
+ *
+ * <p>For example, you can start {@link GlobalInteractiveMapReduceTask} on some node, get the name of worker node for split or reduce job,
+ * ask split or reduce job to complete successfully or throw exception. Also, this class gives useful assertions for task states.
+ */
+public final class InteractiveTasks {
+ /**
+ * ACK for {@link Signal#CONTINUE}. Returned by a task that has received the signal. Used to check that the task is alive.
+ */
+ private static final Object ACK = new Object();
+
+ /**
+ * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveMapReduceTask} and test code. You can send a
+ * signal to the task via this channel and get a response from it via {@link #GLOBAL_CHANNEL}.
+ */
+ private static final BlockingQueue<Signal> GLOBAL_SIGNALS = new LinkedBlockingQueue<>();
+
+ /**
+ * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveMapReduceTask} and test code. You can send a
+ * signal to the task via {@link #GLOBAL_SIGNALS} and get a response from it via this channel.
+ */
+ private static final BlockingQueue<Object> GLOBAL_CHANNEL = new LinkedBlockingQueue<>();
+
+ /**
+ * This counter indicated how many {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} methods are running
+ * now. This counter increased each time the {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} is called and
+ * decreased when the method is finished (whatever the result is). Checked in {@link #clearState}.
+ */
+ private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new AtomicInteger(0);
+
+ /**
+ * This counter indicated how many {@link GlobalInteractiveMapReduceTask#reduce(Map)} methods are running now. This counter increased
+ * each time the {@link GlobalInteractiveMapReduceTask#reduce(Map)} is called and decreased when the method is finished (whatever the
+ * result is). Checked in {@link #clearState}.
+ */
+ private static final AtomicInteger RUNNING_GLOBAL_REDUCE_CNT = new AtomicInteger(0);
+
+ /**
+ * The timeout in seconds that defines how long should we wait for async calls. Almost all methods use this timeout.
+ */
+ private static final long WAIT_TIMEOUT_SECONDS = 15;
+
+ /**
+ * Clear global state. Must be called before each testing scenario.
+ */
+ public static void clearState() {
+ assertThat(
+ "Global split job is running. Can not clear global state. Please, stop the job first.",
+ RUNNING_GLOBAL_SPLIT_CNT.get(),
+ is(0)
+ );
+ assertThat(
+ "Global reduce job is running. Can not clear global state. Please, stop the job first.",
+ RUNNING_GLOBAL_REDUCE_CNT.get(),
+ is(0)
+ );
+
+ GLOBAL_SIGNALS.clear();
+ GLOBAL_CHANNEL.clear();
+ }
+
+ /**
+ * Signals that are sent by test code to the tasks.
+ */
+ private enum Signal {
+ /**
+ * Signal to the task to continue running and send ACK as a response.
+ */
+ CONTINUE,
+
+ /**
+ * Ask task to throw an exception.
+ */
+ THROW,
+
+ /**
+ * Ask split method to return a list of jobs which will be executed on all nodes.
+ */
+ SPLIT_RETURN_ALL_NODES,
+
+ /**
+ * Ask reduce method to return a concatenation of jobs results.
+ */
+ REDUCE_RETURN
+ }
+
+ private static Signal listenSignal() {
+ try {
+ return GLOBAL_SIGNALS.take();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * If any of the args are strings, convert them to signals and offer them to the job.
+ *
+ * @param args Job args.
+ */
+ private static void offerArgsAsSignals(Object... args) {
+ for (Object arg : args) {
+ if (arg instanceof String) {
+ String signal = (String) arg;
+ try {
+ GLOBAL_SIGNALS.offer(Signal.valueOf(signal));
+ } catch (IllegalArgumentException ignored) {
+ // Ignore non-signal strings
+ }
+ }
+ }
+ }
+
+ /**
+ * Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
+ */
+ private static class GlobalInteractiveMapReduceTask implements MapReduceTask<List<String>> {
+ @Override
+ public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
+ RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();
+
+ offerArgsAsSignals(args);
+
+ try {
+ while (true) {
+ Signal receivedSignal = listenSignal();
+ switch (receivedSignal) {
+ case THROW:
+ throw new RuntimeException();
+ case CONTINUE:
+ GLOBAL_CHANNEL.offer(ACK);
+ break;
+ case SPLIT_RETURN_ALL_NODES:
+ return context.ignite().clusterNodes().stream().map(node ->
+ ComputeJobRunner.builder()
+ .jobClassName(InteractiveJobs.interactiveJobName())
+ .nodes(Set.of(node))
+ .build()
+ ).collect(toList());
+ default:
+ throw new IllegalStateException("Unexpected value: " + receivedSignal);
+ }
+ }
+ } finally {
+ RUNNING_GLOBAL_SPLIT_CNT.decrementAndGet();
+ }
+ }
+
+ @Override
+ public List<String> reduce(Map<UUID, ?> results) {
+ RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();
+ try {
+ while (true) {
+ Signal receivedSignal = listenSignal();
+ switch (receivedSignal) {
+ case THROW:
+ throw new RuntimeException();
+ case CONTINUE:
+ GLOBAL_CHANNEL.offer(ACK);
+ break;
+ case REDUCE_RETURN:
+ return results.values().stream()
+ .map(String.class::cast)
+ .collect(toList());
+ default:
+ throw new IllegalStateException("Unexpected value: " + receivedSignal);
+ }
+ }
+ } finally {
+ RUNNING_GLOBAL_REDUCE_CNT.decrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * API for the interaction with {@link GlobalInteractiveMapReduceTask}.
+ */
+ public static final class GlobalApi {
+ /**
+ * Checks that {@link GlobalInteractiveMapReduceTask} is alive.
+ */
+ public static void assertAlive() throws InterruptedException {
+ GLOBAL_SIGNALS.offer(Signal.CONTINUE);
+ assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), equalTo(ACK));
+ }
+
+ /**
+ * Finishes the split job, returning job runners for all nodes.
+ */
+ public static void finishSplit() {
+ GLOBAL_SIGNALS.offer(Signal.SPLIT_RETURN_ALL_NODES);
+ }
+
+ /**
+ * Finishes the split job, returning job runners for all nodes.
+ */
+ public static void throwException() {
+ GLOBAL_SIGNALS.offer(Signal.THROW);
+ }
+
+ /**
+ * Finishes the reduce job, returning final result.
+ */
+ public static void finishReduce() {
+ GLOBAL_SIGNALS.offer(Signal.REDUCE_RETURN);
+ }
+
+ public static String name() {
+ return GlobalInteractiveMapReduceTask.class.getName();
+ }
+ }
+}
diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
new file mode 100644
index 0000000..b5ffba5
--- /dev/null
+++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/** Map reduce task which runs a {@link GetNodeNameJob} on each node and computes a sum of length of all node names. */
+public class MapReduce implements MapReduceTask<Integer> {
+ @Override
+ public List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args) {
+ List<DeploymentUnit> deploymentUnits = (List<DeploymentUnit>) args[0];
+
+ return taskContext.ignite().clusterNodes().stream().map(node ->
+ ComputeJobRunner.builder()
+ .jobClassName(GetNodeNameJob.class.getName())
+ .units(deploymentUnits)
+ .nodes(Set.of(node))
+ .options(JobExecutionOptions.builder()
+ .maxRetries(10)
+ .priority(Integer.MAX_VALUE)
+ .build()
+ ).build()
+ ).collect(toList());
+ }
+
+ @Override
+ public Integer reduce(Map<UUID, ?> results) {
+ return results.values().stream()
+ .map(String.class::cast)
+ .map(String::length)
+ .reduce(Integer::sum)
+ .orElseThrow();
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
index 13c209b..7d46fbe 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
@@ -28,6 +28,8 @@
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution;
import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
@@ -134,6 +136,16 @@
.collect(toMap(Entry::getKey, entry -> preventThreadHijack(entry.getValue())));
}
+ @Override
+ public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return new AntiHijackTaskExecution<>(compute.submitMapReduce(units, taskClassName, args), asyncContinuationExecutor);
+ }
+
+ @Override
+ public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return compute.executeMapReduce(units, taskClassName, args);
+ }
+
private <R> JobExecution<R> preventThreadHijack(JobExecution<R> execution) {
return new AntiHijackJobExecution<>(execution, asyncContinuationExecutor);
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
index 5a77f6e..c7c0f99 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
@@ -59,7 +59,7 @@
return preventThreadHijack(execution.changePriorityAsync(newPriority));
}
- private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
+ protected <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
return PublicApiThreading.preventThreadHijack(originalFuture, asyncContinuationExecutor);
}
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 3f88353..e05883d 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -24,6 +24,8 @@
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -40,7 +42,7 @@
* @param jobClassName Name of the job class.
* @param args Job args.
* @param <R> Job result type.
- * @return Future execution result.
+ * @return Job execution object.
*/
<R> JobExecution<R> executeLocally(
ExecutionOptions options,
@@ -56,7 +58,7 @@
* @param jobClassName Name of the job class.
* @param args Job args.
* @param <R> Job result type.
- * @return Future execution result.
+ * @return Job execution object.
*/
default <R> JobExecution<R> executeLocally(
List<DeploymentUnit> units,
@@ -70,12 +72,12 @@
* Executes a job of the given class on a remote node.
*
* @param options Job execution options.
- * @param remoteNode Name of the job class.
+ * @param remoteNode Remote node name.
* @param units Deployment units which will be loaded for execution.
* @param jobClassName Name of the job class.
* @param args Job args.
* @param <R> Job result type.
- * @return Future execution result.
+ * @return Job execution object.
*/
<R> JobExecution<R> executeRemotely(
ExecutionOptions options,
@@ -88,12 +90,12 @@
/**
* Executes a job of the given class on a remote node with default execution options {@link ExecutionOptions#DEFAULT}.
*
- * @param remoteNode Name of the job class.
+ * @param remoteNode Remote node name.
* @param units Deployment units which will be loaded for execution.
* @param jobClassName Name of the job class.
* @param args Job args.
* @param <R> Job result type.
- * @return Future execution result.
+ * @return Job execution object.
*/
default <R> JobExecution<R> executeRemotely(
ClusterNode remoteNode,
@@ -108,14 +110,14 @@
* Executes a job of the given class on a remote node. If the node leaves the cluster, it will be restarted on the node given by the
* {@code nextWorkerSelector}.
*
- * @param remoteNode Name of the job class.
+ * @param remoteNode Remote node name.
* @param nextWorkerSelector The selector that returns the next worker to execute job on.
* @param options Job execution options.
* @param units Deployment units which will be loaded for execution.
* @param jobClassName Name of the job class.
* @param args Job args.
* @param <R> Job result type.
- * @return Future execution result.
+ * @return Job execution object.
*/
<R> JobExecution<R> executeRemotelyWithFailover(
ClusterNode remoteNode,
@@ -127,6 +129,23 @@
);
/**
+ * Executes a task of the given class.
+ *
+ * @param jobSubmitter Function which submits a job with specified parameters for the execution.
+ * @param units Deployment units which will be loaded for execution.
+ * @param taskClassName Name of the task class.
+ * @param args Task args.
+ * @param <R> Task result type.
+ * @return Task execution object.
+ */
+ <R> TaskExecution<R> executeTask(
+ JobSubmitter jobSubmitter,
+ List<DeploymentUnit> units,
+ String taskClassName,
+ Object... args
+ );
+
+ /**
* Retrieves the current status of all jobs on all nodes in the cluster.
*
* @return The collection of job statuses.
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 677b492..78fbaf6 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -20,6 +20,8 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions;
+import static org.apache.ignite.internal.compute.ComputeUtils.jobClass;
+import static org.apache.ignite.internal.compute.ComputeUtils.taskClass;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -34,6 +36,7 @@
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.executor.ComputeExecutor;
@@ -42,6 +45,9 @@
import org.apache.ignite.internal.compute.loader.JobContextManager;
import org.apache.ignite.internal.compute.messaging.ComputeMessaging;
import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
+import org.apache.ignite.internal.compute.task.DelegatingTaskExecution;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -125,7 +131,7 @@
CompletableFuture<JobExecutionInternal<R>> future =
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), jobClassName)
.thenApply(context -> {
- JobExecutionInternal<R> execution = exec(context, options, jobClassName, args);
+ JobExecutionInternal<R> execution = execJob(context, options, jobClassName, args);
execution.resultAsync().whenComplete((result, e) -> context.close());
inFlightFutures.registerFuture(execution.resultAsync());
return execution;
@@ -141,6 +147,39 @@
}
}
+ @Override
+ public <R> TaskExecution<R> executeTask(
+ JobSubmitter jobSubmitter,
+ List<DeploymentUnit> units,
+ String taskClassName,
+ Object... args
+ ) {
+ if (!busyLock.enterBusy()) {
+ return new DelegatingTaskExecution<>(
+ failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()))
+ );
+ }
+
+ try {
+ CompletableFuture<TaskExecutionInternal<R>> taskFuture =
+ mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), taskClassName)
+ .thenApply(context -> {
+ TaskExecutionInternal<R> execution = execTask(context, jobSubmitter, taskClassName, args);
+ execution.resultAsync().whenComplete((r, e) -> context.close());
+ inFlightFutures.registerFuture(execution.resultAsync());
+ return execution;
+ });
+
+ inFlightFutures.registerFuture(taskFuture);
+
+ DelegatingTaskExecution<R> result = new DelegatingTaskExecution<>(taskFuture);
+ result.idAsync().thenAccept(jobId -> executionManager.addExecution(jobId, result));
+ return result;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/** {@inheritDoc} */
@Override
public <R> JobExecution<R> executeRemotely(
@@ -253,10 +292,24 @@
return nullCompletedFuture();
}
- private <R> JobExecutionInternal<R> exec(JobContext context, ExecutionOptions options, String jobClassName, Object[] args) {
+ private <R> JobExecutionInternal<R> execJob(JobContext context, ExecutionOptions options, String jobClassName, Object... args) {
try {
- return executor.executeJob(options, ComputeUtils.jobClass(context.classLoader(), jobClassName), args);
- } catch (RuntimeException e) {
+ return executor.executeJob(options, jobClass(context.classLoader(), jobClassName), args);
+ } catch (Throwable e) {
+ context.close();
+ throw e;
+ }
+ }
+
+ private <R> TaskExecutionInternal<R> execTask(
+ JobContext context,
+ JobSubmitter jobSubmitter,
+ String taskClassName,
+ Object... args
+ ) {
+ try {
+ return executor.executeTask(jobSubmitter, taskClass(context.classLoader(), taskClassName), args);
+ } catch (Throwable e) {
context.close();
throw e;
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 28e48a6..b778308 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -34,6 +34,7 @@
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.DeploymentUnit;
import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.version.Version;
import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
@@ -53,7 +54,7 @@
private static final ComputeMessagesFactory MESSAGES_FACTORY = new ComputeMessagesFactory();
/**
- * Instantiate compute job via provided class loader by provided job class name.
+ * Instantiate compute job via provided class loader by provided job class.
*
* @param computeJobClass Compute job class.
* @param <R> Compute job return type.
@@ -76,11 +77,7 @@
return constructor.newInstance();
} catch (ReflectiveOperationException e) {
- throw new ComputeException(
- CLASS_INITIALIZATION_ERR,
- "Cannot instantiate job",
- e
- );
+ throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot instantiate job", e);
}
}
@@ -96,12 +93,52 @@
try {
return (Class<ComputeJob<R>>) Class.forName(jobClassName, true, jobClassLoader);
} catch (ClassNotFoundException e) {
+ throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot load job class by name '" + jobClassName + "'", e);
+ }
+ }
+
+ /**
+ * Instantiate map reduce task via provided class loader by provided task class.
+ *
+ * @param taskClass Map reduce task class.
+ * @param <R> Map reduce task return type.
+ * @return Map reduce task instance.
+ */
+ public static <R> MapReduceTask<R> instantiateTask(Class<? extends MapReduceTask<R>> taskClass) {
+ if (!(MapReduceTask.class.isAssignableFrom(taskClass))) {
throw new ComputeException(
CLASS_INITIALIZATION_ERR,
- "Cannot load job class by name '" + jobClassName + "'",
- e
+ "'" + taskClass.getName() + "' does not implement ComputeTask interface"
);
}
+
+ try {
+ Constructor<? extends MapReduceTask<R>> constructor = taskClass.getDeclaredConstructor();
+
+ if (!constructor.canAccess(null)) {
+ constructor.setAccessible(true);
+ }
+
+ return constructor.newInstance();
+ } catch (ReflectiveOperationException e) {
+ throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot instantiate task", e);
+ }
+ }
+
+ /**
+ * Resolve map reduce task class name to map reduce task class reference.
+ *
+ * @param taskClassLoader Class loader.
+ * @param taskClassName Map reduce task class name.
+ * @param <R> Map reduce task return type.
+ * @return Map reduce task class.
+ */
+ public static <R> Class<MapReduceTask<R>> taskClass(ClassLoader taskClassLoader, String taskClassName) {
+ try {
+ return (Class<MapReduceTask<R>>) Class.forName(taskClassName, true, taskClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot load task class by name '" + taskClassName + "'", e);
+ }
}
/**
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index 54191a2..b4f5d15 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -18,11 +18,11 @@
package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Compute.RESULT_NOT_FOUND_ERR;
-import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -34,6 +34,7 @@
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -101,16 +102,16 @@
*
* @return The set of all job statuses.
*/
- public CompletableFuture<Set<JobStatus>> localStatusesAsync() {
- CompletableFuture<JobStatus>[] statuses = executions.values().stream()
+ public CompletableFuture<List<JobStatus>> localStatusesAsync() {
+ CompletableFuture<JobStatus>[] statusFutures = executions.values().stream()
.filter(it -> !(it instanceof RemoteJobExecution) && !(it instanceof FailSafeJobExecution))
.map(JobExecution::statusAsync)
.toArray(CompletableFuture[]::new);
- return CompletableFuture.allOf(statuses)
- .thenApply(ignored -> Arrays.stream(statuses).map(CompletableFuture::join)
+ return CompletableFutures.allOf(statusFutures)
+ .thenApply(statuses -> statuses.stream()
.filter(Objects::nonNull)
- .collect(toSet()));
+ .collect(toList()));
}
/**
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index acbf87d..0bd0d64 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -46,6 +46,8 @@
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.NodeNotFoundException;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.ComputeJobRunner;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -92,7 +94,6 @@
this.clock = clock;
}
- /** {@inheritDoc} */
@Override
public <R> JobExecution<R> submit(
Set<ClusterNode> nodes,
@@ -148,7 +149,6 @@
));
}
- /** {@inheritDoc} */
@Override
public <R> R execute(
Set<ClusterNode> nodes,
@@ -157,11 +157,7 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<R>submit(nodes, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
- }
+ return sync(executeAsync(nodes, units, jobClassName, options, args));
}
private static ClusterNode randomNode(Set<ClusterNode> nodes) {
@@ -212,7 +208,6 @@
return targetNode.equals(topologyService.localMember());
}
- /** {@inheritDoc} */
@Override
public <R> JobExecution<R> submitColocated(
String tableName,
@@ -234,7 +229,6 @@
);
}
- /** {@inheritDoc} */
@Override
public <K, R> JobExecution<R> submitColocated(
String tableName,
@@ -263,7 +257,6 @@
);
}
- /** {@inheritDoc} */
@Override
public <R> R executeColocated(
String tableName,
@@ -273,14 +266,9 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<R>submitColocated(tableName, key, units, jobClassName, options, args).resultAsync().join();
- } catch (CompletionException e) {
- throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
- }
+ return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
}
- /** {@inheritDoc} */
@Override
public <K, R> R executeColocated(
String tableName,
@@ -291,15 +279,9 @@
JobExecutionOptions options,
Object... args
) {
- try {
- return this.<K, R>submitColocated(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync()
- .join();
- } catch (CompletionException e) {
- throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
- }
+ return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
}
- /** {@inheritDoc} */
@Override
public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
TableViewInternal table,
@@ -353,7 +335,6 @@
});
}
- /** {@inheritDoc} */
@Override
public <R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
Set<ClusterNode> nodes,
@@ -381,6 +362,23 @@
}
@Override
+ public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ Objects.requireNonNull(units);
+ Objects.requireNonNull(taskClassName);
+
+ return new TaskExecutionWrapper<>(computeComponent.executeTask(this::submitJob, units, taskClassName, args));
+ }
+
+ @Override
+ public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+ return sync(executeMapReduceAsync(units, taskClassName, args));
+ }
+
+ private JobExecution<Object> submitJob(ComputeJobRunner runner) {
+ return submit(runner.nodes(), runner.units(), runner.jobClassName(), runner.options(), runner.args());
+ }
+
+ @Override
public CompletableFuture<Collection<JobStatus>> statusesAsync() {
return computeComponent.statusesAsync();
}
@@ -404,4 +402,12 @@
ComputeComponent computeComponent() {
return computeComponent;
}
+
+ private static <R> R sync(CompletableFuture<R> future) {
+ try {
+ return future.join();
+ } catch (CompletionException e) {
+ throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
+ }
+ }
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
new file mode 100644
index 0000000..56e2803
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps the {@link TaskExecution} converting exceptions thrown by the delegate to public.
+ *
+ * @param <R> Result type.
+ */
+class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> implements TaskExecution<R> {
+ private final TaskExecution<R> delegate;
+
+ TaskExecutionWrapper(TaskExecution<R> delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+ return convertToPublicFuture(delegate.statusesAsync());
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
index bf4866a..f103086 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
@@ -18,13 +18,18 @@
package org.apache.ignite.internal.compute.executor;
import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
/**
* Executor of Compute jobs.
*/
public interface ComputeExecutor {
- <R> JobExecutionInternal<R> executeJob(ExecutionOptions options, Class<? extends ComputeJob<R>> jobClass, Object[] args);
+ <R> JobExecutionInternal<R> executeJob(ExecutionOptions options, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+ <R> TaskExecutionInternal<R> executeTask(JobSubmitter jobSubmitter, Class<? extends MapReduceTask<R>> taskClass, Object... args);
void start();
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index af39e61..3d956cf 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -24,6 +24,7 @@
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.internal.compute.ComputeUtils;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
@@ -31,6 +32,8 @@
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.compute.state.ComputeStateMachine;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -87,6 +90,17 @@
}
@Override
+ public <R> TaskExecutionInternal<R> executeTask(
+ JobSubmitter jobSubmitter,
+ Class<? extends MapReduceTask<R>> taskClass,
+ Object... args
+ ) {
+ assert executorService != null;
+
+ return new TaskExecutionInternal<>(executorService, jobSubmitter, taskClass, () -> ignite, args);
+ }
+
+ @Override
public void start() {
stateMachine.start();
executorService = new PriorityQueueExecutor(
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index b6bcd0e..1b39f2b 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -483,25 +483,17 @@
return result;
}
- private <R> CompletableFuture<Collection<R>> broadcastAsyncAndCollect(
+ private <R> CompletableFuture<List<R>> broadcastAsyncAndCollect(
Function<ClusterNode, CompletableFuture<@Nullable R>> request,
- Function<Throwable, Throwable> error
+ Function<Throwable, RuntimeException> error
) {
- CompletableFuture<Collection<R>> result = new CompletableFuture<>();
-
CompletableFuture<R>[] futures = topologyService.allMembers()
.stream()
.map(request::apply)
.toArray(CompletableFuture[]::new);
- CompletableFutures.allOf(futures).whenComplete((collection, throwable) -> {
- if (throwable == null) {
- result.complete(collection);
- } else {
- result.completeExceptionally(error.apply(throwable));
- }
+ return CompletableFutures.allOf(futures).exceptionally(throwable -> {
+ throw error.apply(throwable);
});
-
- return result;
}
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
new file mode 100644
index 0000000..5adf264
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.compute.AntiHijackJobExecution;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link TaskExecution} that adds protection against thread hijacking by users.
+ */
+public class AntiHijackTaskExecution<R> extends AntiHijackJobExecution<R> implements TaskExecution<R> {
+ private final TaskExecution<R> execution;
+
+ /**
+ * Constructor.
+ *
+ * @param execution Original execution.
+ * @param asyncContinuationExecutor Executor to which the execution will be resubmitted.
+ */
+ public AntiHijackTaskExecution(TaskExecution<R> execution, Executor asyncContinuationExecutor) {
+ super(execution, asyncContinuationExecutor);
+ this.execution = execution;
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+ return preventThreadHijack(execution.statusesAsync());
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
new file mode 100644
index 0000000..3da429d
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Delegates {@link TaskExecution} to the future of {@link TaskExecutionInternal}.
+ *
+ * @param <R> Result type.
+ */
+public class DelegatingTaskExecution<R> implements TaskExecution<R> {
+ private final CompletableFuture<TaskExecutionInternal<R>> delegate;
+
+ public DelegatingTaskExecution(CompletableFuture<TaskExecutionInternal<R>> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return delegate.thenCompose(TaskExecutionInternal::resultAsync);
+ }
+
+ @Override
+ public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+ return delegate.thenCompose(TaskExecutionInternal::statusesAsync);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ return delegate.thenCompose(TaskExecutionInternal::cancelAsync);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobStatus> statusAsync() {
+ return delegate.thenCompose(TaskExecutionInternal::statusAsync);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+ return delegate.thenCompose(execution -> execution.changePriorityAsync(newPriority));
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
new file mode 100644
index 0000000..400c413
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+
+/**
+ * Compute job submitter.
+ */
+@FunctionalInterface
+public interface JobSubmitter {
+ /**
+ * Submits compute job for an execution.
+ *
+ * @param computeJobRunner Computer job start parameters.
+ */
+ JobExecution<Object> submit(ComputeJobRunner computeJobRunner);
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
new file mode 100644
index 0000000..0b842d1
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
+import static org.apache.ignite.internal.util.ArrayUtils.concat;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
+import org.apache.ignite.internal.compute.queue.QueueExecution;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal map reduce task execution object. Runs the {@link MapReduceTask#split(TaskExecutionContext, Object...)} method of the task as a
+ * compute job, then submits the resulting list of jobs. Waits for completion of all compute jobs, then submits the
+ * {@link MapReduceTask#reduce(Map)} method as a compute job. The result of the task is the result of the split method.
+ *
+ * @param <R> Task result type.
+ */
+@SuppressWarnings("unchecked")
+public class TaskExecutionInternal<R> implements JobExecution<R> {
+ private static final IgniteLogger LOG = Loggers.forClass(TaskExecutionInternal.class);
+
+ private final QueueExecution<SplitResult<R>> splitExecution;
+
+ private final CompletableFuture<List<JobExecution<Object>>> executionsFuture;
+
+ private final CompletableFuture<Map<UUID, Object>> resultsFuture;
+
+ private final CompletableFuture<QueueExecution<R>> reduceExecutionFuture;
+
+ private final AtomicReference<JobStatus> reduceFailedStatus = new AtomicReference<>();
+
+ /**
+ * Construct an execution object and starts executing.
+ *
+ * @param executorService Compute jobs executor.
+ * @param jobSubmitter Compute jobs submitter.
+ * @param taskClass Map reduce task class.
+ * @param context Task execution context.
+ * @param args Task arguments.
+ */
+ public TaskExecutionInternal(
+ PriorityQueueExecutor executorService,
+ JobSubmitter jobSubmitter,
+ Class<? extends MapReduceTask<R>> taskClass,
+ TaskExecutionContext context,
+ Object... args
+ ) {
+ LOG.debug("Executing task {}", taskClass.getName());
+ splitExecution = executorService.submit(
+ () -> {
+ MapReduceTask<R> task = instantiateTask(taskClass);
+ return new SplitResult<>(task, task.split(context, args));
+ },
+ Integer.MAX_VALUE,
+ 0
+ );
+
+ executionsFuture = splitExecution.resultAsync().thenApply(splitResult -> {
+ List<ComputeJobRunner> runners = splitResult.runners();
+ LOG.debug("Submitting {} jobs for {}", runners.size(), taskClass.getName());
+ return submit(runners, jobSubmitter);
+ });
+
+ resultsFuture = executionsFuture.thenCompose(TaskExecutionInternal::resultsAsync);
+
+ reduceExecutionFuture = resultsFuture.thenApply(results -> {
+ LOG.debug("Running reduce job for {}", taskClass.getName());
+
+ // This future is already finished
+ MapReduceTask<R> task = splitExecution.resultAsync().thenApply(SplitResult::task).join();
+
+ return executorService.submit(
+ () -> task.reduce(results),
+ Integer.MAX_VALUE,
+ 0
+ );
+ }).whenComplete(this::captureReduceFailure);
+ }
+
+ private void captureReduceFailure(QueueExecution<R> reduceExecution, Throwable throwable) {
+ if (throwable != null) {
+ // Capture the reduce execution failure reason and time.
+ JobState state = throwable instanceof CancellationException ? CANCELED : FAILED;
+ reduceFailedStatus.set(
+ splitExecution.status().toBuilder()
+ .state(state)
+ .finishTime(Instant.now())
+ .build()
+ );
+ }
+ }
+
+ @Override
+ public CompletableFuture<R> resultAsync() {
+ return reduceExecutionFuture.thenCompose(QueueExecution::resultAsync);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable JobStatus> statusAsync() {
+ JobStatus splitStatus = splitExecution.status();
+ if (splitStatus == null) {
+ // Return null even if the reduce execution can still be retained.
+ return nullCompletedFuture();
+ }
+
+ if (splitStatus.state() != COMPLETED) {
+ return completedFuture(splitStatus);
+ }
+
+ // This future is complete when reduce execution job is submitted, return status from it.
+ if (reduceExecutionFuture.isDone()) {
+ return reduceExecutionFuture.handle((reduceExecution, throwable) -> {
+ if (throwable == null) {
+ JobStatus reduceStatus = reduceExecution.status();
+ if (reduceStatus == null) {
+ return null;
+ }
+ return reduceStatus.toBuilder()
+ .id(splitStatus.id())
+ .createTime(splitStatus.createTime())
+ .startTime(splitStatus.startTime())
+ .build();
+ }
+ return reduceFailedStatus.get();
+ });
+ }
+
+ // At this point split is complete but reduce job is not submitted yet.
+ return completedFuture(splitStatus.toBuilder()
+ .state(EXECUTING)
+ .finishTime(null)
+ .build());
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> cancelAsync() {
+ // If the split job is not complete, this will cancel the executions future.
+ splitExecution.cancel();
+
+ // This means we didn't submit any jobs yet.
+ if (executionsFuture.cancel(true)) {
+ return trueCompletedFuture();
+ }
+
+ // Split job was complete, results future was running, but not complete yet.
+ if (resultsFuture.cancel(true)) {
+ return executionsFuture.thenCompose(executions -> {
+ CompletableFuture<Boolean>[] cancelFutures = executions.stream()
+ .map(JobExecution::cancelAsync)
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(cancelFutures);
+ }).thenApply(unused -> true);
+ }
+
+ // Results arrived but reduce is not yet submitted
+ if (reduceExecutionFuture.cancel(true)) {
+ return trueCompletedFuture();
+ }
+
+ return reduceExecutionFuture.thenApply(QueueExecution::cancel);
+ }
+
+ @Override
+ public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+ // If the split job is not started
+ if (splitExecution.changePriority(newPriority)) {
+ return trueCompletedFuture();
+ }
+
+ // This future is complete when reduce execution job is submitted, try to change its priority.
+ if (reduceExecutionFuture.isDone()) {
+ return reduceExecutionFuture.thenApply(reduceExecution -> reduceExecution.changePriority(newPriority));
+ }
+
+ return executionsFuture.thenCompose(executions -> {
+ CompletableFuture<Boolean>[] changePriorityFutures = executions.stream()
+ .map(execution -> execution.changePriorityAsync(newPriority))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(changePriorityFutures).thenApply(unused -> {
+ List<@Nullable Boolean> results = Arrays.stream(changePriorityFutures).map(CompletableFuture::join).collect(toList());
+ if (results.stream().allMatch(b -> b == Boolean.TRUE)) {
+ return true;
+ }
+ if (results.stream().anyMatch(Objects::isNull)) {
+ //noinspection RedundantCast this cast is to satisfy spotbugs
+ return (Boolean) null;
+ }
+ return false;
+ });
+ });
+ }
+
+ CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+ return executionsFuture.thenCompose(executions -> {
+ CompletableFuture<JobStatus>[] statusFutures = executions.stream()
+ .map(JobExecution::statusAsync)
+ .toArray(CompletableFuture[]::new);
+
+ return CompletableFutures.allOf(statusFutures);
+ });
+
+ }
+
+ private static CompletableFuture<Map<UUID, Object>> resultsAsync(List<JobExecution<Object>> executions) {
+ CompletableFuture<?>[] resultFutures = executions.stream()
+ .map(JobExecution::resultAsync)
+ .toArray(CompletableFuture[]::new);
+
+ CompletableFuture<UUID>[] idFutures = executions.stream()
+ .map(JobExecution::idAsync)
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(concat(resultFutures, idFutures)).thenApply(unused -> {
+ Map<UUID, Object> results = new HashMap<>();
+
+ for (int i = 0; i < resultFutures.length; i++) {
+ results.put(idFutures[i].join(), resultFutures[i].join());
+ }
+
+ return results;
+ });
+ }
+
+ private static <R> List<JobExecution<Object>> submit(List<ComputeJobRunner> runners, JobSubmitter jobSubmitter) {
+ return runners.stream()
+ .map(jobSubmitter::submit)
+ .collect(toList());
+ }
+
+ private static class SplitResult<R> {
+ private final MapReduceTask<R> task;
+
+ private final List<ComputeJobRunner> runners;
+
+ private SplitResult(MapReduceTask<R> task, List<ComputeJobRunner> runners) {
+ this.task = task;
+ this.runners = runners;
+ }
+
+ private List<ComputeJobRunner> runners() {
+ return runners;
+ }
+
+ private MapReduceTask<R> task() {
+ return task;
+ }
+ }
+}