This section describes how to access the functionality of the Spark GemFire Connector when you write your Spark applications in Java. It assumes that you have already familiarized yourself with the previous sections and understand how the Spark GemFire Connector works.
The best way to use the Spark GemFire Connector Java API is to statically import all of the methods in `GemFireJavaUtil`. This utility class is the main entry point for the Spark GemFire Connector Java API.
```java
import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*;
```
Create a `JavaSparkContext` (don't forget the static import):

```java
SparkConf conf = new SparkConf();
conf.set(GemFireLocatorPropKey, "192.168.1.47[10334]");
JavaSparkContext jsc = new JavaSparkContext(conf);
```
A GemFire region is exposed as a `GemFireJavaRegionRDD<K,V>` (a subclass of `JavaPairRDD<K,V>`):

```java
GemFireJavaRegionRDD<Integer, Emp> rdd1 = javaFunctions(jsc).gemfireRegion("emps");
GemFireJavaRegionRDD<Integer, Emp> rdd2 = rdd1.where("value.getAge() < 40");
```
Use the `rdd3` and region `emps` from the join and outer join examples:
```java
static class MyKeyFunction implements Function<Tuple2<String, Integer>, Integer> {
  @Override
  public Integer call(Tuple2<String, Integer> pair) throws Exception {
    return pair._2();
  }
}

MyKeyFunction func = new MyKeyFunction();

JavaPairRDD<Tuple2<String, Integer>, Emp> rdd3j =
  javaFunctions(rdd3).joinGemfireRegion("emps", func);
JavaPairRDD<Tuple2<String, Integer>, Option<Emp>> rdd3o =
  javaFunctions(rdd3).outerJoinGemfireRegion("emps", func);
```
Saving a `JavaPairRDD` is straightforward:
```java
List<Tuple2<String, String>> data = new ArrayList<>();
data.add(new Tuple2<>("7", "seven"));
data.add(new Tuple2<>("8", "eight"));
data.add(new Tuple2<>("9", "nine"));

// create JavaPairRDD
JavaPairRDD<String, String> rdd1 = jsc.parallelizePairs(data);
// save to GemFire
javaFunctions(rdd1).saveToGemfire("str_str_region");
```
In order to save a `JavaRDD<Tuple2<K,V>>`, it needs to be converted to a `JavaPairRDD<K,V>` via the static method `toJavaPairRDD` from `GemFireJavaUtil`:
```java
List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
data2.add(new Tuple2<>("11", "eleven"));
data2.add(new Tuple2<>("12", "twelve"));
data2.add(new Tuple2<>("13", "thirteen"));

// create JavaRDD<Tuple2<K,V>>
JavaRDD<Tuple2<String, String>> rdd2 = jsc.parallelize(data2);
// save to GemFire
javaFunctions(toJavaPairRDD(rdd2)).saveToGemfire("str_str_region");
```
Similar to the Scala version, a function is required to generate a key/value pair from each RDD element. The following `PairFunction` generates a `<String, Integer>` pair from a `<String>`:
```java
PairFunction<String, String, Integer> pairFunc =
  new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
      return new Tuple2<String, Integer>(s, s.length());
    }
  };
```
Note: there are 3 type parameters for `PairFunction<T, K, V>`, they are:
 - `T`: the type of the input RDD element
 - `K`: the type of the key
 - `V`: the type of the value
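To see the key/value mapping this kind of `PairFunction` performs without a running Spark cluster, here is a minimal plain-Java sketch (the `PairFuncDemo` class and `toPairs` helper are illustrative names, not part of the connector or Spark):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PairFuncDemo {
    // Mirrors PairFunction<String, String, Integer>: each input string
    // becomes a (string, length) key/value pair, the shape saveToGemfire expects.
    public static List<Map.Entry<String, Integer>> toPairs(List<String> input) {
        return input.stream()
                .<Map.Entry<String, Integer>>map(s -> new SimpleEntry<>(s, s.length()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toPairs(List.of("a", "ab", "abc"))); // [a=1, ab=2, abc=3]
    }
}
```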
Once the `PairFunction` is ready, the rest is easy:
```java
// create demo JavaRDD<String>
List<String> data = new ArrayList<String>();
data.add("a");
data.add("ab");
data.add("abc");
JavaRDD<String> jrdd = jsc.parallelize(data);

javaFunctions(jrdd).saveToGemfire("str_int_region", pairFunc);
```
Saving `JavaPairDStream` and `JavaDStream` is similar to saving `JavaPairRDD` and `JavaRDD`:
```java
JavaPairDStream<String, String> ds1 = ...
javaFunctions(ds1).saveToGemfire("str_str_region");

JavaDStream<String> ds2 = ...
javaFunctions(ds2).saveToGemfire("str_int_region", pairFunc);
```
There are two `gemfireOQL` Java APIs: with and without `GemFireConnectionConf`. Here is an example without `GemFireConnectionConf`; it will use the default `GemFireConnectionConf` internally.
```java
// assume there's jsc: JavaSparkContext
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);
DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region");
df.show();
```
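For the variant that takes a `GemFireConnectionConf`, the connection configuration is passed as an extra argument. The sketch below is an assumption extrapolated from the Scala examples: the exact way to construct a `GemFireConnectionConf` from Java may differ, so check the `GemFireConnectionConf` API in your connector version before relying on it.

```java
// assume there's jsc: JavaSparkContext
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);
// NOTE: construction of connConf shown here is an assumption, not a
// confirmed Java API; consult GemFireConnectionConf in your connector version
GemFireConnectionConf connConf = GemFireConnectionConf.apply(jsc.getConf());
DataFrame df = javaFunctions(sqlContext).gemfireOQL("select * from /str_str_region", connConf);
df.show();
```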
Next: [About The Demos](10_demos.md)