blob: 9fb1c0410099e819ae8303ab76a9c3957fc2b436 [file] [log] [blame]
package io.pivotal.gemfire.spark.connector
import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireOuterJoinRDD, GemFireJoinRDD, GemFirePairRDDWriter}
import org.apache.spark.Logging
import org.apache.spark.api.java.function.Function
import org.apache.spark.rdd.RDD
/**
* Extra gemFire functions on RDDs of (key, value) pairs through an implicit conversion.
* Import `io.pivotal.gemfire.spark.connector._` at the top of your program to
* use these functions.
*/
class GemFirePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging {
/**
* Save the RDD of pairs to GemFire key-value store without any conversion
* @param regionPath the full path of region that the RDD is stored
* @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
*/
def saveToGemfire(regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): Unit = {
connConf.getConnection.validateRegion[K, V](regionPath)
logInfo(s"Save RDD id=${rdd.id} to region $regionPath")
val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf)
rdd.sparkContext.runJob(rdd, writer.write _)
}
/**
* Return an RDD containing all pairs of elements with matching keys in `this`
* RDD and the GemFire `Region[K, V2]`. Each pair of elements will be returned
* as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the
* GemFire region.
*
*@param regionPath the region path of the GemFire region
* @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
* @tparam K2 the key type of the GemFire region
* @tparam V2 the value type of the GemFire region
* @return RDD[T, V]
*/
def joinGemfireRegion[K2 <: K, V2](
regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K, V2] = {
new GemFireJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
}
/**
* Return an RDD containing all pairs of elements with matching keys in `this` RDD
* and the GemFire `Region[K2, V2]`. The join key from RDD element is generated by
* `func(K, V) => K2`, and the key from the GemFire region is jus the key of the
* key/value pair.
*
* Each pair of elements of result RDD will be returned as a ((k, v), v2) tuple,
* where (k, v) is in `this` RDD and (k2, v2) is in the GemFire region.
*
* @param regionPath the region path of the GemFire region
* @param func the function that generates region key from RDD element (K, V)
* @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
* @tparam K2 the key type of the GemFire region
* @tparam V2 the value type of the GemFire region
* @return RDD[(K, V), V2]
*/
def joinGemfireRegion[K2, V2](
regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireJoinRDD[(K, V), K2, V2] =
new GemFireJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
/** This version of joinGemfireRegion(...) is just for Java API. */
private[connector] def joinGemfireRegion[K2, V2](
regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireJoinRDD[(K, V), K2, V2] = {
new GemFireJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
}
/**
* Perform a left outer join of `this` RDD and the GemFire `Region[K, V2]`.
* For each element (k, v) in `this` RDD, the resulting RDD will either contain
* all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
* ((k, v), None)) if no element in the GemFire region have key k.
*
* @param regionPath the region path of the GemFire region
* @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
* @tparam K2 the key type of the GemFire region
* @tparam V2 the value type of the GemFire region
* @return RDD[ (K, V), Option[V] ]
*/
def outerJoinGemfireRegion[K2 <: K, V2](
regionPath: String, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K, V2] = {
new GemFireOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
}
/**
* Perform a left outer join of `this` RDD and the GemFire `Region[K2, V2]`.
* The join key from RDD element is generated by `func(K, V) => K2`, and the
* key from region is jus the key of the key/value pair.
*
* For each element (k, v) in `this` RDD, the resulting RDD will either contain
* all pairs ((k, v), Some(v2)) for v2 in the GemFire region, or the pair
* ((k, v), None)) if no element in the GemFire region have key `func(k, v)`.
*
*@param regionPath the region path of the GemFire region
* @param func the function that generates region key from RDD element (K, V)
* @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster
* @tparam K2 the key type of the GemFire region
* @tparam V2 the value type of the GemFire region
* @return RDD[ (K, V), Option[V] ]
*/
def outerJoinGemfireRegion[K2, V2](
regionPath: String, func: ((K, V)) => K2, connConf: GemFireConnectionConf = defaultConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
}
/** This version of outerJoinGemfireRegion(...) is just for Java API. */
private[connector] def outerJoinGemfireRegion[K2, V2](
regionPath: String, func: Function[(K, V), K2], connConf: GemFireConnectionConf): GemFireOuterJoinRDD[(K, V), K2, V2] = {
new GemFireOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
}
private[connector] def defaultConnectionConf: GemFireConnectionConf =
GemFireConnectionConf(rdd.sparkContext.getConf)
}