blob: ccdabb770de034fd02971befbbad5cb1caed9ec2 [file] [log] [blame]
package io.pivotal.gemfire.spark.connector.javaapi;
import io.pivotal.gemfire.spark.connector.GemFireConnectionConf;
import io.pivotal.gemfire.spark.connector.GemFireRDDFunctions;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD;
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Option;
import scala.reflect.ClassTag;
import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*;
/**
 * Java-facing adapter around a {@link org.apache.spark.api.java.JavaRDD} that exposes the
 * GemFire Spark Connector operations: saving to a region, joining with a region and
 * left-outer-joining with a region.
 *
 * <p>Instances are normally obtained through one of the factory methods of
 * {@link io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil}.</p>
 *
 * @param <T> the element type of the wrapped RDD
 */
public class GemFireJavaRDDFunctions<T> {

  /** The connector functions object that every method of this wrapper delegates to. */
  public final GemFireRDDFunctions<T> rddf;

  /**
   * Wraps the given Java RDD.
   *
   * @param rdd the Java RDD whose underlying RDD is handed to the connector functions
   */
  public GemFireJavaRDDFunctions(JavaRDD<T> rdd) {
    this.rddf = new GemFireRDDFunctions<>(rdd.rdd());
  }

  /**
   * Saves this non-pair RDD to the GemFire key-value store using an explicit connection
   * configuration.
   *
   * @param regionPath the full path of the region the RDD is stored in
   * @param func the PairFunction that converts each RDD element into a key/value pair
   * @param connConf the GemFireConnectionConf that provides the connection to the GemFire cluster
   * @param <K> the key type produced by {@code func}
   * @param <V> the value type produced by {@code func}
   */
  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) {
    rddf.saveToGemfire(regionPath, func, connConf);
  }

  /**
   * Saves this non-pair RDD to the GemFire key-value store using the default connection
   * configuration reported by {@link #rddf}.
   *
   * @param regionPath the full path of the region the RDD is stored in
   * @param func the PairFunction that converts each RDD element into a key/value pair
   * @param <K> the key type produced by {@code func}
   * @param <V> the value type produced by {@code func}
   */
  public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) {
    GemFireConnectionConf defaultConf = rddf.defaultConnectionConf();
    rddf.saveToGemfire(regionPath, func, defaultConf);
  }

  /**
   * Joins this {@code RDD<T>} with the GemFire {@code Region<K, V>} using the default
   * connection configuration reported by {@link #rddf}.
   *
   * <p>The join key of an RDD element is produced by {@code func(T) => K}; the key on the
   * region side is simply the key of the region's key/value pair. Only elements with a
   * matching key appear in the result, each as a (t, v) tuple where t comes from this RDD
   * and v comes from the GemFire region.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return a {@code JavaPairRDD<T, V>} of the matched pairs
   */
  public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) {
    GemFireConnectionConf defaultConf = rddf.defaultConnectionConf();
    return joinGemfireRegion(regionPath, func, defaultConf);
  }

  /**
   * Joins this {@code RDD<T>} with the GemFire {@code Region<K, V>} using an explicit
   * connection configuration.
   *
   * <p>The join key of an RDD element is produced by {@code func(T) => K}; the key on the
   * region side is simply the key of the region's key/value pair. Only elements with a
   * matching key appear in the result, each as a (t, v) tuple where t comes from this RDD
   * and v comes from the GemFire region.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param connConf the GemFireConnectionConf that provides the connection to the GemFire cluster
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return a {@code JavaPairRDD<T, V>} of the matched pairs
   */
  public <K, V> JavaPairRDD<T, V> joinGemfireRegion(
      String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
    GemFireJoinRDD<T, K, V> joined = rddf.joinGemfireRegion(regionPath, func, connConf);
    // The JavaPairRDD constructor takes a class tag for each side of the tuple; both are
    // obtained from the statically imported fakeClassTag() helper.
    ClassTag<T> elementTag = fakeClassTag();
    ClassTag<V> regionValueTag = fakeClassTag();
    return new JavaPairRDD<T, V>(joined, elementTag, regionValueTag);
  }

  /**
   * Performs a left outer join of this {@code RDD<T>} with the GemFire
   * {@code Region<K, V>} using the default connection configuration reported by
   * {@link #rddf}.
   *
   * <p>The join key of an RDD element is produced by {@code func(T) => K}; the key on the
   * region side is simply the key of the region's key/value pair. For each element t of
   * this RDD the result contains either all pairs (t, Some(v)) for v in the GemFire
   * region, or the pair (t, None) if no element in the GemFire region has key
   * {@code func(t)}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return a {@code JavaPairRDD<T, Option<V>>} holding one entry per joined pair
   */
  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) {
    GemFireConnectionConf defaultConf = rddf.defaultConnectionConf();
    return outerJoinGemfireRegion(regionPath, func, defaultConf);
  }

  /**
   * Performs a left outer join of this {@code RDD<T>} with the GemFire
   * {@code Region<K, V>} using an explicit connection configuration.
   *
   * <p>The join key of an RDD element is produced by {@code func(T) => K}; the key on the
   * region side is simply the key of the region's key/value pair. For each element t of
   * this RDD the result contains either all pairs (t, Some(v)) for v in the GemFire
   * region, or the pair (t, None) if no element in the GemFire region has key
   * {@code func(t)}.</p>
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param connConf the GemFireConnectionConf that provides the connection to the GemFire cluster
   * @param <K> the key type of the GemFire region
   * @param <V> the value type of the GemFire region
   * @return a {@code JavaPairRDD<T, Option<V>>} holding one entry per joined pair
   */
  public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(
      String regionPath, Function<T, K> func, GemFireConnectionConf connConf) {
    GemFireOuterJoinRDD<T, K, V> outerJoined = rddf.outerJoinGemfireRegion(regionPath, func, connConf);
    // The JavaPairRDD constructor takes a class tag for each side of the tuple; both are
    // obtained from the statically imported fakeClassTag() helper.
    ClassTag<T> elementTag = fakeClassTag();
    ClassTag<Option<V>> optionalValueTag = fakeClassTag();
    return new JavaPairRDD<T, Option<V>>(outerJoined, elementTag, optionalValueTag);
  }
}