blob: 9fe272dd62b320bb8ca48315bfa3538c64c0e47d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.snippets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
public class DistributedComputing {
void getCompute() {
// tag::get-compute[]
Ignite ignite = Ignition.start();
IgniteCompute compute = ignite.compute();
// end::get-compute[]
}
void getComputeForNodes() {
// tag::get-compute-for-nodes[]
Ignite ignite = Ignition.start();
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());
// end::get-compute-for-nodes[]
}
void taskTimeout() {
Ignite ignite = Ignition.start();
// tag::timeout[]
IgniteCompute compute = ignite.compute();
compute.withTimeout(300_000).run(() -> {
// your computation
// ...
});
// end::timeout[]
}
void executeRunnable(Ignite ignite) {
// tag::execute-runnable[]
IgniteCompute compute = ignite.compute();
// Iterate through all words and print
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" ")) {
compute.run(() -> System.out.println(word));
}
// end::execute-runnable[]
}
void executeCallable(Ignite ignite) {
// tag::execute-callable[]
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
// Iterate through all words in the sentence and create callable jobs.
for (String word : "How many characters".split(" "))
calls.add(word::length);
// Execute the collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);
// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();
// end::execute-callable[]
}
void executeIgniteClosure(Ignite ignite) {
// tag::execute-closure[]
IgniteCompute compute = ignite.compute();
// Execute closure on all cluster nodes.
Collection<Integer> res = compute.apply(String::length, Arrays.asList("How many characters".split(" ")));
// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();
// end::execute-closure[]
}
void broadcast(Ignite ignite) {
// tag::broadcast[]
// Limit broadcast to remote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());
// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));
// end::broadcast[]
}
void async(Ignite ignite) {
// tag::async[]
IgniteCompute compute = ignite.compute();
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
// Iterate through all words in the sentence and create callable jobs.
for (String word : "Count characters using a callable".split(" "))
calls.add(word::length);
IgniteFuture<Collection<Integer>> future = compute.callAsync(calls);
future.listen(fut -> {
// Total number of characters.
int total = fut.get().stream().mapToInt(Integer::intValue).sum();
System.out.println("Total number of characters: " + total);
});
// end::async[]
}
void shareState(Ignite ignite) {
// tag::get-map[]
IgniteCluster cluster = ignite.cluster();
ConcurrentMap<String, Integer> nodeLocalMap = cluster.nodeLocalMap();
// end::get-map[]
// tag::job-counter[]
IgniteCallable<Long> job = new IgniteCallable<Long>() {
@IgniteInstanceResource
private Ignite ignite;
@Override
public Long call() {
// Get a reference to node local.
ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite.cluster().nodeLocalMap();
AtomicLong cntr = nodeLocalMap.get("counter");
if (cntr == null) {
AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());
if (old != null)
cntr = old;
}
return cntr.incrementAndGet();
}
};
// end::job-counter[]
}
// tag::access-data[]
public class MyCallableTask implements IgniteCallable<Integer> {
@IgniteInstanceResource
private Ignite ignite;
@Override
public Integer call() throws Exception {
IgniteCache<Long, Person> cache = ignite.cache("person");
// Get the data you need
Person person = cache.get(1L);
// do with the data what you need to do
return 1;
}
}
// end::access-data[]
}