// blob: 1e1bde3c2bb8307d67169130a56743df8614cb97 [file] [log] [blame]
package ittest.io.pivotal.gemfire.spark.connector
import java.util.Properties
import io.pivotal.gemfire.spark.connector._
import com.gemstone.gemfire.cache.Region
import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
import org.apache.spark.{SparkContext, SparkConf}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import java.util.{HashMap => JHashMap}
/**
 * Integration tests for the `joinGemfireRegion` / `outerJoinGemfireRegion` operations
 * that the connector adds to Spark RDDs.
 *
 * `beforeAll` starts a GemFire cluster with `numServers` servers and a Spark context in
 * `local[2]` mode; `afterAll` tears both down. Every test first resets the target region
 * to the `numObjects` entries `"k_0" -> 0 ... "k_<n-1>" -> n-1`, then joins an RDD whose
 * keys run from -5 until 50, so the five negative keys never have a matching region entry.
 */
class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {

  /** Spark context shared by all tests; assigned in `beforeAll`. */
  var sc: SparkContext = null

  /** Number of GemFire servers requested from the test cluster. */
  val numServers = 3

  /** Number of entries loaded into a region before each verification. */
  val numObjects = 1000

  /** Starts the GemFire cluster and then the Spark context that points at its locator. */
  override def beforeAll(): Unit = {
    // start gemfire cluster, and spark context
    val settings = new Properties()
    settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
    settings.setProperty("num-of-servers", numServers.toString)
    val locatorPort = GemFireCluster.start(settings)

    // start spark context in local mode
    IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
                            "log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG")
    val conf = new SparkConf()
      .setAppName("RDDJoinRegionIntegrationTest")
      .setMaster("local[2]")
      .set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
    sc = new SparkContext(conf)
  }

  /**
   * Stops the connection, the Spark context and the GemFire cluster.
   *
   * Each step runs in a `finally` of the previous one so that a failure while closing the
   * connection or stopping Spark cannot leave the GemFire cluster running.
   */
  override def afterAll(): Unit = {
    try {
      // sc is declared as null and only assigned at the end of beforeAll, so guard it
      Option(sc).foreach { ctx =>
        try DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(ctx.getConf))
        finally ctx.stop()
      }
    } finally {
      GemFireCluster.stop()
    }
  }

  /**
   * Resets `regionPath` so that it holds exactly the entries `"k_i" -> i` for
   * `0 <= i < numObjects`.
   *
   * @param regionPath path of the `Region[String, Int]` to reset
   * @return the connection configuration (built from the Spark conf) that was used, so
   *         callers can hand the same configuration to the join under test
   */
  private def populateRegion(regionPath: String): GemFireConnectionConf = {
    val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
    val region: Region[String, Int] = connConf.getConnection.getRegionProxy(regionPath)
    // clear whatever a previous test left behind before loading the fixture
    region.removeAll(region.keySetOnServer())
    val entries: JHashMap[String, Int] = new JHashMap()
    (0 until numObjects).foreach(i => entries.put(s"k_$i", i))
    region.putAll(entries)
    connConf
  }

  // --------------------------------------------------------------------------------------------
  // PairRDD.joinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2]
  // --------------------------------------------------------------------------------------------

  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) {
    verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region")
  }

  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) {
    verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region")
  }

  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) {
    verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region")
  }

  /**
   * Joins a pair RDD of `("k_x", 2x)` with the region using the RDD key as region key and
   * an explicit connection configuration; only the non-negative keys are expected back.
   *
   * @param regionPath region to populate and join against
   */
  def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = {
    val connConf = populateRegion(regionPath)
    val data = (-5 until 50).map(x => (s"k_$x", x * 2))
    val rdd = sc.parallelize(data)
    val joined = rdd.joinGemfireRegion[String, Int](regionPath, connConf).collect().toMap
    val expectedMap = (0 until 50).map(i => ((s"k_$i", i * 2), i)).toMap
    assert(expectedMap == joined)
  }

  // ------------------------------------------------------------------------------------------------------
  // PairRDD.joinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2]
  // -------------------------------------------------------------------------------------------------------

  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) {
    verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region")
  }

  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) {
    verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region")
  }

  test("PairRDD.joinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) {
    verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region")
  }

  /**
   * Joins a pair RDD of `(x, 2x)` with the region, deriving the region key `"k_x"` through
   * a key function. The connection configuration argument is deliberately omitted here.
   *
   * @param regionPath region to populate and join against
   */
  def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = {
    populateRegion(regionPath)
    val data = (-5 until 50).map(x => (x, x * 2))
    val rdd = sc.parallelize(data)
    val func: ((Int, Int)) => String = pair => s"k_${pair._1}"
    val joined = rdd.joinGemfireRegion[String, Int](regionPath, func /*, connConf*/).collect().toMap
    val expectedMap = (0 until 50).map(i => ((i, i * 2), i)).toMap
    assert(expectedMap == joined)
  }

  // ------------------------------------------------------------------------------------------------
  // PairRDD.outerJoinGemfireRegion[K2 <: K, V2](regionPath, connConf): GemFireJoinRDD[(K, V), K, V2]
  // ------------------------------------------------------------------------------------------------

  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) {
    verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region")
  }

  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) {
    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region")
  }

  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) {
    verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region")
  }

  /**
   * Outer-joins a pair RDD of `("k_x", 2x)` with the region using the RDD key as region
   * key; negative keys are expected to map to `None`, the rest to `Some(x)`. The
   * connection configuration argument is deliberately omitted here.
   *
   * @param regionPath region to populate and join against
   */
  def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = {
    populateRegion(regionPath)
    val data = (-5 until 50).map(x => (s"k_$x", x * 2))
    val rdd = sc.parallelize(data)
    val joined = rdd.outerJoinGemfireRegion[String, Int](regionPath /*, connConf*/).collect().toMap
    val expectedMap = (-5 until 50).map { i =>
      ((s"k_$i", i * 2), if (i < 0) None else Some(i))
    }.toMap
    assert(expectedMap == joined)
  }

  // ------------------------------------------------------------------------------------------------------
  // PairRDD.outerJoinGemfireRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GemFireJoinRDD[(K, V), K2, V2]
  // -------------------------------------------------------------------------------------------------------

  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) {
    verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region")
  }

  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) {
    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region")
  }

  test("PairRDD.outerJoinGemFireRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) {
    verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region")
  }

  /**
   * Outer-joins a pair RDD of `(x, 2x)` with the region, deriving the region key `"k_x"`
   * through a key function and passing an explicit connection configuration; negative
   * keys are expected to map to `None`, the rest to `Some(x)`.
   *
   * @param regionPath region to populate and join against
   */
  def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = {
    val connConf = populateRegion(regionPath)
    val data = (-5 until 50).map(x => (x, x * 2))
    val rdd = sc.parallelize(data)
    val func: ((Int, Int)) => String = pair => s"k_${pair._1}"
    val joined = rdd.outerJoinGemfireRegion[String, Int](regionPath, func, connConf).collect().toMap
    val expectedMap = (-5 until 50).map { i =>
      ((i, i * 2), if (i < 0) None else Some(i))
    }.toMap
    assert(expectedMap == joined)
  }

  // --------------------------------------------------------------------------------------------
  // RDD.joinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V]
  // --------------------------------------------------------------------------------------------

  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], replicated region", JoinTest) {
    verifyRDDJoinRegion("rr_str_int_region")
  }

  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) {
    verifyRDDJoinRegion("pr_str_int_region")
  }

  test("RDD.joinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) {
    verifyRDDJoinRegion("pr_r_str_int_region")
  }

  /**
   * Joins a plain RDD of `"k_x"` strings with the region using the identity key function
   * and an explicit connection configuration; only the non-negative keys are expected back.
   *
   * @param regionPath region to populate and join against
   */
  def verifyRDDJoinRegion(regionPath: String): Unit = {
    val connConf = populateRegion(regionPath)
    val data = (-5 until 50).map(x => s"k_$x")
    val rdd = sc.parallelize(data)
    val joined = rdd.joinGemfireRegion[String, Int](regionPath, x => x, connConf).collect().toMap
    val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap
    assert(expectedMap == joined)
  }

  // --------------------------------------------------------------------------------------------
  // RDD.outerJoinGemfireRegion[K, V](regionPath, T => K, connConf): GemFireJoinRDD[T, K, V]
  // --------------------------------------------------------------------------------------------

  // NOTE(review): these three tests are tagged OnlyTest whereas the other outer-join tests
  // use OuterJoinTest -- confirm the tag is intentional and not a debugging leftover.
  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) {
    verifyRDDOuterJoinRegion("rr_str_int_region")
  }

  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) {
    verifyRDDOuterJoinRegion("pr_str_int_region")
  }

  test("RDD.outerJoinGemFireRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) {
    verifyRDDOuterJoinRegion("pr_r_str_int_region")
  }

  /**
   * Outer-joins a plain RDD of `"k_x"` strings with the region using the identity key
   * function; negative keys are expected to map to `None`, the rest to `Some(x)`. The
   * connection configuration argument is deliberately omitted here.
   *
   * @param regionPath region to populate and join against
   */
  def verifyRDDOuterJoinRegion(regionPath: String): Unit = {
    populateRegion(regionPath)
    val data = (-5 until 50).map(x => s"k_$x")
    val rdd = sc.parallelize(data)
    val joined = rdd.outerJoinGemfireRegion[String, Int](regionPath, x => x /*, connConf */).collect().toMap
    val expectedMap = (-5 until 50).map { i =>
      (s"k_$i", if (i < 0) None else Some(i))
    }.toMap
    assert(expectedMap == joined)
  }
}