blob: 57f2710998961f04341dd392e896cc425f4e118a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.examples.spark;
import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import scala.Tuple2;
/**
 * This example demonstrates how to create a {@link JavaIgniteRDD} and share it with multiple Spark workers. The goal
 * of this particular example is to provide the simplest code example of this logic.
 * <p>
 * This example will start Ignite in the embedded mode and will start a {@link JavaIgniteContext} on each Spark worker
 * node.
 * <p>
 * The example can also work in the standalone mode, which can be enabled by setting JavaIgniteContext's
 * {@code standalone} property to {@code true} and running an Ignite node separately with the
 * {@code examples/config/spark/example-shared-rdd.xml} config.
 */
public class SharedRDDExample {
    /** Name of the Ignite cache backing the shared RDD. */
    private static final String CACHE_NAME = "sharedRDD";

    /** Ignite configuration used when creating the Ignite context. */
    private static final String CONFIG_PATH = "config/spark/example-shared-rdd.xml";

    /** Number of integers (0 until DATA_SIZE - 1) written into the shared RDD. */
    private static final int DATA_SIZE = 20;

    /**
     * Executes the example.
     *
     * @param args Command line arguments, none required.
     */
    public static void main(String[] args) {
        // Spark Configuration.
        SparkConf sparkConf = new SparkConf()
            .setAppName("JavaIgniteRDDExample")
            .setMaster("local")
            .set("spark.executor.instances", "2");

        // Spark context.
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        try {
            // Adjust the logger to exclude the logs of no interest.
            Logger.getRootLogger().setLevel(Level.ERROR);
            Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

            // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
            JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>(
                sparkContext, CONFIG_PATH, false);

            try {
                runExample(sparkContext, igniteContext);
            }
            finally {
                // Close IgniteContext on all the workers, even if the example failed midway.
                igniteContext.close(true);
            }
        }
        finally {
            // Stopped last: the Ignite context above was built on top of this Spark context.
            sparkContext.stop();
        }
    }

    /**
     * Fills the shared RDD with (i, i) pairs, prints its content, prints the pairs with even values and runs an
     * SQL query over it.
     *
     * @param sparkContext Spark context used to parallelize the source data.
     * @param igniteContext Ignite context providing the cache-backed RDD.
     */
    private static void runExample(JavaSparkContext sparkContext,
        JavaIgniteContext<Integer, Integer> igniteContext) {
        // Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
        JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache(CACHE_NAME);

        // Define data to be stored in the Ignite RDD (cache).
        List<Integer> data = new ArrayList<>(DATA_SIZE);

        for (int i = 0; i < DATA_SIZE; i++) {
            data.add(i);
        }

        // Preparing a Java RDD.
        JavaRDD<Integer> javaRDD = sparkContext.parallelize(data);

        // Map each value to a (value, value) pair; pairs are represented as Scala Tuple2.
        PairFunction<Integer, Integer, Integer> toPair = val -> new Tuple2<>(val, val);

        // Fill the Ignite RDD in with Int pairs.
        sharedRDD.savePairs(javaRDD.mapToPair(toPair));

        // Prints one (key,value) pair; reused by both iterations below.
        VoidFunction<Tuple2<Integer, Integer>> printPair =
            tuple -> System.out.println("(" + tuple._1 + "," + tuple._2 + ")");

        System.out.println(">>> Iterating over Ignite Shared RDD...");

        // Iterate over the Ignite RDD.
        sharedRDD.foreach(printPair);

        System.out.println(">>> Transforming values stored in Ignite Shared RDD...");

        // Keep only the pairs whose value is even (odd values are filtered out) as a transformed RDD.
        Function<Tuple2<Integer, Integer>, Boolean> hasEvenValue = tuple -> tuple._2() % 2 == 0;

        JavaPairRDD<Integer, Integer> transformedValues = sharedRDD.filter(hasEvenValue);

        // Print out the transformed values.
        transformedValues.foreach(printPair);

        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

        // Execute SQL query over the Ignite RDD.
        Dataset<?> df = sharedRDD.sql("select _val from Integer where _key < 9");

        // Show the result of the execution.
        df.show();
    }
}