| package io.pivotal.gemfire.spark.connector |
| |
| import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFireRDDWriter} |
| import org.apache.spark.Logging |
| import org.apache.spark.api.java.function.{PairFunction, Function} |
| import org.apache.spark.rdd.RDD |
| |
| /** |
| * Extra GemFire functions on non-Pair RDDs through an implicit conversion. |
| * Import `io.pivotal.gemfire.spark.connector._` at the top of your program to |
| * use these functions. |
| */ |
class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {

  /**
   * Store the elements of this non-pair RDD in a GemFire region as key/value pairs.
   *
   * The region is first checked with `validateRegion` on the connection obtained from
   * `connConf`; afterwards a Spark job is run over the RDD with a [[GemFireRDDWriter]].
   *
   * @param regionPath the full path of the region the RDD is stored into
   * @param func       the function that converts an RDD element into a (key, value) pair
   * @param connConf   the GemFireConnectionConf object that provides the connection to the
   *                   GemFire cluster; defaults to `defaultConnectionConf`
   * @tparam K the key type of the pairs produced by `func`
   * @tparam V the value type of the pairs produced by `func`
   */
  def saveToGemfire[K, V](
      regionPath: String,
      func: T => (K, V),
      connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
    val connection = connConf.getConnection
    connection.validateRegion[K, V](regionPath)
    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
    val regionWriter = new GemFireRDDWriter[T, K, V](regionPath, connConf)
    val context = rdd.sparkContext
    context.runJob(rdd, regionWriter.write(func) _)
  }

  /**
   * Java API flavour of `saveToGemfire`: adapts the Java `PairFunction` to a Scala
   * function and delegates to the Scala version.
   */
  private[connector] def saveToGemfire[K, V](
      regionPath: String,
      func: PairFunction[T, K, V],
      connConf: GemFireConnectionConf): Unit = {
    // Typed as a Scala function so the call below selects the Scala overload.
    val toPair: T => (K, V) = func.call _
    saveToGemfire[K, V](regionPath, toPair, connConf)
  }

  /**
   * Join this RDD with the GemFire `Region[K, V]` on matching keys.
   *
   * The join key of an RDD element is computed by `func(T) => K`; on the region side the
   * join key is simply the key of each key/value entry. Every match is returned as a
   * (t, v) tuple, where t comes from this RDD and (k, v) is the matching region entry.
   *
   * @param regionPath the region path of the GemFire region
   * @param func       the function that generates the region key from an RDD element T
   * @param connConf   the GemFireConnectionConf object that provides the connection to the
   *                   GemFire cluster; defaults to `defaultConnectionConf`
   * @tparam K the key type of the GemFire region
   * @tparam V the value type of the GemFire region
   * @return RDD[(T, V)]
   */
  def joinGemfireRegion[K, V](
      regionPath: String,
      func: T => K,
      connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[T, K, V] =
    new GemFireJoinRDD[T, K, V](rdd, func, regionPath, connConf)

  /**
   * Java API flavour of `joinGemfireRegion`: adapts the Java `Function` to a Scala
   * function and delegates to the Scala version.
   */
  private[connector] def joinGemfireRegion[K, V](
      regionPath: String,
      func: Function[T, K],
      connConf: GemFireConnectionConf): GemFireJoinRDD[T, K, V] = {
    // Typed as a Scala function so the call below selects the Scala overload.
    val toKey: T => K = func.call _
    joinGemfireRegion[K, V](regionPath, toKey, connConf)
  }

  /**
   * Left outer join of this RDD with the GemFire `Region[K, V]`.
   *
   * The join key of an RDD element is computed by `func(T) => K`; on the region side the
   * join key is simply the key of each key/value entry. For each element t of this RDD
   * the result contains (t, Some(v)) for the matching v in the region, or (t, None) when
   * no entry of the region has key `func(t)`.
   *
   * @param regionPath the region path of the GemFire region
   * @param func       the function that generates the region key from an RDD element T
   * @param connConf   the GemFireConnectionConf object that provides the connection to the
   *                   GemFire cluster; defaults to `defaultConnectionConf`
   * @tparam K the key type of the GemFire region
   * @tparam V the value type of the GemFire region
   * @return RDD[(T, Option[V])]
   */
  def outerJoinGemfireRegion[K, V](
      regionPath: String,
      func: T => K,
      connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[T, K, V] =
    new GemFireOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)

  /**
   * Java API flavour of `outerJoinGemfireRegion`: adapts the Java `Function` to a Scala
   * function and delegates to the Scala version.
   */
  private[connector] def outerJoinGemfireRegion[K, V](
      regionPath: String,
      func: Function[T, K],
      connConf: GemFireConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
    // Typed as a Scala function so the call below selects the Scala overload.
    val toKey: T => K = func.call _
    outerJoinGemfireRegion[K, V](regionPath, toKey, connConf)
  }

  /** Connection configuration built from the Spark configuration of this RDD's context. */
  private[connector] def defaultConnectionConf: GemFireConnectionConf = {
    val sparkConf = rdd.sparkContext.getConf
    GemFireConnectionConf(sparkConf)
  }

}
| |
| |