blob: 4ffacc5a054967c664ca56349eb01543f390ad86 [file] [log] [blame]
package io.pivotal.gemfire.spark.connector
import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFireRDDWriter}
import org.apache.spark.Logging
import org.apache.spark.api.java.function.{PairFunction, Function}
import org.apache.spark.rdd.RDD
/**
 * Adds GemFire operations to non-pair RDDs by way of an implicit conversion.
 * Put `import io.pivotal.gemfire.spark.connector._` at the top of your program
 * to make these operations available.
 */
class GemFireRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {

  /**
   * Store the elements of this non-pair RDD in a GemFire region as key/value pairs.
   * The region is validated through the connection before the write job is submitted.
   *
   * @param regionPath the full path of the region the RDD is saved to
   * @param func the function that turns each RDD element into a key/value pair
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   */
  def saveToGemfire[K, V](regionPath: String, func: T => (K, V), connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
    // Validation comes first so that a failing region check is raised before anything is logged or run.
    connConf.getConnection.validateRegion[K, V](regionPath)
    logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
    val regionWriter = new GemFireRDDWriter[T, K, V](regionPath, connConf)
    val sc = rdd.sparkContext
    sc.runJob(rdd, regionWriter.write(func) _)
  }

  /** Java API flavour of saveToGemfire(...): adapts the PairFunction and delegates to the Scala version. */
  private[connector] def saveToGemfire[K, V](
    regionPath: String, func: PairFunction[T, K, V], connConf: GemFireConnectionConf): Unit = {
    val toPair: T => (K, V) = element => func.call(element)
    saveToGemfire[K, V](regionPath, toPair, connConf)
  }

  /**
   * Join `this` RDD with the GemFire `Region[K, V]` on matching keys. The join
   * key of an RDD element is produced by `func(T) => K`; the key on the region
   * side is simply the key of the stored key/value pair.
   *
   * Every match is returned as a (t, v) tuple, where (t) comes from `this` RDD
   * and (k, v) comes from the GemFire region.
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @tparam K the key type of the GemFire region
   * @tparam V the value type of the GemFire region
   * @return RDD[(T, V)]
   */
  def joinGemfireRegion[K, V](regionPath: String, func: T => K,
    connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[T, K, V] =
    new GemFireJoinRDD[T, K, V](rdd, func, regionPath, connConf)

  /** Java API flavour of joinGemfireRegion(...): adapts the Function and delegates to the Scala version. */
  private[connector] def joinGemfireRegion[K, V](
    regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireJoinRDD[T, K, V] = {
    val keyOf: T => K = element => func.call(element)
    joinGemfireRegion[K, V](regionPath, keyOf, connConf)
  }

  /**
   * Left outer join of `this` RDD with the GemFire `Region[K, V]`. The join key
   * of an RDD element is produced by `func(T) => K`; the key on the region side
   * is simply the key of the stored key/value pair.
   *
   * For each element (t) in `this` RDD, the result holds either the pairs
   * (t, Some(v)) for v in the GemFire region, or the pair (t, None) when no
   * element in the GemFire region has key `func(t)`.
   *
   * @param regionPath the region path of the GemFire region
   * @param func the function that generates the region key from an RDD element T
   * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
   * @tparam K the key type of the GemFire region
   * @tparam V the value type of the GemFire region
   * @return RDD[(T, Option[V])]
   */
  def outerJoinGemfireRegion[K, V](regionPath: String, func: T => K,
    connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[T, K, V] =
    new GemFireOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)

  /** Java API flavour of outerJoinGemfireRegion(...): adapts the Function and delegates to the Scala version. */
  private[connector] def outerJoinGemfireRegion[K, V](
    regionPath: String, func: Function[T, K], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[T, K, V] = {
    val keyOf: T => K = element => func.call(element)
    outerJoinGemfireRegion[K, V](regionPath, keyOf, connConf)
  }

  /** Connection configuration built from the Spark configuration of this RDD's SparkContext. */
  private[connector] def defaultConnectionConf: GemFireConnectionConf = {
    val sparkConf = rdd.sparkContext.getConf
    GemFireConnectionConf(sparkConf)
  }
}