IGNITE-22064 General MapReduce API (#3665)

diff --git a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
index 0d8aa21..515e902 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
@@ -22,13 +22,12 @@
  *
  * @param <R> Job result type.
  */
-
 public interface ComputeJob<R> {
     /**
      * Executes the job on an Ignite node.
      *
-     * @param context  The execution context.
-     * @param args     Job arguments.
+     * @param context The execution context.
+     * @param args Job arguments.
      * @return Job result.
      */
     R execute(JobExecutionContext context, Object... args);
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 1cefe19..ad3397b 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -28,6 +28,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
@@ -46,7 +47,7 @@
      * @param nodes Candidate nodes; the job will be executed on one of them.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Job execution object.
      */
@@ -86,7 +87,7 @@
      * @param nodes Candidate nodes; the job will be executed on one of them.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Job result future.
      */
@@ -127,7 +128,7 @@
      * @param nodes Candidate nodes; the job will be executed on one of them.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Job result.
      * @throws ComputeException If there is any problem executing the job.
@@ -169,7 +170,7 @@
      * @param key Key that identifies the node to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @param <R> Job result type.
      * @return Job execution object.
@@ -215,7 +216,7 @@
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param args Arguments of the job.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param <R> Job result type.
      * @return Job execution object.
      */
@@ -261,7 +262,7 @@
      * @param key Key that identifies the node to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @param <R> Job result type.
      * @return Job result future.
@@ -310,7 +311,7 @@
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
      * @param args Arguments of the job.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param <R> Job result type.
      * @return Job result future.
      */
@@ -359,7 +360,7 @@
      * @param key Key that identifies the node to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Job result.
      * @throws ComputeException If there is any problem executing the job.
@@ -405,7 +406,7 @@
      * @param keyMapper Mapper used to map the key to a binary representation.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Job result.
      * @throws ComputeException If there is any problem executing the job.
@@ -451,7 +452,7 @@
      * @param nodes Nodes to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Map from node to job execution object.
      */
@@ -490,7 +491,7 @@
      * @param nodes Nodes to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Map from node to job result.
      */
@@ -544,7 +545,7 @@
      * @param nodes Nodes to execute the job on.
      * @param units Deployment units. Can be empty.
      * @param jobClassName Name of the job class to execute.
-     * @param options job execution options (priority, max retries).
+     * @param options Job execution options (priority, max retries).
      * @param args Arguments of the job.
      * @return Map from node to job result.
      * @throws ComputeException If there is any problem executing the job.
@@ -585,4 +586,40 @@
     ) {
         return executeBroadcast(nodes, units, jobClassName, DEFAULT, args);
     }
+
+    /**
+     * Submits a {@link MapReduceTask} of the given class for execution.
+     *
+     * @param units Deployment units.
+     * @param taskClassName Map reduce task class name.
+     * @param args Task arguments.
+     * @param <R> Task result type.
+     * @return Task execution interface.
+     */
+    <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args);
+
+    /**
+     * Submits a {@link MapReduceTask} of the given class for execution. A shortcut for {@code submitMapReduce(...).resultAsync()}.
+     *
+     * @param units Deployment units.
+     * @param taskClassName Map reduce task class name.
+     * @param args Task arguments.
+     * @param <R> Task result type.
+     * @return Task result future.
+     */
+    default <R> CompletableFuture<R> executeMapReduceAsync(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return this.<R>submitMapReduce(units, taskClassName, args).resultAsync();
+    }
+
+    /**
+     * Executes a {@link MapReduceTask} of the given class.
+     *
+     * @param units Deployment units.
+     * @param taskClassName Map reduce task class name.
+     * @param args Task arguments.
+     * @param <R> Task result type.
+     * @return Task result.
+     * @throws ComputeException If there is any problem executing the task.
+     */
+    <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args);
 }
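For reference, a minimal usage sketch of the three new map-reduce entry points declared above. The task class name, its arguments, and the result type are hypothetical; deployment units are passed as an empty list, the same way the integration tests below do.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.compute.TaskExecution;

    class MapReduceApiSketch {
        static void run(Ignite ignite) {
            // Submit the task and keep the control object (result, statuses, cancellation).
            TaskExecution<Long> execution =
                    ignite.compute().submitMapReduce(List.of(), "org.example.WordCountTask", "arg1");

            // Async shortcut, equivalent to submitMapReduce(...).resultAsync().
            CompletableFuture<Long> future =
                    ignite.compute().executeMapReduceAsync(List.of(), "org.example.WordCountTask", "arg1");

            // Blocking variant; throws ComputeException if the task fails.
            Long result = ignite.compute().executeMapReduce(List.of(), "org.example.WordCountTask", "arg1");
        }
    }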
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index 8c30a8f..58a0370 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -23,6 +23,11 @@
  * Context of the {@link ComputeJob} execution.
  */
 public interface JobExecutionContext {
+    /**
+     * Ignite API entry point.
+     *
+     * @return Ignite instance.
+     */
     Ignite ignite();
 
     /**
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java b/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
new file mode 100644
index 0000000..a12fd29
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/TaskExecution.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compute task control object. Methods inherited from {@link JobExecution} allow control of the task coordination job.
+ *
+ * @param <R> Task result type.
+ */
+public interface TaskExecution<R> extends JobExecution<R> {
+    /**
+     * Returns a collection of statuses of the jobs which are executing under this task. The resulting future is completed only after the
+     * jobs are submitted for execution. The list could contain {@code null} values if the time for retaining job status has been exceeded.
+     *
+     * @return A list of current statuses of the jobs.
+     */
+    CompletableFuture<List<@Nullable JobStatus>> statusesAsync();
+
+    /**
+     * Returns a collection of ids of the jobs which are executing under this task. The resulting future is completed only after the
+     * jobs are submitted for execution. The list could contain {@code null} values if the time for retaining job status has been exceeded.
+     *
+     * @return A list of ids of the jobs.
+     */
+    default CompletableFuture<List<@Nullable UUID>> idsAsync() {
+        return statusesAsync().thenApply(statuses -> statuses.stream()
+                .map(jobStatus -> jobStatus != null ? jobStatus.id() : null)
+                .collect(Collectors.toList()));
+    }
+}
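A hedged sketch of how the per-job statuses exposed by this interface might be inspected; the {@code TaskExecution} instance is assumed to come from IgniteCompute#submitMapReduce.

    import java.util.List;
    import java.util.UUID;
    import org.apache.ignite.compute.JobStatus;
    import org.apache.ignite.compute.TaskExecution;

    class TaskExecutionSketch {
        static void inspect(TaskExecution<String> execution) {
            // Completes only after the split step has submitted the jobs; entries may be
            // null if the job status retention time has been exceeded.
            List<JobStatus> statuses = execution.statusesAsync().join();
            statuses.forEach(System.out::println);

            // Job ids derived from the same statuses (null for expired entries).
            List<UUID> jobIds = execution.idsAsync().join();
            System.out.println(jobIds);
        }
    }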
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java b/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
new file mode 100644
index 0000000..2445fd5
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/ComputeJobRunner.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.task;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A description of the job to be submitted as a result of the split step of the {@link MapReduceTask}. Reflects the parameters of the
+ * {@link org.apache.ignite.compute.IgniteCompute#submit(Set, List, String, JobExecutionOptions, Object...) IgniteCompute#submit} method.
+ */
+public class ComputeJobRunner {
+    private final Set<ClusterNode> nodes;
+
+    private final List<DeploymentUnit> units;
+
+    private final String jobClassName;
+
+    private final JobExecutionOptions options;
+
+    private final Object[] args;
+
+    private ComputeJobRunner(
+            Set<ClusterNode> nodes,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            JobExecutionOptions options,
+            Object[] args
+    ) {
+        this.nodes = Collections.unmodifiableSet(nodes);
+        this.units = units;
+        this.jobClassName = jobClassName;
+        this.options = options;
+        this.args = args;
+    }
+
+    /**
+     * Candidate nodes; the job will be executed on one of them.
+     *
+     * @return A set of candidate nodes.
+     */
+    public Set<ClusterNode> nodes() {
+        return nodes;
+    }
+
+    /**
+     * Deployment units. Can be empty.
+     *
+     * @return Deployment units.
+     */
+    public List<DeploymentUnit> units() {
+        return units;
+    }
+
+    /**
+     * Name of the job class to execute.
+     *
+     * @return Name of the job class to execute.
+     */
+    public String jobClassName() {
+        return jobClassName;
+    }
+
+    /**
+     * Job execution options (priority, max retries).
+     *
+     * @return Job execution options.
+     */
+    public JobExecutionOptions options() {
+        return options;
+    }
+
+    /**
+     * Arguments of the job.
+     *
+     * @return Arguments of the job.
+     */
+    public Object[] args() {
+        return args;
+    }
+
+    /**
+     * Returns a new builder initialized from this definition.
+     *
+     * @return New builder.
+     */
+    public ComputeJobRunnerBuilder toBuilder() {
+        return builder().nodes(nodes).units(units).jobClassName(jobClassName).options(options).args(args);
+    }
+
+    /**
+     * Returns a new builder.
+     *
+     * @return New builder.
+     */
+    public static ComputeJobRunnerBuilder builder() {
+        return new ComputeJobRunnerBuilder();
+    }
+
+    /**
+     * Job submit parameters builder.
+     */
+    public static class ComputeJobRunnerBuilder {
+        private final Set<ClusterNode> nodes = new HashSet<>();
+
+        private final List<DeploymentUnit> units = new ArrayList<>();
+
+        private String jobClassName;
+
+        private JobExecutionOptions options = JobExecutionOptions.DEFAULT;
+
+        private Object[] args;
+
+        /**
+         * Adds nodes to the set of candidate nodes.
+         *
+         * @param nodes A collection of candidate nodes.
+         * @return Builder instance.
+         */
+        public ComputeJobRunnerBuilder nodes(Collection<ClusterNode> nodes) {
+            this.nodes.addAll(nodes);
+            return this;
+        }
+
+        /**
+         * Adds a node to the set of candidate nodes.
+         *
+         * @param node Candidate node.
+         * @return Builder instance.
+         */
+        public ComputeJobRunnerBuilder node(ClusterNode node) {
+            nodes.add(node);
+            return this;
+        }
+
+        /**
+         * Adds deployment units.
+         *
+         * @param units A collection of deployment units.
+         * @return Builder instance.
+         */
+        public ComputeJobRunnerBuilder units(Collection<DeploymentUnit> units) {
+            this.units.addAll(units);
+            return this;
+        }
+
+        /**
+         * Sets the name of the job class to execute.
+         *
+         * @param jobClassName A job class name.
+         * @return Builder instance.
+         */
+        public ComputeJobRunnerBuilder jobClassName(String jobClassName) {
+            this.jobClassName = jobClassName;
+            return this;
+        }
+
+        /**
+         * Sets job execution options (priority, max retries).
+         *
+         * @param options Job execution options.
+         * @return Builder instance.
+         */
+        public ComputeJobRunnerBuilder options(JobExecutionOptions options) {
+            this.options = options;
+            return this;
+        }
+
+        /**
+         * Sets arguments of the job.
+         *
+         * @param args Arguments of the job.
+         * @return Builder instance.
+         */
+        public ComputeJobRunnerBuilder args(Object... args) {
+            this.args = args;
+            return this;
+        }
+
+        /**
+         * Constructs a compute job description object.
+         *
+         * @return Description object.
+         */
+        public ComputeJobRunner build() {
+            if (nodes.isEmpty()) {
+                throw new IllegalArgumentException();
+            }
+
+            return new ComputeJobRunner(nodes, units, jobClassName, options, args);
+        }
+    }
+}
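As an illustration, a split implementation might build one job description per cluster node roughly as follows; the job class name is hypothetical, and the builder calls mirror the ones used by InteractiveTasks later in this patch.

    import static java.util.stream.Collectors.toList;

    import java.util.List;
    import org.apache.ignite.compute.task.ComputeJobRunner;
    import org.apache.ignite.compute.task.TaskExecutionContext;

    class SplitSketch {
        static List<ComputeJobRunner> oneJobPerNode(TaskExecutionContext context) {
            return context.ignite().clusterNodes().stream()
                    .map(node -> ComputeJobRunner.builder()
                            .node(node)                               // single candidate node
                            .jobClassName("org.example.NodeNameJob")  // hypothetical job class
                            .args("arg1")                             // optional job arguments
                            .build())
                    .collect(toList());
        }
    }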
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
new file mode 100644
index 0000000..09e88ce
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/MapReduceTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.task;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * A map reduce task interface. Implement this interface and pass the name of the implementing class to the
+ * {@link org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, String, Object...) IgniteCompute#submitMapReduce} method to run this
+ * task.
+ *
+ * @param <R> Result type.
+ */
+public interface MapReduceTask<R> {
+    /**
+     * The split step of the task. Returns a list of compute job execution parameters that will be used to submit the compute jobs.
+     *
+     * @param taskContext Task execution context.
+     * @param args Map reduce task arguments.
+     * @return A list of compute job execution parameters.
+     */
+    List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args);
+
+    /**
+     * The finishing step of the task execution. This method is called with a map from the identifiers of the compute jobs submitted as
+     * a result of the {@link #split(TaskExecutionContext, Object...)} method call to the results of the corresponding jobs. The return
+     * value of this method becomes the result of the task.
+     *
+     * @param results Map from compute job ids to their results.
+     * @return Final task result.
+     */
+    R reduce(Map<UUID, ?> results);
+}
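A hedged, self-contained sketch of a MapReduceTask implementation, assuming each job returns its worker node name as a String and the reduce step sums the name lengths (similar in spirit to the MapReduce task exercised by the integration tests below); all class names are hypothetical.

    import static java.util.stream.Collectors.toList;

    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.ignite.compute.task.ComputeJobRunner;
    import org.apache.ignite.compute.task.MapReduceTask;
    import org.apache.ignite.compute.task.TaskExecutionContext;

    class NameLengthTask implements MapReduceTask<Integer> {
        @Override
        public List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args) {
            // Fan out one job per cluster node; the job class is assumed to be deployed.
            return taskContext.ignite().clusterNodes().stream()
                    .map(node -> ComputeJobRunner.builder()
                            .node(node)
                            .jobClassName("org.example.NodeNameJob")
                            .build())
                    .collect(toList());
        }

        @Override
        public Integer reduce(Map<UUID, ?> results) {
            // Each job result is assumed to be the worker node name; sum the name lengths.
            return results.values().stream()
                    .map(String.class::cast)
                    .mapToInt(String::length)
                    .sum();
        }
    }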
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
new file mode 100644
index 0000000..2a45023
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/compute/task/TaskExecutionContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compute.task;
+
+import org.apache.ignite.Ignite;
+
+/** Context of the compute task execution. */
+public interface TaskExecutionContext {
+    /**
+     * Ignite API entry point.
+     *
+     * @return Ignite instance.
+     */
+    Ignite ignite();
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 03ae7c2..847a297 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -37,6 +37,7 @@
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.TaskExecution;
 import org.apache.ignite.internal.client.ClientUtils;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
@@ -112,11 +113,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<R>submit(nodes, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
-        }
+        return sync(executeAsync(nodes, units, jobClassName, options, args));
     }
 
     /** {@inheritDoc} */
@@ -208,11 +205,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<R>submitColocated(tableName, key, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
-        }
+        return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
     }
 
     /** {@inheritDoc} */
@@ -226,11 +219,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<K, R>submitColocated(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
-        }
+        return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
     }
 
     /** {@inheritDoc} */
@@ -261,6 +250,17 @@
         return map;
     }
 
+    @Override
+    public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-22124
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return sync(executeMapReduceAsync(units, taskClassName, args));
+    }
+
     private CompletableFuture<SubmitResult> executeOnNodesAsync(
             Set<ClusterNode> nodes,
             List<DeploymentUnit> units,
@@ -441,4 +441,12 @@
     private static SubmitResult unpackSubmitResult(PayloadInputChannel ch) {
         return new SubmitResult(ch.in().unpackUuid(), ch.notificationFuture());
     }
+
+    private static <R> R sync(CompletableFuture<R> future) {
+        try {
+            return future.join();
+        } catch (CompletionException e) {
+            throw ExceptionUtils.sneakyThrow(ClientUtils.ensurePublicException(e));
+        }
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
new file mode 100644
index 0000000..dd1b766
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute.task;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.jetbrains.annotations.Nullable;
+
+// TODO https://issues.apache.org/jira/browse/IGNITE-22124
+/**
+ * Client compute task implementation.
+ *
+ * @param <R> Task result type.
+ */
+public class ClientTaskExecution<R> implements TaskExecution<R> {
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobStatus> statusAsync() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 94d7894..3d5c9d7 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -41,6 +41,7 @@
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -121,11 +122,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<R>submit(nodes, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.wrap(e);
-        }
+        return sync(executeAsync(nodes, units, jobClassName, options, args));
     }
 
     @Override
@@ -163,11 +160,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<R>submitColocated(tableName, key, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.wrap(e);
-        }
+        return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
     }
 
     /** {@inheritDoc} */
@@ -181,11 +174,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<K, R>submitColocated(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.wrap(e);
-        }
+        return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
     }
 
     @Override
@@ -199,6 +188,16 @@
         return null;
     }
 
+    @Override
+    public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return null;
+    }
+
+    @Override
+    public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return sync(executeMapReduceAsync(units, taskClassName, args));
+    }
+
     private <R> JobExecution<R> completedExecution(R result) {
         return jobExecution(completedFuture(result));
     }
@@ -261,4 +260,12 @@
     public CompletableFuture<@Nullable Boolean> changePriorityAsync(UUID jobId, int newPriority) {
         return trueCompletedFuture();
     }
+
+    private static <R> R sync(CompletableFuture<R> future) {
+        try {
+            return future.join();
+        } catch (CompletionException e) {
+            throw ExceptionUtils.wrap(e);
+        }
+    }
 }
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 7006989..553267f 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -27,6 +27,7 @@
 import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.instanceOf;
@@ -46,6 +47,7 @@
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.TaskExecution;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -64,6 +66,7 @@
  * corresponding job class to the jobs source set. The integration tests depend on this source set so the job class will be visible and it
  * will be automatically compiled and packed into the ignite-integration-test-jobs-1.0-SNAPSHOT.jar.
  */
+@SuppressWarnings("resource")
 public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest {
     protected abstract List<DeploymentUnit> units();
 
@@ -379,6 +382,43 @@
         assertThat(execution.cancelAsync(), willBe(false));
     }
 
+    @Test
+    void submitMapReduce() {
+        IgniteImpl entryNode = node(0);
+
+        TaskExecution<Integer> taskExecution = entryNode.compute().submitMapReduce(units(), mapReduceTaskClassName(), units());
+
+        int sumOfNodeNamesLengths = CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
+        assertThat(taskExecution.resultAsync(), willBe(sumOfNodeNamesLengths));
+
+        // Statuses list contains a completed status for each of the 3 running nodes.
+        assertThat(taskExecution.statusesAsync(), willBe(contains(
+                jobStatusWithState(COMPLETED),
+                jobStatusWithState(COMPLETED),
+                jobStatusWithState(COMPLETED)
+        )));
+    }
+
+    @Test
+    void executeMapReduceAsync() {
+        IgniteImpl entryNode = node(0);
+
+        CompletableFuture<Integer> future = entryNode.compute().executeMapReduceAsync(units(), mapReduceTaskClassName(), units());
+
+        int sumOfNodeNamesLengths = CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
+        assertThat(future, willBe(sumOfNodeNamesLengths));
+    }
+
+    @Test
+    void executeMapReduce() {
+        IgniteImpl entryNode = node(0);
+
+        int result = entryNode.compute().executeMapReduce(units(), mapReduceTaskClassName(), units());
+
+        int sumOfNodeNamesLengths = CLUSTER.runningNodes().map(IgniteImpl::name).map(String::length).reduce(Integer::sum).orElseThrow();
+        assertThat(result, is(sumOfNodeNamesLengths));
+    }
+
     static IgniteImpl node(int i) {
         return CLUSTER.node(i);
     }
@@ -395,6 +435,10 @@
         return FailingJob.class.getName();
     }
 
+    private static String mapReduceTaskClassName() {
+        return MapReduce.class.getName();
+    }
+
     static void assertComputeException(Exception ex, Throwable cause) {
         assertComputeException(ex, cause.getClass().getName(), cause.getMessage());
     }
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index b764e5b..77fb323 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -326,9 +326,7 @@
         return IntStream.range(0, initialNodes()).mapToObj(Arguments::of);
     }
 
-
     private static class CustomFailingJob implements ComputeJob<String> {
-        /** {@inheritDoc} */
         @Override
         public String execute(JobExecutionContext context, Object... args) {
             throw ExceptionUtils.sneakyThrow((Throwable) args[0]);
@@ -336,8 +334,6 @@
     }
 
     private static class WaitLatchJob implements ComputeJob<String> {
-
-        /** {@inheritDoc} */
         @Override
         public String execute(JobExecutionContext context, Object... args) {
             try {
@@ -350,10 +346,8 @@
     }
 
     private static class WaitLatchThrowExceptionOnFirstExecutionJob implements ComputeJob<String> {
-
         static final AtomicInteger counter = new AtomicInteger(0);
 
-        /** {@inheritDoc} */
         @Override
         public String execute(JobExecutionContext context, Object... args) {
             try {
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
new file mode 100644
index 0000000..344d11c
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
+import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTimeFinishTime;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.time.Instant;
+import java.util.List;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.InteractiveTasks;
+import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.lang.IgniteException;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("resource")
+class ItMapReduceTest extends ClusterPerClassIntegrationTest {
+    @BeforeEach
+    void initChannels() {
+        InteractiveJobs.clearState();
+        InteractiveTasks.clearState();
+
+        List<String> allNodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).collect(toList());
+        InteractiveJobs.initChannels(allNodeNames);
+    }
+
+    @Test
+    void taskMaintainsStatus() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+        TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
+        testExecution.assertExecuting();
+        InteractiveTasks.GlobalApi.assertAlive();
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // And statuses list future is not complete yet.
+        assertThat(taskExecution.statusesAsync().isDone(), is(false));
+
+        // When finish the split job.
+        InteractiveTasks.GlobalApi.finishSplit();
+
+        // Then the task is still executing while waiting for the jobs to finish.
+        testExecution.assertExecuting();
+        assertTaskStatusIs(taskExecution, EXECUTING, statusBeforeSplit, nullValue(Instant.class));
+
+        // And statuses list contains statuses for 3 running nodes.
+        assertJobStates(taskExecution, EXECUTING);
+
+        // When finish the jobs.
+        InteractiveJobs.all().finishReturnWorkerNames();
+
+        // Then the task is still executing while waiting for the reduce to finish.
+        testExecution.assertExecuting();
+        assertTaskStatusIs(taskExecution, EXECUTING, statusBeforeSplit, nullValue(Instant.class));
+
+        // When finish the reduce job.
+        InteractiveTasks.GlobalApi.finishReduce();
+
+        // Then the task is complete and the result is the list of all node names.
+        String[] allNodeNames = CLUSTER.runningNodes().map(IgniteImpl::name).toArray(String[]::new);
+        assertThat(taskExecution.resultAsync(), willBe(containsInAnyOrder(allNodeNames)));
+
+        // And task status is completed.
+        assertTaskStatusIs(taskExecution, COMPLETED, statusBeforeSplit, notNullValue(Instant.class));
+
+        // And statuses list contains statuses for 3 completed jobs.
+        assertJobStates(taskExecution, COMPLETED);
+    }
+
+    @Test
+    void splitThrowsException() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // When the split job throws an exception.
+        InteractiveTasks.GlobalApi.throwException();
+
+        // Then the task fails.
+        assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+
+        // And statuses list fails.
+        assertThat(taskExecution.statusesAsync(), willThrow(IgniteException.class));
+    }
+
+    @Test
+    void cancelSplit() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // When cancel the task.
+        assertThat(taskExecution.cancelAsync(), willBe(true));
+
+        // Then the task is cancelled.
+        assertTaskFailed(taskExecution, CANCELED, statusBeforeSplit);
+
+        // And statuses list will fail.
+        assertThat(taskExecution.statusesAsync(), willThrow(RuntimeException.class));
+    }
+
+    @Test
+    void jobThrowsException() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // And finish the split job.
+        finishSplit(taskExecution);
+
+        // When jobs throw an exception.
+        InteractiveJobs.all().throwException();
+
+        // Then the task fails.
+        assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+    }
+
+    @Test
+    void cancelJobs() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+        TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
+        testExecution.assertExecuting();
+        InteractiveTasks.GlobalApi.assertAlive();
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // And finish the split job.
+        finishSplit(taskExecution);
+
+        // When cancel the task.
+        assertThat(taskExecution.cancelAsync(), willBe(true));
+
+        // Then the task is cancelled.
+        assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+
+        // And statuses list contains canceled statuses.
+        assertJobStates(taskExecution, CANCELED);
+    }
+
+    @Test
+    void reduceThrowsException() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = startTask(entryNode);
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // And finish the split job.
+        finishSplit(taskExecution);
+
+        // And finish jobs.
+        InteractiveJobs.all().finishReturnWorkerNames();
+
+        // And reduce throws an exception.
+        InteractiveTasks.GlobalApi.throwException();
+
+        // Then the task fails.
+        assertTaskFailed(taskExecution, FAILED, statusBeforeSplit);
+
+        // And statuses list contains completed statuses.
+        assertJobStates(taskExecution, COMPLETED);
+    }
+
+    @Test
+    void cancelReduce() throws Exception {
+        IgniteImpl entryNode = CLUSTER.node(0);
+
+        // Given running task.
+        TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+        TestingJobExecution<List<String>> testExecution = new TestingJobExecution<>(taskExecution);
+        testExecution.assertExecuting();
+        InteractiveTasks.GlobalApi.assertAlive();
+
+        // Save status before split.
+        JobStatus statusBeforeSplit = taskExecution.statusAsync().join();
+
+        // And finish the split job.
+        finishSplit(taskExecution);
+
+        // And finish jobs.
+        InteractiveJobs.all().finishReturnWorkerNames();
+
+        // Wait for the reduce job to start.
+        InteractiveTasks.GlobalApi.assertAlive();
+
+        // When cancel the task.
+        assertThat(taskExecution.cancelAsync(), willBe(true));
+
+        // Then the task is cancelled.
+        assertTaskFailed(taskExecution, CANCELED, statusBeforeSplit);
+
+        // And statuses list contains completed statuses.
+        assertJobStates(taskExecution, COMPLETED);
+    }
+
+    private static TaskExecution<List<String>> startTask(IgniteImpl entryNode) throws InterruptedException {
+        TaskExecution<List<String>> taskExecution = entryNode.compute().submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());
+        new TestingJobExecution<>(taskExecution).assertExecuting();
+        InteractiveTasks.GlobalApi.assertAlive();
+        return taskExecution;
+    }
+
+    private static void finishSplit(TaskExecution<List<String>> taskExecution) {
+        // Finish the split job.
+        InteractiveTasks.GlobalApi.finishSplit();
+
+        // And wait for statuses list contains statuses for 3 running nodes.
+        assertJobStates(taskExecution, EXECUTING);
+    }
+
+    private static void assertTaskFailed(TaskExecution<List<String>> taskExecution, JobState jobState, JobStatus statusBeforeSplit) {
+        assertThat(taskExecution.resultAsync(), willThrow(IgniteException.class));
+        assertTaskStatusIs(taskExecution, jobState, statusBeforeSplit, notNullValue(Instant.class));
+    }
+
+    private static void assertTaskStatusIs(
+            TaskExecution<List<String>> taskExecution,
+            JobState jobState,
+            JobStatus statusBeforeSplit,
+            Matcher<Instant> finishTimeMatcher
+    ) {
+        assertThat(taskExecution.statusAsync(), willBe(jobStatusWithStateAndCreateTimeStartTimeFinishTime(
+                is(jobState),
+                is(statusBeforeSplit.createTime()),
+                is(statusBeforeSplit.startTime()),
+                is(finishTimeMatcher)
+        )));
+        assertThat(taskExecution.idAsync(), willBe(statusBeforeSplit.id()));
+    }
+
+    private static void assertJobStates(TaskExecution<List<String>> taskExecution, JobState state) {
+        await().until(taskExecution::statusesAsync, willBe(contains(
+                jobStatusWithState(state),
+                jobStatusWithState(state),
+                jobStatusWithState(state)
+        )));
+    }
+}
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index 4636348..1d53015 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -245,8 +245,10 @@
                             break;
                         case RETURN:
                             return "Done";
+                        case RETURN_WORKER_NAME:
+                            return workerNodeName;
                         case GET_WORKER_NAME:
-                            NODE_CHANNELS.get(workerNodeName).add(context.ignite().name());
+                            NODE_CHANNELS.get(workerNodeName).add(workerNodeName);
                             break;
                         default:
                             throw new IllegalStateException("Unexpected value: " + receivedSignal);
@@ -321,15 +323,12 @@
             INTERACTIVE_JOB_RUN_TIMES.forEach((nodeName, runTimes) -> assertThat(runTimes, equalTo(1)));
         }
 
-        /**
-         * Finishes all {@link InteractiveJob}s.
-         */
-        public void finish() {
+        private static void sendTerminalSignal(Signal signal) {
             NODE_SIGNALS.forEach((nodeName, channel) -> {
                 try {
-                    channel.offer(Signal.RETURN, WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                    channel.offer(signal, WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                 } catch (InterruptedException e) {
-                    throw new RuntimeException("Can not send finish signal to he node", e);
+                    throw new RuntimeException("Can not send signal to the node", e);
                 }
             });
 
@@ -341,6 +340,27 @@
                 );
             });
         }
+
+        /**
+         * Finishes all {@link InteractiveJob}s.
+         */
+        public void finish() {
+            sendTerminalSignal(Signal.RETURN);
+        }
+
+        /**
+         * Finishes all {@link InteractiveJob}s by returning worker node names.
+         */
+        public void finishReturnWorkerNames() {
+            sendTerminalSignal(Signal.RETURN_WORKER_NAME);
+        }
+
+        /**
+         * Finishes all {@link InteractiveJob}s by making them throw an exception.
+         */
+        public void throwException() {
+            sendTerminalSignal(Signal.THROW);
+        }
     }
 
     /**
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
new file mode 100644
index 0000000..c19e7c6
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.utils;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/**
+ * Tests DSL for interactive tasks. "Interactive" means that you can send messages and get responses to/from running tasks.
+ *
+ * <p>For example, you can start {@link GlobalInteractiveMapReduceTask} on some node, get the name of the worker node for the split or
+ * reduce job, and ask the split or reduce job to complete successfully or throw an exception. This class also provides useful assertions
+ * for task states.
+ */
+public final class InteractiveTasks {
+    /**
+     * ACK for {@link Signal#CONTINUE}. Returned by a task that has received the signal. Used to check that the task is alive.
+     */
+    private static final Object ACK = new Object();
+
+    /**
+     * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveMapReduceTask} and test code. You can send a
+     * signal to the task via this channel and get a response from it via {@link #GLOBAL_CHANNEL}.
+     */
+    private static final BlockingQueue<Signal> GLOBAL_SIGNALS = new LinkedBlockingQueue<>();
+
+    /**
+     * Class-wide queue that is used as a communication channel between {@link GlobalInteractiveMapReduceTask} and test code. You can send a
+     * signal to the task via {@link #GLOBAL_SIGNALS} and get a response from it via this channel.
+     */
+    private static final BlockingQueue<Object> GLOBAL_CHANNEL = new LinkedBlockingQueue<>();
+
+    /**
+     * This counter indicates how many {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} methods are running
+     * now. It is incremented each time {@link GlobalInteractiveMapReduceTask#split(TaskExecutionContext, Object...)} is called and
+     * decremented when the method finishes (whatever the result is). Checked in {@link #clearState}.
+     */
+    private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new AtomicInteger(0);
+
+    /**
+     * This counter indicates how many {@link GlobalInteractiveMapReduceTask#reduce(Map)} methods are running now. It is incremented
+     * each time {@link GlobalInteractiveMapReduceTask#reduce(Map)} is called and decremented when the method finishes (whatever the
+     * result is). Checked in {@link #clearState}.
+     */
+    private static final AtomicInteger RUNNING_GLOBAL_REDUCE_CNT = new AtomicInteger(0);
+
+    /**
+     * The timeout in seconds that defines how long we should wait for async calls. Almost all methods use this timeout.
+     */
+    private static final long WAIT_TIMEOUT_SECONDS = 15;
+
+    /**
+     * Clears the global state. Must be called before each testing scenario.
+     */
+    public static void clearState() {
+        assertThat(
+                "Global split job is running. Can not clear global state. Please, stop the job first.",
+                RUNNING_GLOBAL_SPLIT_CNT.get(),
+                is(0)
+        );
+        assertThat(
+                "Global reduce job is running. Can not clear global state. Please, stop the job first.",
+                RUNNING_GLOBAL_REDUCE_CNT.get(),
+                is(0)
+        );
+
+        GLOBAL_SIGNALS.clear();
+        GLOBAL_CHANNEL.clear();
+    }
+
+    /**
+     * Signals that are sent by test code to the tasks.
+     */
+    private enum Signal {
+        /**
+         * Signal to the task to continue running and send ACK as a response.
+         */
+        CONTINUE,
+
+        /**
+         * Ask the task to throw an exception.
+         */
+        THROW,
+
+        /**
+         * Ask the split method to return a list of jobs which will be executed on all nodes.
+         */
+        SPLIT_RETURN_ALL_NODES,
+
+        /**
+         * Ask the reduce method to return a concatenation of job results.
+         */
+        REDUCE_RETURN
+    }
+
+    private static Signal listenSignal() {
+        try {
+            return GLOBAL_SIGNALS.take();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * If any of the args are strings, try to convert them to signals and offer them to the signals queue.
+     *
+     * @param args Job args.
+     */
+    private static void offerArgsAsSignals(Object... args) {
+        for (Object arg : args) {
+            if (arg instanceof String) {
+                String signal = (String) arg;
+                try {
+                    GLOBAL_SIGNALS.offer(Signal.valueOf(signal));
+                } catch (IllegalArgumentException ignored) {
+                    // Ignore non-signal strings
+                }
+            }
+        }
+    }
+
+    /**
+     * Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
+     */
+    private static class GlobalInteractiveMapReduceTask implements MapReduceTask<List<String>> {
+        @Override
+        public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
+            RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();
+
+            offerArgsAsSignals(args);
+
+            try {
+                while (true) {
+                    Signal receivedSignal = listenSignal();
+                    switch (receivedSignal) {
+                        case THROW:
+                            throw new RuntimeException();
+                        case CONTINUE:
+                            GLOBAL_CHANNEL.offer(ACK);
+                            break;
+                        case SPLIT_RETURN_ALL_NODES:
+                            return context.ignite().clusterNodes().stream().map(node ->
+                                    ComputeJobRunner.builder()
+                                            .jobClassName(InteractiveJobs.interactiveJobName())
+                                            .nodes(Set.of(node))
+                                            .build()
+                            ).collect(toList());
+                        default:
+                            throw new IllegalStateException("Unexpected value: " + receivedSignal);
+                    }
+                }
+            } finally {
+                RUNNING_GLOBAL_SPLIT_CNT.decrementAndGet();
+            }
+        }
+
+        @Override
+        public List<String> reduce(Map<UUID, ?> results) {
+            RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();
+            try {
+                while (true) {
+                    Signal receivedSignal = listenSignal();
+                    switch (receivedSignal) {
+                        case THROW:
+                            throw new RuntimeException();
+                        case CONTINUE:
+                            GLOBAL_CHANNEL.offer(ACK);
+                            break;
+                        case REDUCE_RETURN:
+                            return results.values().stream()
+                                    .map(String.class::cast)
+                                    .collect(toList());
+                        default:
+                            throw new IllegalStateException("Unexpected value: " + receivedSignal);
+                    }
+                }
+            } finally {
+                RUNNING_GLOBAL_REDUCE_CNT.decrementAndGet();
+            }
+        }
+    }
+
+    /**
+     * API for the interaction with {@link GlobalInteractiveMapReduceTask}.
+     */
+    public static final class GlobalApi {
+        /**
+         * Checks that {@link GlobalInteractiveMapReduceTask} is alive.
+         */
+        public static void assertAlive() throws InterruptedException {
+            GLOBAL_SIGNALS.offer(Signal.CONTINUE);
+            assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), equalTo(ACK));
+        }
+
+        /**
+         * Finishes the split job, returning job runners for all nodes.
+         */
+        public static void finishSplit() {
+            GLOBAL_SIGNALS.offer(Signal.SPLIT_RETURN_ALL_NODES);
+        }
+
+        /**
+         * Asks the currently running split or reduce job to throw an exception.
+         */
+        public static void throwException() {
+            GLOBAL_SIGNALS.offer(Signal.THROW);
+        }
+
+        /**
+         * Finishes the reduce job, returning the final result.
+         */
+        public static void finishReduce() {
+            GLOBAL_SIGNALS.offer(Signal.REDUCE_RETURN);
+        }
+
+        public static String name() {
+            return GlobalInteractiveMapReduceTask.class.getName();
+        }
+    }
+}
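
For reference, a minimal sketch of how a test could drive this task through GlobalApi. The compute handle, the empty deployment
unit list and the step that finishes the interactive jobs themselves are illustrative assumptions, not part of this change:

    InteractiveTasks.clearState();

    // submitMapReduce is the IgniteCompute entry point introduced by this change.
    TaskExecution<List<String>> execution =
            compute.submitMapReduce(List.of(), InteractiveTasks.GlobalApi.name());

    InteractiveTasks.GlobalApi.assertAlive();   // the split job answers CONTINUE with ACK
    InteractiveTasks.GlobalApi.finishSplit();   // the split returns a job runner per cluster node
    // ... the interactive jobs started by the split are finished via InteractiveJobs (not shown) ...
    InteractiveTasks.GlobalApi.finishReduce();  // the reduce returns the concatenated job results

    List<String> result = execution.resultAsync().join();
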
diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
new file mode 100644
index 0000000..b5ffba5
--- /dev/null
+++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+
+/** Map reduce task which runs a {@link GetNodeNameJob} on each node and computes the sum of the lengths of all node names. */
+public class MapReduce implements MapReduceTask<Integer> {
+    @Override
+    public List<ComputeJobRunner> split(TaskExecutionContext taskContext, Object... args) {
+        List<DeploymentUnit> deploymentUnits = (List<DeploymentUnit>) args[0];
+
+        return taskContext.ignite().clusterNodes().stream().map(node ->
+                ComputeJobRunner.builder()
+                        .jobClassName(GetNodeNameJob.class.getName())
+                        .units(deploymentUnits)
+                        .nodes(Set.of(node))
+                        .options(JobExecutionOptions.builder()
+                                .maxRetries(10)
+                                .priority(Integer.MAX_VALUE)
+                                .build()
+                        ).build()
+        ).collect(toList());
+    }
+
+    @Override
+    public Integer reduce(Map<UUID, ?> results) {
+        return results.values().stream()
+                .map(String.class::cast)
+                .map(String::length)
+                .reduce(Integer::sum)
+                .orElseThrow();
+    }
+}
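
A hedged usage sketch for this example task; the compute handle and the deployment unit are placeholders. Note that the task
expects its own deployment units as the first task argument so that GetNodeNameJob can be loaded on the worker nodes:

    // `unit` is assumed to be a DeploymentUnit obtained elsewhere (e.g. from the test deployment setup).
    List<DeploymentUnit> units = List.of(unit);

    // Sum of the lengths of all node names in the cluster.
    Integer totalLength = compute.executeMapReduce(units, MapReduce.class.getName(), units);
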
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
index 13c209b..7d46fbe 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackIgniteCompute.java
@@ -28,6 +28,8 @@
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.compute.task.AntiHijackTaskExecution;
 import org.apache.ignite.internal.wrapper.Wrapper;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
@@ -134,6 +136,16 @@
                 .collect(toMap(Entry::getKey, entry -> preventThreadHijack(entry.getValue())));
     }
 
+    @Override
+    public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return new AntiHijackTaskExecution<>(compute.submitMapReduce(units, taskClassName, args), asyncContinuationExecutor);
+    }
+
+    @Override
+    public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return compute.executeMapReduce(units, taskClassName, args);
+    }
+
     private <R> JobExecution<R> preventThreadHijack(JobExecution<R> execution) {
         return new AntiHijackJobExecution<>(execution, asyncContinuationExecutor);
     }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
index 5a77f6e..c7c0f99 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/AntiHijackJobExecution.java
@@ -59,7 +59,7 @@
         return preventThreadHijack(execution.changePriorityAsync(newPriority));
     }
 
-    private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
+    protected <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
         return PublicApiThreading.preventThreadHijack(originalFuture, asyncContinuationExecutor);
     }
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 3f88353..e05883d 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -24,6 +24,8 @@
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -40,7 +42,7 @@
      * @param jobClassName Name of the job class.
      * @param args Job args.
      * @param <R> Job result type.
-     * @return Future execution result.
+     * @return Job execution object.
      */
     <R> JobExecution<R> executeLocally(
             ExecutionOptions options,
@@ -56,7 +58,7 @@
      * @param jobClassName Name of the job class.
      * @param args Job args.
      * @param <R> Job result type.
-     * @return Future execution result.
+     * @return Job execution object.
      */
     default <R> JobExecution<R> executeLocally(
             List<DeploymentUnit> units,
@@ -70,12 +72,12 @@
      * Executes a job of the given class on a remote node.
      *
      * @param options Job execution options.
-     * @param remoteNode Name of the job class.
+     * @param remoteNode Remote node to execute the job on.
      * @param units Deployment units which will be loaded for execution.
      * @param jobClassName Name of the job class.
      * @param args Job args.
      * @param <R> Job result type.
-     * @return Future execution result.
+     * @return Job execution object.
      */
     <R> JobExecution<R> executeRemotely(
             ExecutionOptions options,
@@ -88,12 +90,12 @@
     /**
      * Executes a job of the given class on a remote node with default execution options {@link ExecutionOptions#DEFAULT}.
      *
-     * @param remoteNode Name of the job class.
+     * @param remoteNode Remote node to execute the job on.
      * @param units Deployment units which will be loaded for execution.
      * @param jobClassName Name of the job class.
      * @param args Job args.
      * @param <R> Job result type.
-     * @return Future execution result.
+     * @return Job execution object.
      */
     default <R> JobExecution<R> executeRemotely(
             ClusterNode remoteNode,
@@ -108,14 +110,14 @@
      * Executes a job of the given class on a remote node. If the node leaves the cluster, it will be restarted on the node given by the
      * {@code nextWorkerSelector}.
      *
-     * @param remoteNode Name of the job class.
+     * @param remoteNode Remote node to execute the job on.
      * @param nextWorkerSelector The selector that returns the next worker to execute job on.
      * @param options Job execution options.
      * @param units Deployment units which will be loaded for execution.
      * @param jobClassName Name of the job class.
      * @param args Job args.
      * @param <R> Job result type.
-     * @return Future execution result.
+     * @return Job execution object.
      */
     <R> JobExecution<R> executeRemotelyWithFailover(
             ClusterNode remoteNode,
@@ -127,6 +129,23 @@
     );
 
     /**
+     * Executes a task of the given class.
+     *
+     * @param jobSubmitter Function that submits a job with the specified parameters for execution.
+     * @param units Deployment units which will be loaded for execution.
+     * @param taskClassName Name of the task class.
+     * @param args Task args.
+     * @param <R> Task result type.
+     * @return Task execution object.
+     */
+    <R> TaskExecution<R> executeTask(
+            JobSubmitter jobSubmitter,
+            List<DeploymentUnit> units,
+            String taskClassName,
+            Object... args
+    );
+
+    /**
      * Retrieves the current status of all jobs on all nodes in the cluster.
      *
      * @return The collection of job statuses.
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 677b492..78fbaf6 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -20,6 +20,8 @@
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions;
+import static org.apache.ignite.internal.compute.ComputeUtils.jobClass;
+import static org.apache.ignite.internal.compute.ComputeUtils.taskClass;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
@@ -34,6 +36,7 @@
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.executor.ComputeExecutor;
@@ -42,6 +45,9 @@
 import org.apache.ignite.internal.compute.loader.JobContextManager;
 import org.apache.ignite.internal.compute.messaging.ComputeMessaging;
 import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
+import org.apache.ignite.internal.compute.task.DelegatingTaskExecution;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -125,7 +131,7 @@
             CompletableFuture<JobExecutionInternal<R>> future =
                     mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), jobClassName)
                             .thenApply(context -> {
-                                JobExecutionInternal<R> execution = exec(context, options, jobClassName, args);
+                                JobExecutionInternal<R> execution = execJob(context, options, jobClassName, args);
                                 execution.resultAsync().whenComplete((result, e) -> context.close());
                                 inFlightFutures.registerFuture(execution.resultAsync());
                                 return execution;
@@ -141,6 +147,39 @@
         }
     }
 
+    @Override
+    public <R> TaskExecution<R> executeTask(
+            JobSubmitter jobSubmitter,
+            List<DeploymentUnit> units,
+            String taskClassName,
+            Object... args
+    ) {
+        if (!busyLock.enterBusy()) {
+            return new DelegatingTaskExecution<>(
+                    failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()))
+            );
+        }
+
+        try {
+            CompletableFuture<TaskExecutionInternal<R>> taskFuture =
+                    mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), taskClassName)
+                            .thenApply(context -> {
+                                TaskExecutionInternal<R> execution = execTask(context, jobSubmitter, taskClassName, args);
+                                execution.resultAsync().whenComplete((r, e) -> context.close());
+                                inFlightFutures.registerFuture(execution.resultAsync());
+                                return execution;
+                            });
+
+            inFlightFutures.registerFuture(taskFuture);
+
+            DelegatingTaskExecution<R> result = new DelegatingTaskExecution<>(taskFuture);
+            result.idAsync().thenAccept(jobId -> executionManager.addExecution(jobId, result));
+            return result;
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public <R> JobExecution<R> executeRemotely(
@@ -253,10 +292,24 @@
         return nullCompletedFuture();
     }
 
-    private <R> JobExecutionInternal<R> exec(JobContext context, ExecutionOptions options, String jobClassName, Object[] args) {
+    private <R> JobExecutionInternal<R> execJob(JobContext context, ExecutionOptions options, String jobClassName, Object... args) {
         try {
-            return executor.executeJob(options, ComputeUtils.jobClass(context.classLoader(), jobClassName), args);
-        } catch (RuntimeException e) {
+            return executor.executeJob(options, jobClass(context.classLoader(), jobClassName), args);
+        } catch (Throwable e) {
+            context.close();
+            throw e;
+        }
+    }
+
+    private <R> TaskExecutionInternal<R> execTask(
+            JobContext context,
+            JobSubmitter jobSubmitter,
+            String taskClassName,
+            Object... args
+    ) {
+        try {
+            return executor.executeTask(jobSubmitter, taskClass(context.classLoader(), taskClassName), args);
+        } catch (Throwable e) {
             context.close();
             throw e;
         }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 28e48a6..b778308 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -34,6 +34,7 @@
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.compute.message.DeploymentUnitMsg;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
@@ -53,7 +54,7 @@
     private static final ComputeMessagesFactory MESSAGES_FACTORY = new ComputeMessagesFactory();
 
     /**
-     * Instantiate compute job via provided class loader by provided job class name.
+     * Instantiate compute job from the provided job class.
      *
      * @param computeJobClass Compute job class.
      * @param <R> Compute job return type.
@@ -76,11 +77,7 @@
 
             return constructor.newInstance();
         } catch (ReflectiveOperationException e) {
-            throw new ComputeException(
-                    CLASS_INITIALIZATION_ERR,
-                    "Cannot instantiate job",
-                    e
-            );
+            throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot instantiate job", e);
         }
     }
 
@@ -96,12 +93,52 @@
         try {
             return (Class<ComputeJob<R>>) Class.forName(jobClassName, true, jobClassLoader);
         } catch (ClassNotFoundException e) {
+            throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot load job class by name '" + jobClassName + "'", e);
+        }
+    }
+
+    /**
+     * Instantiate map reduce task from the provided task class.
+     *
+     * @param taskClass Map reduce task class.
+     * @param <R> Map reduce task return type.
+     * @return Map reduce task instance.
+     */
+    public static <R> MapReduceTask<R> instantiateTask(Class<? extends MapReduceTask<R>> taskClass) {
+        if (!(MapReduceTask.class.isAssignableFrom(taskClass))) {
             throw new ComputeException(
                     CLASS_INITIALIZATION_ERR,
-                    "Cannot load job class by name '" + jobClassName + "'",
-                    e
+                    "'" + taskClass.getName() + "' does not implement ComputeTask interface"
             );
         }
+
+        try {
+            Constructor<? extends MapReduceTask<R>> constructor = taskClass.getDeclaredConstructor();
+
+            if (!constructor.canAccess(null)) {
+                constructor.setAccessible(true);
+            }
+
+            return constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot instantiate task", e);
+        }
+    }
+
+    /**
+     * Resolve map reduce task class name to map reduce task class reference.
+     *
+     * @param taskClassLoader Class loader.
+     * @param taskClassName Map reduce task class name.
+     * @param <R> Map reduce task return type.
+     * @return Map reduce task class.
+     */
+    public static <R> Class<MapReduceTask<R>> taskClass(ClassLoader taskClassLoader, String taskClassName) {
+        try {
+            return (Class<MapReduceTask<R>>) Class.forName(taskClassName, true, taskClassLoader);
+        } catch (ClassNotFoundException e) {
+            throw new ComputeException(CLASS_INITIALIZATION_ERR, "Cannot load task class by name '" + taskClassName + "'", e);
+        }
     }
 
     /**
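
Taken together, the two new helpers are meant to be used as a pair: resolve the task class through the deployment-unit class
loader, then instantiate it reflectively. A hedged sketch (the class loader and the task class name are placeholders):

    // `taskClassLoader` would normally come from the acquired job context.
    Class<MapReduceTask<Integer>> clazz =
            ComputeUtils.taskClass(taskClassLoader, "org.example.MyMapReduceTask");
    MapReduceTask<Integer> task = ComputeUtils.instantiateTask(clazz);
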
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index 54191a2..b4f5d15 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Compute.RESULT_NOT_FOUND_ERR;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -34,6 +34,7 @@
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -101,16 +102,16 @@
      *
      * @return The set of all job statuses.
      */
-    public CompletableFuture<Set<JobStatus>> localStatusesAsync() {
-        CompletableFuture<JobStatus>[] statuses = executions.values().stream()
+    public CompletableFuture<List<JobStatus>> localStatusesAsync() {
+        CompletableFuture<JobStatus>[] statusFutures = executions.values().stream()
                 .filter(it -> !(it instanceof RemoteJobExecution) && !(it instanceof FailSafeJobExecution))
                 .map(JobExecution::statusAsync)
                 .toArray(CompletableFuture[]::new);
 
-        return CompletableFuture.allOf(statuses)
-                .thenApply(ignored -> Arrays.stream(statuses).map(CompletableFuture::join)
+        return CompletableFutures.allOf(statusFutures)
+                .thenApply(statuses -> statuses.stream()
                         .filter(Objects::nonNull)
-                        .collect(toSet()));
+                        .collect(toList()));
     }
 
     /**
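
The rewrite relies on the internal CompletableFutures.allOf helper which, as used here, completes with the already-joined list of
results. With plain JDK futures, the equivalent "wait for all, then collect non-null results" step looks roughly like this
(illustrative only, not the internal helper):

    // Stand-in for the per-execution status futures collected in the real code.
    CompletableFuture<JobStatus>[] statusFutures = new CompletableFuture[0];

    CompletableFuture<List<JobStatus>> statuses = CompletableFuture.allOf(statusFutures)
            .thenApply(unused -> Arrays.stream(statusFutures)
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()));
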
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index acbf87d..0bd0d64 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -46,6 +46,8 @@
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.compute.NodeNotFoundException;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.ComputeJobRunner;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -92,7 +94,6 @@
         this.clock = clock;
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> JobExecution<R> submit(
             Set<ClusterNode> nodes,
@@ -148,7 +149,6 @@
                 ));
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> R execute(
             Set<ClusterNode> nodes,
@@ -157,11 +157,7 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<R>submit(nodes, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
-        }
+        return sync(executeAsync(nodes, units, jobClassName, options, args));
     }
 
     private static ClusterNode randomNode(Set<ClusterNode> nodes) {
@@ -212,7 +208,6 @@
         return targetNode.equals(topologyService.localMember());
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> JobExecution<R> submitColocated(
             String tableName,
@@ -234,7 +229,6 @@
         );
     }
 
-    /** {@inheritDoc} */
     @Override
     public <K, R> JobExecution<R> submitColocated(
             String tableName,
@@ -263,7 +257,6 @@
         );
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> R executeColocated(
             String tableName,
@@ -273,14 +266,9 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<R>submitColocated(tableName, key, units, jobClassName, options, args).resultAsync().join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
-        }
+        return sync(executeColocatedAsync(tableName, key, units, jobClassName, options, args));
     }
 
-    /** {@inheritDoc} */
     @Override
     public <K, R> R executeColocated(
             String tableName,
@@ -291,15 +279,9 @@
             JobExecutionOptions options,
             Object... args
     ) {
-        try {
-            return this.<K, R>submitColocated(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync()
-                    .join();
-        } catch (CompletionException e) {
-            throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
-        }
+        return sync(executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args));
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> CompletableFuture<JobExecution<R>> submitColocatedInternal(
             TableViewInternal table,
@@ -353,7 +335,6 @@
                 });
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
             Set<ClusterNode> nodes,
@@ -381,6 +362,23 @@
     }
 
     @Override
+    public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        Objects.requireNonNull(units);
+        Objects.requireNonNull(taskClassName);
+
+        return new TaskExecutionWrapper<>(computeComponent.executeTask(this::submitJob, units, taskClassName, args));
+    }
+
+    @Override
+    public <R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
+        return sync(executeMapReduceAsync(units, taskClassName, args));
+    }
+
+    private JobExecution<Object> submitJob(ComputeJobRunner runner) {
+        return submit(runner.nodes(), runner.units(), runner.jobClassName(), runner.options(), runner.args());
+    }
+
+    @Override
     public CompletableFuture<Collection<JobStatus>> statusesAsync() {
         return computeComponent.statusesAsync();
     }
@@ -404,4 +402,12 @@
     ComputeComponent computeComponent() {
         return computeComponent;
     }
+
+    private static <R> R sync(CompletableFuture<R> future) {
+        try {
+            return future.join();
+        } catch (CompletionException e) {
+            throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
+        }
+    }
 }
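
From the caller's perspective the two new public entry points look like this (a hedged sketch; the compute handle, the deployment
units and the task class name are placeholders):

    // Asynchronous: the TaskExecution handle exposes the task-wide status as well as the per-job statuses.
    TaskExecution<Integer> execution =
            compute.submitMapReduce(units, "org.example.MyMapReduceTask", "arg");
    CompletableFuture<Integer> result = execution.resultAsync();
    CompletableFuture<List<JobStatus>> jobStatuses = execution.statusesAsync();

    // Synchronous: joins the asynchronous variant and maps internal exceptions to public ones via sync().
    Integer value = compute.executeMapReduce(units, "org.example.MyMapReduceTask", "arg");
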
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
new file mode 100644
index 0000000..56e2803
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/TaskExecutionWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps a {@link TaskExecution}, converting exceptions thrown by the delegate to public ones.
+ *
+ * @param <R> Result type.
+ */
+class TaskExecutionWrapper<R> extends JobExecutionWrapper<R> implements TaskExecution<R> {
+    private final TaskExecution<R> delegate;
+
+    TaskExecutionWrapper(TaskExecution<R> delegate) {
+        super(delegate);
+        this.delegate = delegate;
+    }
+
+    @Override
+    public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+        return convertToPublicFuture(delegate.statusesAsync());
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
index bf4866a..f103086 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
@@ -18,13 +18,18 @@
 package org.apache.ignite.internal.compute.executor;
 
 import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.internal.compute.ExecutionOptions;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
 
 /**
  * Executor of Compute jobs.
  */
 public interface ComputeExecutor {
-    <R> JobExecutionInternal<R> executeJob(ExecutionOptions options, Class<? extends ComputeJob<R>> jobClass, Object[] args);
+    <R> JobExecutionInternal<R> executeJob(ExecutionOptions options, Class<? extends ComputeJob<R>> jobClass, Object... args);
+
+    <R> TaskExecutionInternal<R> executeTask(JobSubmitter jobSubmitter, Class<? extends MapReduceTask<R>> taskClass, Object... args);
 
     void start();
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index af39e61..3d956cf 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -24,6 +24,7 @@
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.internal.compute.ComputeUtils;
 import org.apache.ignite.internal.compute.ExecutionOptions;
 import org.apache.ignite.internal.compute.JobExecutionContextImpl;
@@ -31,6 +32,8 @@
 import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
 import org.apache.ignite.internal.compute.queue.QueueExecution;
 import org.apache.ignite.internal.compute.state.ComputeStateMachine;
+import org.apache.ignite.internal.compute.task.JobSubmitter;
+import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -87,6 +90,17 @@
     }
 
     @Override
+    public <R> TaskExecutionInternal<R> executeTask(
+            JobSubmitter jobSubmitter,
+            Class<? extends MapReduceTask<R>> taskClass,
+            Object... args
+    ) {
+        assert executorService != null;
+
+        return new TaskExecutionInternal<>(executorService, jobSubmitter, taskClass, () -> ignite, args);
+    }
+
+    @Override
     public void start() {
         stateMachine.start();
         executorService = new PriorityQueueExecutor(
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index b6bcd0e..1b39f2b 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -483,25 +483,17 @@
         return result;
     }
 
-    private <R> CompletableFuture<Collection<R>> broadcastAsyncAndCollect(
+    private <R> CompletableFuture<List<R>> broadcastAsyncAndCollect(
             Function<ClusterNode, CompletableFuture<@Nullable R>> request,
-            Function<Throwable, Throwable> error
+            Function<Throwable, RuntimeException> error
     ) {
-        CompletableFuture<Collection<R>> result = new CompletableFuture<>();
-
         CompletableFuture<R>[] futures = topologyService.allMembers()
                 .stream()
                 .map(request::apply)
                 .toArray(CompletableFuture[]::new);
 
-        CompletableFutures.allOf(futures).whenComplete((collection, throwable) -> {
-            if (throwable == null) {
-                result.complete(collection);
-            } else {
-                result.completeExceptionally(error.apply(throwable));
-            }
+        return CompletableFutures.allOf(futures).exceptionally(throwable -> {
+            throw error.apply(throwable);
         });
-
-        return result;
     }
 }
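
The rewritten broadcastAsyncAndCollect leans on exceptionally(...) to rethrow the mapped exception instead of completing a
separate result future by hand. A stand-alone illustration of the pattern with plain JDK futures (names and the mapping function
are illustrative):

    CompletableFuture<?>[] futures = {
            CompletableFuture.completedFuture("ok"),
            CompletableFuture.failedFuture(new RuntimeException("boom"))
    };
    Function<Throwable, RuntimeException> error = t -> new IllegalStateException("Broadcast failed", t);

    CompletableFuture<Void> mapped = CompletableFuture.allOf(futures)
            .exceptionally(throwable -> {
                // Rethrowing inside exceptionally() completes the resulting future exceptionally with the mapped error.
                throw error.apply(throwable);
            });
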
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
new file mode 100644
index 0000000..5adf264
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/AntiHijackTaskExecution.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.compute.AntiHijackJobExecution;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link TaskExecution} that adds protection against thread hijacking by users.
+ */
+public class AntiHijackTaskExecution<R> extends AntiHijackJobExecution<R> implements TaskExecution<R> {
+    private final TaskExecution<R> execution;
+
+    /**
+     * Constructor.
+     *
+     * @param execution Original execution.
+     * @param asyncContinuationExecutor Executor to which the execution will be resubmitted.
+     */
+    public AntiHijackTaskExecution(TaskExecution<R> execution, Executor asyncContinuationExecutor) {
+        super(execution, asyncContinuationExecutor);
+        this.execution = execution;
+    }
+
+    @Override
+    public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+        return preventThreadHijack(execution.statusesAsync());
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
new file mode 100644
index 0000000..3da429d
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/DelegatingTaskExecution.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Delegates {@link TaskExecution} calls to the {@link TaskExecutionInternal} instance wrapped in a future.
+ *
+ * @param <R> Result type.
+ */
+public class DelegatingTaskExecution<R> implements TaskExecution<R> {
+    private final CompletableFuture<TaskExecutionInternal<R>> delegate;
+
+    public DelegatingTaskExecution(CompletableFuture<TaskExecutionInternal<R>> delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return delegate.thenCompose(TaskExecutionInternal::resultAsync);
+    }
+
+    @Override
+    public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+        return delegate.thenCompose(TaskExecutionInternal::statusesAsync);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        return delegate.thenCompose(TaskExecutionInternal::cancelAsync);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobStatus> statusAsync() {
+        return delegate.thenCompose(TaskExecutionInternal::statusAsync);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+        return delegate.thenCompose(execution -> execution.changePriorityAsync(newPriority));
+    }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
new file mode 100644
index 0000000..400c413
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+
+/**
+ * Compute job submitter.
+ */
+@FunctionalInterface
+public interface JobSubmitter {
+    /**
+     * Submits a compute job for execution.
+     *
+     * @param computeJobRunner Compute job start parameters.
+     * @return Job execution object.
+     */
+    JobExecution<Object> submit(ComputeJobRunner computeJobRunner);
+}
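
Because the interface is functional, implementations are expected to be passed as lambdas or method references; the production
wiring in IgniteComputeImpl passes this::submitJob. A hedged sketch of an equivalent lambda (the igniteCompute handle is assumed
to exist):

    JobSubmitter submitter = runner -> igniteCompute.submit(
            runner.nodes(), runner.units(), runner.jobClassName(), runner.options(), runner.args());
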
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
new file mode 100644
index 0000000..0b842d1
--- /dev/null
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.task;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
+import static org.apache.ignite.internal.util.ArrayUtils.concat;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
+import org.apache.ignite.internal.compute.queue.QueueExecution;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal map reduce task execution object. Runs the {@link MapReduceTask#split(TaskExecutionContext, Object...)} method of the task as a
+ * compute job, then submits the resulting list of jobs. Waits for completion of all compute jobs, then submits the
+ * {@link MapReduceTask#reduce(Map)} method as a compute job. The result of the task is the result of the reduce method.
+ *
+ * @param <R> Task result type.
+ */
+@SuppressWarnings("unchecked")
+public class TaskExecutionInternal<R> implements JobExecution<R> {
+    private static final IgniteLogger LOG = Loggers.forClass(TaskExecutionInternal.class);
+
+    private final QueueExecution<SplitResult<R>> splitExecution;
+
+    private final CompletableFuture<List<JobExecution<Object>>> executionsFuture;
+
+    private final CompletableFuture<Map<UUID, Object>> resultsFuture;
+
+    private final CompletableFuture<QueueExecution<R>> reduceExecutionFuture;
+
+    private final AtomicReference<JobStatus> reduceFailedStatus = new AtomicReference<>();
+
+    /**
+     * Constructs an execution object and starts executing.
+     *
+     * @param executorService Compute jobs executor.
+     * @param jobSubmitter Compute jobs submitter.
+     * @param taskClass Map reduce task class.
+     * @param context Task execution context.
+     * @param args Task arguments.
+     */
+    public TaskExecutionInternal(
+            PriorityQueueExecutor executorService,
+            JobSubmitter jobSubmitter,
+            Class<? extends MapReduceTask<R>> taskClass,
+            TaskExecutionContext context,
+            Object... args
+    ) {
+        LOG.debug("Executing task {}", taskClass.getName());
+        splitExecution = executorService.submit(
+                () -> {
+                    MapReduceTask<R> task = instantiateTask(taskClass);
+                    return new SplitResult<>(task, task.split(context, args));
+                },
+                Integer.MAX_VALUE,
+                0
+        );
+
+        executionsFuture = splitExecution.resultAsync().thenApply(splitResult -> {
+            List<ComputeJobRunner> runners = splitResult.runners();
+            LOG.debug("Submitting {} jobs for {}", runners.size(), taskClass.getName());
+            return submit(runners, jobSubmitter);
+        });
+
+        resultsFuture = executionsFuture.thenCompose(TaskExecutionInternal::resultsAsync);
+
+        reduceExecutionFuture = resultsFuture.thenApply(results -> {
+            LOG.debug("Running reduce job for {}", taskClass.getName());
+
+            // This future is already finished
+            MapReduceTask<R> task = splitExecution.resultAsync().thenApply(SplitResult::task).join();
+
+            return executorService.submit(
+                    () -> task.reduce(results),
+                    Integer.MAX_VALUE,
+                    0
+            );
+        }).whenComplete(this::captureReduceFailure);
+    }
+
+    private void captureReduceFailure(QueueExecution<R> reduceExecution, Throwable throwable) {
+        if (throwable != null) {
+            // Capture the reduce execution failure reason and time.
+            JobState state = throwable instanceof CancellationException ? CANCELED : FAILED;
+            reduceFailedStatus.set(
+                    splitExecution.status().toBuilder()
+                            .state(state)
+                            .finishTime(Instant.now())
+                            .build()
+            );
+        }
+    }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return reduceExecutionFuture.thenCompose(QueueExecution::resultAsync);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobStatus> statusAsync() {
+        JobStatus splitStatus = splitExecution.status();
+        if (splitStatus == null) {
+            // Return null even if the reduce execution status could still be retrieved.
+            return nullCompletedFuture();
+        }
+
+        if (splitStatus.state() != COMPLETED) {
+            return completedFuture(splitStatus);
+        }
+
+        // This future is complete when the reduce execution job is submitted; return the status from it.
+        if (reduceExecutionFuture.isDone()) {
+            return reduceExecutionFuture.handle((reduceExecution, throwable) -> {
+                if (throwable == null) {
+                    JobStatus reduceStatus = reduceExecution.status();
+                    if (reduceStatus == null) {
+                        return null;
+                    }
+                    return reduceStatus.toBuilder()
+                            .id(splitStatus.id())
+                            .createTime(splitStatus.createTime())
+                            .startTime(splitStatus.startTime())
+                            .build();
+                }
+                return reduceFailedStatus.get();
+            });
+        }
+
+        // At this point split is complete but reduce job is not submitted yet.
+        return completedFuture(splitStatus.toBuilder()
+                .state(EXECUTING)
+                .finishTime(null)
+                .build());
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        // If the split job is not complete, this will cancel the executions future.
+        splitExecution.cancel();
+
+        // This means we didn't submit any jobs yet.
+        if (executionsFuture.cancel(true)) {
+            return trueCompletedFuture();
+        }
+
+        // Split job was complete, results future was running, but not complete yet.
+        if (resultsFuture.cancel(true)) {
+            return executionsFuture.thenCompose(executions -> {
+                CompletableFuture<Boolean>[] cancelFutures = executions.stream()
+                        .map(JobExecution::cancelAsync)
+                        .toArray(CompletableFuture[]::new);
+
+                return allOf(cancelFutures);
+            }).thenApply(unused -> true);
+        }
+
+        // Results arrived but reduce is not yet submitted
+        if (reduceExecutionFuture.cancel(true)) {
+            return trueCompletedFuture();
+        }
+
+        return reduceExecutionFuture.thenApply(QueueExecution::cancel);
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+        // If the split job is not started
+        if (splitExecution.changePriority(newPriority)) {
+            return trueCompletedFuture();
+        }
+
+        // This future is complete when the reduce execution job is submitted; try to change its priority.
+        if (reduceExecutionFuture.isDone()) {
+            return reduceExecutionFuture.thenApply(reduceExecution -> reduceExecution.changePriority(newPriority));
+        }
+
+        return executionsFuture.thenCompose(executions -> {
+            CompletableFuture<Boolean>[] changePriorityFutures = executions.stream()
+                    .map(execution -> execution.changePriorityAsync(newPriority))
+                    .toArray(CompletableFuture[]::new);
+
+            return allOf(changePriorityFutures).thenApply(unused -> {
+                List<@Nullable Boolean> results = Arrays.stream(changePriorityFutures).map(CompletableFuture::join).collect(toList());
+                if (results.stream().allMatch(b -> b == Boolean.TRUE)) {
+                    return true;
+                }
+                if (results.stream().anyMatch(Objects::isNull)) {
+                    //noinspection RedundantCast this cast is to satisfy spotbugs
+                    return (Boolean) null;
+                }
+                return false;
+            });
+        });
+    }
+
+    CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+        return executionsFuture.thenCompose(executions -> {
+            CompletableFuture<JobStatus>[] statusFutures = executions.stream()
+                    .map(JobExecution::statusAsync)
+                    .toArray(CompletableFuture[]::new);
+
+            return CompletableFutures.allOf(statusFutures);
+        });
+    }
+
+    private static CompletableFuture<Map<UUID, Object>> resultsAsync(List<JobExecution<Object>> executions) {
+        CompletableFuture<?>[] resultFutures = executions.stream()
+                .map(JobExecution::resultAsync)
+                .toArray(CompletableFuture[]::new);
+
+        CompletableFuture<UUID>[] idFutures = executions.stream()
+                .map(JobExecution::idAsync)
+                .toArray(CompletableFuture[]::new);
+
+        return allOf(concat(resultFutures, idFutures)).thenApply(unused -> {
+            Map<UUID, Object> results = new HashMap<>();
+
+            for (int i = 0; i < resultFutures.length; i++) {
+                results.put(idFutures[i].join(), resultFutures[i].join());
+            }
+
+            return results;
+        });
+    }
+
+    private static List<JobExecution<Object>> submit(List<ComputeJobRunner> runners, JobSubmitter jobSubmitter) {
+        return runners.stream()
+                .map(jobSubmitter::submit)
+                .collect(toList());
+    }
+
+    private static class SplitResult<R> {
+        private final MapReduceTask<R> task;
+
+        private final List<ComputeJobRunner> runners;
+
+        private SplitResult(MapReduceTask<R> task, List<ComputeJobRunner> runners) {
+            this.task = task;
+            this.runners = runners;
+        }
+
+        private List<ComputeJobRunner> runners() {
+            return runners;
+        }
+
+        private MapReduceTask<R> task() {
+            return task;
+        }
+    }
+}
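
The overall chain built in the constructor has the shape split -> submit jobs -> collect results -> reduce. A purely illustrative,
self-contained sketch of that shape with plain futures (the real code runs split and reduce on the priority queue executor and
submits the intermediate jobs through JobSubmitter):

    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    public class MapReducePipelineSketch {
        public static void main(String[] args) {
            // "split": produce the per-node units of work (node names stand in for ComputeJobRunner instances).
            CompletableFuture<List<String>> split =
                    CompletableFuture.supplyAsync(() -> List.of("node-a", "node-b", "node-c"));

            // "jobs": one result per unit of work, keyed by an id, mirroring the idAsync()/resultAsync() pairing.
            CompletableFuture<Map<UUID, Object>> results = split.thenApply(nodes -> nodes.stream()
                    .collect(Collectors.toMap(n -> UUID.randomUUID(), n -> (Object) n)));

            // "reduce": combine the per-job results into the task result.
            CompletableFuture<Integer> reduced = results.thenApply(map -> map.values().stream()
                    .map(String.class::cast)
                    .mapToInt(String::length)
                    .sum());

            System.out.println(reduced.join());
        }
    }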