blob: c657e02d2c942b3543e51641343dc8c4b8626933 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.snippets;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
public class CollocatedComputations {
public static void main(String[] args) {
Ignite ignite = Ignition.start();
HashSet<Long> keys = new HashSet<>();
calculateAverage(ignite, keys);
void collocatingByKey(Ignite ignite) {
// tag::collocating-by-key[]
IgniteCache<Integer, String> cache = ignite.cache("myCache");
IgniteCompute compute = ignite.compute();
int key = 1;
// This closure will execute on the remote node where
// data for the given 'key' is located.
compute.affinityRun("myCache", key, () -> {
// Peek is a local memory lookup.
System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) + ']');
// end::collocating-by-key[]
// tag::calculate-average[]
// this task sums up the values of the salary field for the given set of keys
private static class SumTask implements IgniteCallable<BigDecimal> {
private Set<Long> keys;
public SumTask(Set<Long> keys) {
this.keys = keys;
private Ignite ignite;
public BigDecimal call() throws Exception {
IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();
BigDecimal sum = new BigDecimal(0);
for (long k : keys) {
BinaryObject person = cache.localPeek(k, CachePeekMode.PRIMARY);
if (person != null)
sum = sum.add(new BigDecimal((float) person.field("salary")));
return sum;
public static void calculateAverage(Ignite ignite, Set<Long> keys) {
// get the affinity function configured for the cache
Affinity<Long> affinityFunc = ignite.affinity("person");
// this map stores collections of keys for each partition
HashMap<Integer, Set<Long>> partMap = new HashMap<>();
keys.forEach(k -> {
int partId = affinityFunc.partition(k);
Set<Long> keysByPartition = partMap.computeIfAbsent(partId, key -> new HashSet<Long>());
BigDecimal total = new BigDecimal(0);
IgniteCompute compute = ignite.compute();
List<String> caches = Arrays.asList("person");
// iterate over all partitions
for (Map.Entry<Integer, Set<Long>> pair : partMap.entrySet()) {
// send a task that gets specific keys for the partition
BigDecimal sum = compute.affinityCall(caches, pair.getKey().intValue(), new SumTask(pair.getValue()));
total = total.add(sum);
System.out.println("the average salary is " + total.floatValue() / keys.size());
// end::calculate-average[]
// tag::sum-by-partition[]
// this task sums up the value of the 'salary' field for all objects stored in
// the given partition
public static class SumByPartitionTask implements IgniteCallable<BigDecimal> {
private int partId;
public SumByPartitionTask(int partId) {
this.partId = partId;
private Ignite ignite;
public BigDecimal call() throws Exception {
// use binary objects to avoid deserialization
IgniteCache<Long, BinaryObject> cache = ignite.cache("person").withKeepBinary();
BigDecimal total = new BigDecimal(0);
try (QueryCursor<Cache.Entry<Long, BinaryObject>> cursor = cache
.query(new ScanQuery<Long, BinaryObject>(partId).setLocal(true))) {
for (Cache.Entry<Long, BinaryObject> entry : cursor) {
total = total.add(new BigDecimal((float) entry.getValue().field("salary")));
return total;
// end::sum-by-partition[]
public static void entryProcessor(Ignite ignite) {
// tag::entry-processor[]
IgniteCache<String, Integer> cache = ignite.cache("mycache");
// Increment the value for a specific key by 1.
// The operation will be performed on the node where the key is stored.
// Note that if the cache does not contain an entry for the given key, it will
// be created.
cache.invoke("mykey", (entry, args) -> {
Integer val = entry.getValue();
entry.setValue(val == null ? 1 : val + 1);
return null;
// end::entry-processor[]