| package io.pivotal.gemfire.spark.connector.javaapi; |
| |
| import io.pivotal.gemfire.spark.connector.GemFireConnectionConf; |
| import io.pivotal.gemfire.spark.connector.GemFireRDDFunctions; |
| import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireJoinRDD; |
| import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireOuterJoinRDD; |
| import org.apache.spark.api.java.JavaPairRDD; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.api.java.function.Function; |
| import org.apache.spark.api.java.function.PairFunction; |
| import scala.Option; |
| import scala.reflect.ClassTag; |
| |
| import static io.pivotal.gemfire.spark.connector.javaapi.JavaAPIHelper.*; |
| |
| /** |
| * A Java API wrapper over {@link org.apache.spark.api.java.JavaRDD} to provide GemFire Spark |
| * Connector functionality. |
| * |
| * <p>To obtain an instance of this wrapper, use one of the factory methods in {@link |
| * io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil} class.</p> |
| */ |
| public class GemFireJavaRDDFunctions<T> { |
| |
| public final GemFireRDDFunctions<T> rddf; |
| |
| public GemFireJavaRDDFunctions(JavaRDD<T> rdd) { |
| this.rddf = new GemFireRDDFunctions<T>(rdd.rdd()); |
| } |
| |
| /** |
| * Save the non-pair RDD to GemFire key-value store. |
| * @param regionPath the full path of region that the RDD is stored |
| * @param func the PairFunction that converts elements of JavaRDD to key/value pairs |
| * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster |
| */ |
| public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func, GemFireConnectionConf connConf) { |
| rddf.saveToGemfire(regionPath, func, connConf); |
| } |
| |
| /** |
| * Save the non-pair RDD to GemFire key-value store with default GemFireConnector. |
| * @param regionPath the full path of region that the RDD is stored |
| * @param func the PairFunction that converts elements of JavaRDD to key/value pairs |
| */ |
| public <K, V> void saveToGemfire(String regionPath, PairFunction<T, K, V> func) { |
| rddf.saveToGemfire(regionPath, func, rddf.defaultConnectionConf()); |
| } |
| |
| /** |
| * Return an RDD containing all pairs of elements with matching keys in this |
| * RDD<T> and the GemFire `Region<K, V>`. The join key from RDD |
| * element is generated by `func(T) => K`, and the key from the GemFire |
| * region is just the key of the key/value pair. |
| * |
| * Each pair of elements of result RDD will be returned as a (t, v2) tuple, |
| * where t is from this RDD and v is from the GemFire region. |
| * |
| * @param regionPath the region path of the GemFire region |
| * @param func the function that generates region key from RDD element T |
| * @tparam K the key type of the GemFire region |
| * @tparam V the value type of the GemFire region |
| * @return JavaPairRDD<T, V> |
| */ |
| public <K, V> JavaPairRDD<T, V> joinGemfireRegion(String regionPath, Function<T, K> func) { |
| return joinGemfireRegion(regionPath, func, rddf.defaultConnectionConf()); |
| } |
| |
| /** |
| * Return an RDD containing all pairs of elements with matching keys in this |
| * RDD<T> and the GemFire `Region<K, V>`. The join key from RDD |
| * element is generated by `func(T) => K`, and the key from the GemFire |
| * region is just the key of the key/value pair. |
| * |
| * Each pair of elements of result RDD will be returned as a (t, v2) tuple, |
| * where t is from this RDD and v is from the GemFire region. |
| * |
| * @param regionPath the region path of the GemFire region |
| * @param func the function that generates region key from RDD element T |
| * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster |
| * @tparam K the key type of the GemFire region |
| * @tparam V the value type of the GemFire region |
| * @return JavaPairRDD<T, V> |
| */ |
| public <K, V> JavaPairRDD<T, V> joinGemfireRegion( |
| String regionPath, Function<T, K> func, GemFireConnectionConf connConf) { |
| GemFireJoinRDD<T, K, V> rdd = rddf.joinGemfireRegion(regionPath, func, connConf); |
| ClassTag<T> kt = fakeClassTag(); |
| ClassTag<V> vt = fakeClassTag(); |
| return new JavaPairRDD<>(rdd, kt, vt); |
| } |
| |
| /** |
| * Perform a left outer join of this RDD<T> and the GemFire `Region<K, V>`. |
| * The join key from RDD element is generated by `func(T) => K`, and the |
| * key from region is just the key of the key/value pair. |
| * |
| * For each element (t) in this RDD, the resulting RDD will either contain |
| * all pairs (t, Some(v)) for v in the GemFire region, or the pair |
| * (t, None) if no element in the GemFire region have key `func(t)`. |
| * |
| * @param regionPath the region path of the GemFire region |
| * @param func the function that generates region key from RDD element T |
| * @tparam K the key type of the GemFire region |
| * @tparam V the value type of the GemFire region |
| * @return JavaPairRDD<T, Option<V>> |
| */ |
| public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion(String regionPath, Function<T, K> func) { |
| return outerJoinGemfireRegion(regionPath, func, rddf.defaultConnectionConf()); |
| } |
| |
| /** |
| * Perform a left outer join of this RDD<T> and the GemFire `Region<K, V>`. |
| * The join key from RDD element is generated by `func(T) => K`, and the |
| * key from region is just the key of the key/value pair. |
| * |
| * For each element (t) in this RDD, the resulting RDD will either contain |
| * all pairs (t, Some(v)) for v in the GemFire region, or the pair |
| * (t, None) if no element in the GemFire region have key `func(t)`. |
| * |
| * @param regionPath the region path of the GemFire region |
| * @param func the function that generates region key from RDD element T |
| * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster |
| * @tparam K the key type of the GemFire region |
| * @tparam V the value type of the GemFire region |
| * @return JavaPairRDD<T, Option<V>> |
| */ |
| public <K, V> JavaPairRDD<T, Option<V>> outerJoinGemfireRegion( |
| String regionPath, Function<T, K> func, GemFireConnectionConf connConf) { |
| GemFireOuterJoinRDD<T, K, V> rdd = rddf.outerJoinGemfireRegion(regionPath, func, connConf); |
| ClassTag<T> kt = fakeClassTag(); |
| ClassTag<Option<V>> vt = fakeClassTag(); |
| return new JavaPairRDD<>(rdd, kt, vt); |
| } |
| |
| } |