## Using Connector in Java
This section describes how to use the Spark Geode Connector when you
write Spark applications in Java. It assumes that you have read the
previous sections and understand how the Spark Geode Connector works.
### Prerequisites
The best way to use the Spark Geode Connector Java API is to statically
import all of the methods in `GeodeJavaUtil`. This utility class is
the main entry point for the Spark Geode Connector Java API.
```
import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*;
```
Create JavaSparkContext (don't forget about the static import):
```
SparkConf conf = new SparkConf();
conf.set(GeodeLocatorPropKey, "192.168.1.47[10334]");
JavaSparkContext jsc = new JavaSparkContext(conf);
```
### Accessing Geode region in Java
A Geode region is exposed as `GeodeJavaRegionRDD<K, V>` (a subclass of
`JavaPairRDD<K, V>`):
```
GeodeJavaRegionRDD<Integer, Emp> rdd1 = javaFunctions(jsc).geodeRegion("emps");
GeodeJavaRegionRDD<Integer, Emp> rdd2 = rdd1.where("value.getAge() < 40");
```
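To make the effect of the `where` predicate concrete, the following plain-Java sketch models which entries would remain in `rdd2`. It uses neither Spark nor Geode: the `Emp` class, the in-memory map, and the sample data are hypothetical stand-ins, and only the `getAge() < 40` condition is taken from the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class WhereFilterSketch {
    // Hypothetical stand-in for the Emp value class stored in the region.
    static class Emp {
        private final String name;
        private final int age;
        Emp(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    // Keeps only the entries whose value satisfies value.getAge() < 40.
    static Map<Integer, Emp> youngerThan40(Map<Integer, Emp> region) {
        return region.entrySet().stream()
            .filter(e -> e.getValue().getAge() < 40)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                                      (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for the "emps" region.
        Map<Integer, Emp> emps = new LinkedHashMap<>();
        emps.put(1, new Emp("Alice", 25));
        emps.put(2, new Emp("Bob", 45));
        emps.put(3, new Emp("Carol", 39));
        System.out.println(youngerThan40(emps).keySet()); // prints [1, 3]
    }
}
```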
### RDD Join and Outer Join
Using `rdd3` and the region `emps` from the [join and outer join examples](5_rdd_join.md):
```
// derives the region key from an RDD element: here, the second field of the pair
static class MyKeyFunction implements Function<Tuple2<String, Integer>, Integer> {
  @Override public Integer call(Tuple2<String, Integer> pair) throws Exception {
    return pair._2();
  }
}
MyKeyFunction func = new MyKeyFunction();
JavaPairRDD<Tuple2<String, Integer>, Emp> rdd3j =
  javaFunctions(rdd3).joinGeodeRegion("emps", func);
JavaPairRDD<Tuple2<String, Integer>, Option<Emp>> rdd3o =
  javaFunctions(rdd3).outerJoinGeodeRegion("emps", func);
```
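The difference between the two result types can be illustrated with a plain-Java model of the join semantics. This is a sketch only, not the connector's implementation: a `Map` stands in for the region, `Map.Entry` for `Tuple2`, `java.util.Optional` for Scala's `Option`, and all names and sample data are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class JoinSketch {
    // Join: keep only the elements whose derived key exists in the region.
    static <T, K, V> List<Map.Entry<T, V>> join(
            List<T> elements, Map<K, V> region, Function<T, K> keyFunc) {
        List<Map.Entry<T, V>> out = new ArrayList<>();
        for (T t : elements) {
            V v = region.get(keyFunc.apply(t));
            if (v != null) {
                out.add(new SimpleEntry<>(t, v));
            }
        }
        return out;
    }

    // Outer join: keep every element; a missing region entry becomes Optional.empty().
    static <T, K, V> List<Map.Entry<T, Optional<V>>> outerJoin(
            List<T> elements, Map<K, V> region, Function<T, K> keyFunc) {
        List<Map.Entry<T, Optional<V>>> out = new ArrayList<>();
        for (T t : elements) {
            out.add(new SimpleEntry<>(t, Optional.ofNullable(region.get(keyFunc.apply(t)))));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up region content: key -> value.
        Map<Integer, String> emps = new HashMap<>();
        emps.put(1, "emp-1");
        emps.put(2, "emp-2");

        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>("a", 1));
        pairs.add(new SimpleEntry<>("b", 5)); // no region entry with key 5

        // Same role as MyKeyFunction: use the second field as the region key.
        Function<Map.Entry<String, Integer>, Integer> func = Map.Entry::getValue;
        System.out.println(join(pairs, emps, func).size());      // prints 1
        System.out.println(outerJoin(pairs, emps, func).size()); // prints 2
    }
}
```

In this model the join drops the element whose key is absent from the region, while the outer join keeps it with an empty value, which is why the outer join's value type is wrapped in `Option`.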
### Saving JavaPairRDD to Geode
Saving JavaPairRDD is straightforward:
```
List<Tuple2<String, String>> data = new ArrayList<>();
data.add(new Tuple2<>("7", "seven"));
data.add(new Tuple2<>("8", "eight"));
data.add(new Tuple2<>("9", "nine"));
// create JavaPairRDD
JavaPairRDD<String, String> rdd1 = jsc.parallelizePairs(data);
// save to Geode
javaFunctions(rdd1).saveToGeode("str_str_region");
```
To save a `JavaRDD<Tuple2<K,V>>`, first convert it to a
`JavaPairRDD<K,V>` with the static method `toJavaPairRDD` from `GeodeJavaUtil`:
```
List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
data2.add(new Tuple2<>("11", "eleven"));
data2.add(new Tuple2<>("12", "twelve"));
data2.add(new Tuple2<>("13", "thirteen"));
// create JavaRDD<Tuple2<K,V>>
JavaRDD<Tuple2<String, String>> rdd2 = jsc.parallelize(data2);
// save to Geode
javaFunctions(toJavaPairRDD(rdd2)).saveToGeode("str_str_region");
```
### Saving JavaRDD to Geode
As in the Scala version, a function is required to generate a key/value pair
from each RDD element. The following `PairFunction` generates a
`<String, Integer>` pair from a `String`:
```
PairFunction<String, String, Integer> pairFunc =
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) throws Exception {
      return new Tuple2<String, Integer>(s, s.length());
    }
  };
```
Note: `PairFunction` takes three type parameters:
1. type of JavaRDD element
2. type of key of output key/value pair
3. type of value of output key/value pair
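
The order of the three type parameters can be checked with a small stand-alone sketch. To compile without Spark on the classpath, it declares a hypothetical `PairFn` interface that mirrors the shape of `PairFunction` and uses `Map.Entry` in place of `Tuple2`; neither is part of the connector.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class PairFunctionSketch {
    // Hypothetical stand-in mirroring the type parameters of PairFunction<T, K, V>:
    // T = input element type, K = output key type, V = output value type.
    // (Spark's PairFunction.call additionally declares "throws Exception".)
    @FunctionalInterface
    interface PairFn<T, K, V> {
        Map.Entry<K, V> call(T t);
    }

    // Input String, key String, value Integer (the length of the string).
    static final PairFn<String, String, Integer> PAIR_FUNC =
        s -> new SimpleEntry<>(s, s.length());

    public static void main(String[] args) {
        Map.Entry<String, Integer> pair = PAIR_FUNC.call("abc");
        System.out.println(pair.getKey() + " -> " + pair.getValue()); // prints abc -> 3
    }
}
```

Because the interface has a single abstract method, it can be written as a lambda on Java 8 or later instead of an anonymous class.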
Once `PairFunction` is ready, the rest is easy:
```
// create demo JavaRDD<String>
List<String> data = new ArrayList<String>();
data.add("a");
data.add("ab");
data.add("abc");
JavaRDD<String> jrdd = jsc.parallelize(data);
javaFunctions(jrdd).saveToGeode("str_int_region", pairFunc);
```
### Saving JavaPairDStream and JavaDStream
Saving JavaPairDStream and JavaDStream is similar to saving JavaPairRDD
and JavaRDD:
```
JavaPairDStream<String, String> ds1 = ...
javaFunctions(ds1).saveToGeode("str_str_region");
JavaDStream<String> ds2 = ...
javaFunctions(ds2).saveToGeode("str_int_region", pairFunc);
```
### Using Geode OQL
There are two `geodeOQL` Java APIs: one takes a `GeodeConnectionConf` and
one does not. The example below omits it, so the default
`GeodeConnectionConf` is used internally:
```
// assume there's jsc: JavaSparkContext
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(jsc);
DataFrame df = javaFunctions(sqlContext).geodeOQL("select * from /str_str_region");
df.show();
```
Next: [About The Demos](10_demos.md)