// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
:javaSourceFile: {javaCodeDir}/CollocatedComputations.java
:dotnetSourceFile: code-snippets/dotnet/CollocationgComputationsWithData.cs
= Colocating Computations with Data
Colocated computation is a type of distributed data processing in which the computational task you want to perform over a specific data set is sent to the nodes where the required data is located, and only the results of the computation are sent back. This approach minimizes data transfer between nodes and can significantly reduce task execution time.
Ignite provides several ways to perform colocated computations, all of which use the affinity function to determine the location of the data.
The compute interface provides the `affinityCall(...)` and `affinityRun(...)` methods that colocate a task with data either by key or by partition.
[IMPORTANT]
====
The `affinityCall(...)` and `affinityRun(...)` methods guarantee that the data for the given key or partition is present on the target node for the duration of the task.
====
[IMPORTANT]
====
The class definitions of the task to be executed on remote nodes must be available on the nodes.
You can ensure this in two ways:
* Add the classes to the classpath of the nodes;
* Enable link:code-deployment/peer-class-loading[peer class loading].
====
== Colocating by Key
To send a computational task to the node where a given key is located, use the following methods:
- `IgniteCompute.affinityCall(String cacheName, Object key, IgniteCallable<R> job)`
- `IgniteCompute.affinityRun(String cacheName, Object key, IgniteRunnable job)`
Ignite calls the configured affinity function to determine the location of the given key.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaSourceFile}[tag=collocating-by-key,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::{dotnetSourceFile}[tag=affinityRun,indent=0]
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/affinity_run.cpp[tag=affinity-run,indent=0]
----
--
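
The routing that happens before a key-colocated task runs can be pictured as two lookups: key to partition, then partition to node. The following is a minimal plain-JDK sketch of that idea, not Ignite's actual affinity function. The `ToyAffinity` class, the hash-modulo mapping, and the round-robin node assignment are all assumptions made for illustration. In Ignite itself, this mapping is exposed through the `Affinity` interface (for example, `ignite.affinity(cacheName).partition(key)`).

```java
import java.util.List;

// Hypothetical stand-in for an affinity function; NOT Ignite's implementation.
class ToyAffinity {
    private final int partitions;
    private final List<String> nodes;

    ToyAffinity(int partitions, List<String> nodes) {
        this.partitions = partitions;
        this.nodes = List.copyOf(nodes);
    }

    // Step 1: key -> partition. floorMod keeps the result in [0, partitions).
    int partition(Object key) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // Step 2: partition -> owning node (simple round-robin assignment, assumed).
    String nodeForPartition(int partition) {
        return nodes.get(partition % nodes.size());
    }

    // The node a task colocated with this key would be sent to.
    String nodeForKey(Object key) {
        return nodeForPartition(partition(key));
    }

    public static void main(String[] args) {
        ToyAffinity aff = new ToyAffinity(1024, List.of("node-A", "node-B", "node-C"));
        for (int key : new int[] {1, 2, 1025}) {
            System.out.println("key " + key + " -> partition " + aff.partition(key)
                + " -> " + aff.nodeForKey(key));
        }
    }
}
```

Keys that fall into the same partition are always routed to the same node in this model, which is the property that colocation by key relies on.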
== Colocating by Partition
The `affinityCall(Collection<String> cacheNames, int partId, IgniteCallable<R> job)` and `affinityRun(Collection<String> cacheNames, int partId, IgniteRunnable job)` methods send a given task to the node where the partition with a given ID is located. This is useful when you need to retrieve objects for multiple keys and you know that the keys belong to the same partition. In this case, you can create one task for the whole partition instead of a separate task for each key.
For example, let's say you want to calculate the arithmetic mean of a specific field for a specific subset of keys.
If you want to distribute the computation, you can group the keys by partitions and send each group of keys to the node where the partition is located to get the values.
The number of groups and, therefore, the number of tasks is no more than the total number of partitions (default is 1024).
Below is a code snippet that illustrates this example.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaSourceFile}[tag=calculate-average,indent=0]
----
tab:C#/.NET[unsupported]
The `affinityCall(..)` method that takes a partition ID as a parameter is not supported in Ignite.NET.
tab:C++[unsupported]
--
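
The grouping step described above can be sketched with plain JDK collections. This is an illustrative model only: `PartitionedMean`, its hash-modulo `partition` function, and the in-memory `data` map are assumptions, and the per-partition task runs locally here rather than being sent to a remote node with `affinityCall(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionedMean {
    static final int PARTITIONS = 1024; // matches the default mentioned above

    // Hypothetical key -> partition mapping (not Ignite's affinity function).
    static int partition(int key) {
        return Math.floorMod(Integer.hashCode(key), PARTITIONS);
    }

    // Group the requested keys so that each partition gets exactly one task.
    static Map<Integer, List<Integer>> groupByPartition(List<Integer> keys) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int key : keys) {
            groups.computeIfAbsent(partition(key), p -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    // Body of one per-partition task: returns {sum, count} for its keys.
    static double[] partialSum(Map<Integer, Double> localData, List<Integer> keys) {
        double sum = 0;
        int count = 0;
        for (int key : keys) {
            Double value = localData.get(key);
            if (value != null) {
                sum += value;
                count++;
            }
        }
        return new double[] {sum, count};
    }

    // Combine the partial results into the arithmetic mean.
    static double mean(Map<Integer, Double> data, List<Integer> keys) {
        double sum = 0;
        double count = 0;
        for (List<Integer> group : groupByPartition(keys).values()) {
            double[] partial = partialSum(data, group); // would run on the owning node
            sum += partial[0];
            count += partial[1];
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        Map<Integer, Double> data = Map.of(1, 10.0, 2, 20.0, 1025, 30.0, 3, 40.0);
        List<Integer> keys = List.of(1, 2, 1025);
        System.out.println("groups: " + groupByPartition(keys));
        System.out.println("mean:   " + mean(data, keys));
    }
}
```

Each task returns a sum and a count rather than a local mean, so the partial results can be combined correctly even when the groups have different sizes.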
If you want to process all the data in the cache, you can iterate over all cache partitions and send tasks that process the data stored on each individual partition.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaSourceFile}[tag=sum-by-partition,indent=0]
----
tab:C#/.NET[unsupported]
tab:C++[unsupported]
--
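
As a rough model of the partition-by-partition approach, the sketch below stores entries per partition, runs one summing task per partition ID, and adds up the partial results. It uses only the JDK. The `SumByPartition` class, the small partition count, and the hash-modulo mapping are assumptions for illustration, and the per-partition task is a local method call standing in for a job sent with `affinityCall(...)`.

```java
import java.util.HashMap;
import java.util.Map;

class SumByPartition {
    static final int PARTITIONS = 8; // kept small for the sketch

    // Hypothetical key -> partition mapping (not Ignite's affinity function).
    static int partition(int key) {
        return Math.floorMod(Integer.hashCode(key), PARTITIONS);
    }

    // Model of storage: partition ID -> entries held in that partition.
    static Map<Integer, Map<Integer, Long>> store(Map<Integer, Long> data) {
        Map<Integer, Map<Integer, Long>> byPart = new HashMap<>();
        data.forEach((k, v) ->
            byPart.computeIfAbsent(partition(k), p -> new HashMap<>()).put(k, v));
        return byPart;
    }

    // Task for a single partition: reads only the entries of that partition.
    static long sumPartition(Map<Integer, Map<Integer, Long>> byPart, int partId) {
        long sum = 0;
        for (long value : byPart.getOrDefault(partId, Map.of()).values()) {
            sum += value;
        }
        return sum;
    }

    // Iterate over every partition and combine the partial sums.
    static long sumAll(Map<Integer, Map<Integer, Long>> byPart) {
        long total = 0;
        for (int partId = 0; partId < PARTITIONS; partId++) {
            total += sumPartition(byPart, partId);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Map<Integer, Long>> byPart =
            store(Map.of(1, 10L, 2, 20L, 9, 30L, 16, 5L));
        System.out.println("total: " + sumAll(byPart));
    }
}
```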
[IMPORTANT]
====
[discrete]
=== Performance Considerations
Colocated computations yield performance benefits when the amount of data you want to process is sufficiently large. In some cases, when the amount of data is small, a link:key-value-api/using-scan-queries[scan query] may perform better.
====
== Entry Processor
// This section should probably be expanded with more details and examples
An entry processor is used to process cache entries on the nodes where they are stored and return the result of the processing. With an entry processor, you do not have to transfer the entire object to perform an operation on it: the operation is performed remotely and only the result is transferred.
If an entry processor sets the value for an entry that does not exist, the entry is added to the cache.
Entry processors are executed atomically within a lock on the given cache key.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaSourceFile}[tag=entry-processor,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;

void CacheInvoke()
{
    var ignite = Ignition.Start();
    var cache = ignite.GetOrCreateCache<int, int>("myCache");
    var proc = new Processor();

    // Add 5 to the value stored under key 1, ten times
    for (int i = 0; i < 10; i++)
        cache.Invoke(1, proc, 5);
}

class Processor : ICacheEntryProcessor<int, int, int, int>
{
    public int Process(IMutableCacheEntry<int, int> entry, int arg)
    {
        // If the entry exists, add arg to it; otherwise create it with arg
        entry.Value = entry.Exists ? entry.Value + arg : arg;
        return entry.Value;
    }
}
----
tab:C++[]
[source,cpp]
----
include::code-snippets/cpp/src/invoke.cpp[tag=invoke,indent=0]
----
--
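
The semantics described above (per-key atomic update, creation of a missing entry, and returning only the result) resemble `ConcurrentHashMap.compute` in the JDK. The sketch below uses that as a local analogy only. It is not the Ignite or JCache entry processor API, and the `EntryProcessorSketch` class and `increment` helper are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;

class EntryProcessorSketch {
    // Adds delta to the value under key, creating the entry if it is missing,
    // and returns the new value. compute() runs the function atomically per key.
    static int increment(ConcurrentHashMap<Integer, Integer> cache, int key, int delta) {
        return cache.compute(key, (k, old) -> old == null ? delta : old + delta);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> cache = new ConcurrentHashMap<>();
        int result = 0;
        // Add 5 to the value under key 1, ten times
        for (int i = 0; i < 10; i++) {
            result = increment(cache, 1, 5);
        }
        System.out.println("value after ten updates: " + result);
    }
}
```

In a distributed cache the same pattern has an additional benefit: the update function travels to the node that owns the key, so the stored value itself never has to cross the network.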
////
TODO: the importance of this section is questionable
== Cache Interceptor
Ignite lets you execute custom logic before or after specific operations on a cache. You can:
- change the returned value of the `get` operation;
- process an entry before or after any `put`/`remove` operation.
////