// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Distributed Computing
:javaFile: {javaCodeDir}/DistributedComputing.java
Ignite provides an API for distributing computations across cluster nodes in a balanced and fault-tolerant manner. You can submit individual tasks for execution as well as implement the MapReduce pattern with automatic task splitting. The API provides fine-grained control over the link:distributed-computing/load-balancing[job distribution strategy].
////
*TODO: recommendation: define tasks for execution in separate classes (not as nested classes or closures), because the enclosing class will be peer-deployed as well and you may not want this*
////
== Getting the Compute Interface
The main entry point for running distributed computations is the compute interface, which can be obtained from an instance of `Ignite`.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=get-compute,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=gettingCompute,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/compute_get.cpp[tag=compute-get,indent=0]
----
--
The compute interface provides methods for distributing different types of tasks over cluster nodes and running link:distributed-computing/collocated-computations[colocated computations].
== Specifying the Set of Nodes for Computations
Each instance of the compute interface is associated with a link:distributed-computing/cluster-groups[set of nodes] on which the tasks are executed.
When called without arguments, `ignite.compute()` returns the compute interface that is associated with all server nodes.
To obtain an instance for a specific subset of nodes, use `Ignite.compute(ClusterGroup group)`.
In the following example, the compute interface is bound to the remote nodes only, i.e. all nodes except for the one that runs this code.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=get-compute-for-nodes,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=forRemotes,indent=0]
----
tab:C++[unsupported]
--
== Executing Tasks
Ignite provides three interfaces that can be implemented to represent a task and executed via the compute interface:
- `IgniteRunnable` — an extension of `java.lang.Runnable` that can be used to implement calculations that do not have input parameters and return no result.
- `IgniteCallable` — an extension of `java.util.concurrent.Callable` that returns a specific value.
- `IgniteClosure` — a functional interface that accepts a parameter and returns a value.
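The difference between the three shapes listed above can be seen with their plain-JDK counterparts. The following is a minimal sketch that runs locally and does not touch a cluster. It uses `java.lang.Runnable` and `java.util.concurrent.Callable` directly, and assumes `java.util.function.Function` as a stand-in for `IgniteClosure`; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Local illustration only: plain JDK types stand in for the Ignite task interfaces.
class TaskShapesSketch {
    // Runnable shape: no input parameter and no result, so it can only cause a side effect.
    static String runnableShape() {
        StringBuilder out = new StringBuilder();
        Runnable task = () -> out.append("Hello World");
        task.run();
        return out.toString();
    }

    // Callable shape: no input parameter, but a value is returned.
    static int callableShape() {
        Callable<Integer> task = () -> "Hello World".length();
        try {
            return task.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Closure shape: accepts a parameter and returns a value.
    // Function is an assumption used here in place of IgniteClosure.
    static int closureShape(String arg) {
        Function<String, Integer> task = String::length;
        return task.apply(arg);
    }

    public static void main(String[] args) {
        System.out.println(runnableShape());        // prints "Hello World"
        System.out.println(callableShape());        // prints 11
        System.out.println(closureShape("Ignite")); // prints 6
    }
}
```

In a cluster, the same bodies would be written against the Ignite interfaces so that the task can be sent to another node.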
You can execute a task once (on one of the nodes) or broadcast it to all nodes.
[IMPORTANT]
====
In order to run tasks on the remote nodes, make sure the class definitions of the tasks are available on the nodes.
You can do this in two ways:
- Add the classes to the classpath of the nodes;
- Enable link:code-deployment/peer-class-loading[peer class loading].
====
=== Executing a Runnable Task
To execute a runnable task, use the `run(...)` method of the compute interface. The task is sent to one of the nodes associated with the compute instance.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=execute-runnable,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=computeAction,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/compute_run.cpp[tag=compute-run,indent=0]
----
--
=== Executing a Callable Task
To execute a callable task, use the `call(...)` method of the compute interface.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=execute-callable,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=computeFunc,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/compute_call.cpp[tag=compute-call,indent=0]
----
--
=== Executing an IgniteClosure
To execute an `IgniteClosure`, use the `apply(...)` method of the compute interface. The method accepts a task and an input parameter for the task. The parameter is passed to the given `IgniteClosure` at execution time.
[tabs]
--
tab:Java[]
[source, java]
----
include::{javaFile}[tag=execute-closure,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=computeFuncApply,indent=0]
----
tab:C++[unsupported]
--
=== Broadcasting a Task
The `broadcast()` method executes a task on _all nodes_ associated with the compute instance.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=broadcast,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=broadcast,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/compute_broadcast.cpp[tag=compute-broadcast,indent=0]
----
--
=== Asynchronous Execution
All methods described in the previous sections have asynchronous counterparts:
- `callAsync(...)`
- `runAsync(...)`
- `applyAsync(...)`
- `broadcastAsync(...)`
The asynchronous methods return an `IgniteFuture` that represents the result of the operation. In the following example, a collection of callable tasks is executed asynchronously.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=async,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=async,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/compute_call_async.cpp[tag=compute-call-async,indent=0]
----
--
== Task Execution Timeout
You can set a timeout for task execution.
If the task does not finish within the given time frame, it is stopped and all jobs produced by this task are cancelled.
To execute a task with a timeout, use the `withTimeout(...)` method of the compute interface.
The method returns a compute interface that executes the first task given to it in a time-limited manner.
Subsequent tasks do not have a timeout: you need to call `withTimeout(...)` for every task that should have a timeout.
//TODO: code samples for other languages
[tabs]
--
tab:Java[]
[source, java]
----
include::{javaFile}[tags=timeout,indent=0]
----
--
== Sharing State Between Jobs on Local Node
It is often useful to share state between different compute jobs executed on one node. For this purpose, a shared concurrent local map is available on each node.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=get-map,indent=0]
----
tab:C#/.NET[unsupported]
tab:C++[unsupported]
--
Node-local values are similar to thread-local variables in that they are not distributed and are kept only on the local node.
Node-local data can be used to share state between compute jobs.
It can also be used by deployed services.
In the following example, a job increments a node-local counter every time it executes on some node. As a result, the node-local counter on each node tells us how many times the job has executed on that node.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=job-counter,indent=0]
----
tab:C#/.NET[unsupported]
tab:C++[unsupported]
--
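The counter pattern described above depends only on the map being a concurrent map, so it can be tried without a cluster. The following is a minimal sketch that uses a `ConcurrentHashMap` as a stand-in for the node-local map (on a real node the map would come from the cluster API, for example `ignite.cluster().nodeLocalMap()`), and plain threads as a stand-in for jobs that run concurrently on one node. The class, method, and key names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Local illustration only: a ConcurrentHashMap stands in for the node-local map.
class NodeLocalCounterSketch {
    // Body of a hypothetical job: create the counter on first use, then increment it.
    static long runJob(ConcurrentMap<String, AtomicLong> nodeLocalMap) {
        AtomicLong counter = nodeLocalMap.computeIfAbsent("counter", k -> new AtomicLong());
        return counter.incrementAndGet();
    }

    // Runs the job from several threads at once and returns the final counter value.
    static long demo(int threads, int runsPerThread) {
        ConcurrentMap<String, AtomicLong> nodeLocalMap = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < runsPerThread; j++) {
                    runJob(nodeLocalMap);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return nodeLocalMap.get("counter").get();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000
    }
}
```

Creating the counter with `computeIfAbsent` and incrementing an `AtomicLong` keeps the count correct when several jobs reach the map at the same time; a plain `get` followed by `put` could lose updates.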
== Accessing Data from Computational Tasks
If your computational task needs to access the data stored in caches, you can do it via the instance of `Ignite`:
[tabs]
--
tab:Java[]
[source, java]
----
include::{javaFile}[tag=access-data,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DistributedComputingApi.cs[tag=instanceResource,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/compute_acessing_data.cpp[tag=compute-acessing-data,indent=0]
----
--
Note that the example shown above is not necessarily the most efficient way to access the data.
The reason is that the person object that corresponds to key `1` may be located on a node that is different from the node where the task is executed.
In this case, the object is fetched over the network. This can be avoided by link:distributed-computing/collocated-computations[colocating the task with the data].
[CAUTION]
====
If you want to use the key and value objects inside `IgniteCallable` and `IgniteRunnable` tasks, make sure the key and value classes are deployed on all cluster nodes.
====
////////////////////////////////////////////////////////////////////////////////
In the cases where you do not need to colocate computations with data but simply want to process all data remotely, you can run local cache queries inside the `call()` method. Consider the following example.
Let's say we have a cache that stores information about persons and we want to calculate the average age of all persons. One way to accomplish this is to run a link:key-value-api/querying[scan query] that will fetch the ages of all persons to the local node, where you can calculate the average age.
A more efficient way, however, is to avoid network calls to other nodes by running the query locally on each remote node and aggregating the result on the local node.
This task can be easily split
[source, java]
-------------------------------------------------------------------------------
private class AverageAgeJob implements IgniteCallable<Double> {
    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public Double call() throws Exception {
        IgniteCache<Long, Person> cache = ignite.cache("person");
        int localAvg = 0;
        try (QueryCursor<Cache.Entry<Long, Person>> cursor = cache
                .query(new ScanQuery<Long, Person>().setLocal(true))) {
            for (Cache.Entry<Long, Person> entry : cursor) {
                localAvg += (int) entry.getValue().getAge();
            }
        }
        return (localAvg / (double) cache.size());
    }
}
-------------------------------------------------------------------------------
Note that the scan query is executed in the local mode. It means that it only fetches objects from the Person cache that are stored locally and does not request data from other nodes.
If you broadcast this task to all nodes, all person objects are processed (each locally), and the results are sent to the node that initiated the task.
[source, java]
-------------------------------------------------------------------------------
Ignite ignite = Ignition.ignite();
double average = ignite.compute().broadcast(new AverageAgeJob()).stream().reduce(0D, (a, b) -> a + b);
-------------------------------------------------------------------------------
The task is executed on every node, where it queries all persons stored locally and calculates the local average. Then the results are sent to the node that initiated the task and summed up. In this implementation, objects are not transferred over the network.
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
== Accessing Ignite Resources from Tasks
Ignite provides a number of resources that can be injected into a task, such as an instance of `Ignite`.
`TaskSessionResource` - this
`IgniteInstanceResource` -
`LoggerResource` -
`SpringApplicationContextResource` -
`SpringResource` -
[source, java]
-------------------------------------------------------------------------------
example
-------------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////