blob: cb609ef2b44cec0fbf821ccbd84a464222c33710 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.loadtests.mergesort;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
/**
* A task that performs distributed Merge Sort.
*/
public class GridMergeSortLoadTask extends ComputeTaskSplitAdapter<int[], int[]> {
/** Injected Grid instance. */
@IgniteInstanceResource
private Ignite ignite;
/**
* Receives the array to sort, splits it into 2 arrays, and returns 2
* jobs that perform that task recursively, each for the corresponding part
* of the array. Each recursive task will return a sorted array.
*
* Because this is a recursive algorithm and we cannot hold threads are every
* recursion step, we use the <i>continuation</i> mechanism
* ({@link org.apache.ignite.compute.ComputeJobContext} methods {@code holdcc()} and {@code callcc()})
* to pause the parent tasks while the child tasks are running. Otherwise we may
* run out of threads.
*
* @param gridSize Number of available grid nodes. Note that returned number of
* jobs can be less, equal or greater than this grid size.
* @param initArr Array to sort.
* @return 2 jobs that will run the sort recursively for each part of the array.
*/
@Override protected Collection<ComputeJob> split(int gridSize, int[] initArr) {
Collection<ComputeJob> jobs = new LinkedList<>();
for (final int[] arr : splitArray(initArr)) {
jobs.add(new ComputeJobAdapter() {
// Auto-inject job context.
@JobContextResource
private ComputeJobContext jobCtx;
// Task execution result future.
private ComputeTaskFuture<int[]> fut;
@Override public Object execute() {
if (arr.length == 1)
return arr;
// Future is null before holdcc() is called and
// not null after callcc() is called.
if (fut == null) {
// Launch the recursive child task asynchronously.
fut = ignite.compute().executeAsync(new GridMergeSortLoadTask(), arr);
// Add a listener to the future, that will resume the
// parent task once the child one is completed.
fut.listen(new CI1<IgniteFuture<int[]>>() {
@Override public void apply(IgniteFuture<int[]> fut) {
// CONTINUATION:
// =============
// Resume suspended job execution.
jobCtx.callcc();
}
});
// CONTINUATION:
// =============
// Suspend job execution to be continued later and
// release the executing thread.
return jobCtx.holdcc();
}
else {
assert fut.isDone();
// Return the result of a completed child task.
return fut.get();
}
}
});
}
return jobs;
}
/**
* This method is called when both child jobs are completed, and is a
* Reduce step of Merge Sort algorithm.
*
* On this step we do a merge of 2 sorted arrays, produced by child tasks,
* into a 1 sorted array.
*
* @param results The child task execution results (sorted arrays).
* @return A merge result: single sorted array.
*/
@Override public int[] reduce(List<ComputeJobResult> results) {
if (results.size() == 1) // This is in case we have a single-element array.
return results.get(0).getData();
assert results.size() == 2;
int[] arr1 = results.get(0).getData();
int[] arr2 = results.get(1).getData();
return mergeArrays(arr1, arr2);
}
/**
* Splits the array into two parts.
*
* If array size is odd, then the second part is one element
* greater than the first one. Otherwise, the parts have
* equal size.
*
* @param arr Array to split.
* @return Split result: a collection of 2 arrays.
*/
private Iterable<int[]> splitArray(int[] arr) {
int len1 = arr.length / 2;
int len2 = len1 + arr.length % 2;
int[] a1 = new int[len1];
int[] a2 = new int[len2];
System.arraycopy(arr, 0, a1, 0, len1);
System.arraycopy(arr, len1, a2, 0, len2);
return Arrays.asList(a1, a2);
}
/**
* Performs a merge of 2 arrays. This method runs the element-by-element
* comparison of specified arrays and stacks the least elements into a
* resulting array.
*
* @param arr1 First array.
* @param arr2 Second array.
* @return The merged array, in which any element from the first half is less or equal
* than any element from the second half.
*/
private int[] mergeArrays(int[] arr1, int[] arr2) {
int[] ret = new int[arr1.length + arr2.length];
int i1 = 0;
int i2 = 0;
// Merge 2 arrays into a resulting array
for (int i = 0; i < ret.length; i++) {
if (i1 >= arr1.length) {
System.arraycopy(arr2, i2, ret, i, arr2.length - i2); // Copy the remainder of an array.
break;
}
else if (i2 >= arr2.length) {
System.arraycopy(arr1, i1, ret, i, arr1.length - i1); // Copy the remainder of an array.
break;
}
else
ret[i] = arr1[i1] <= arr2[i2] ? arr1[i1++] : arr2[i2++];
}
return ret;
}
}