blob: e91221c2ef984a5259f5513615bd63f400d9ac7e [file] [log] [blame]
package ittest.io.pivotal.gemfire.spark.connector
import java.util.Properties
import io.pivotal.gemfire.spark.connector._
import com.gemstone.gemfire.cache.Region
import io.pivotal.gemfire.spark.connector.internal.DefaultGemFireConnectionManager
import ittest.io.pivotal.gemfire.spark.connector.testkit.GemFireCluster
import ittest.io.pivotal.gemfire.spark.connector.testkit.IOUtils
import org.apache.spark.{SparkContext, SparkConf}
import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers}
import java.util.{HashMap => JHashMap}
class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
var sc: SparkContext = null
val numServers = 4
val numObjects = 1000
override def beforeAll() {
// start gemfire cluster, and spark context
val settings = new Properties()
settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml")
settings.setProperty("num-of-servers", numServers.toString)
val locatorPort = GemFireCluster.start(settings)
// start spark context in local mode
IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO",
"log4j.logger.io.pivotal.gemfire.spark.connector" -> "DEBUG")
val conf = new SparkConf()
.setAppName("RetrieveRegionIntegrationTest")
.setMaster("local[2]")
.set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
sc = new SparkContext(conf)
}
override def afterAll() {
// stop connection, spark context, and gemfire cluster
DefaultGemFireConnectionManager.closeConnection(GemFireConnectionConf(sc.getConf))
sc.stop()
GemFireCluster.stop()
}
def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = {
//Populate some data in the region
val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
val conn = connConf.getConnection
val rgn: Region[K, V] = conn.getRegionProxy(regionName)
rgn.removeAll(rgn.keySetOnServer())
rgn.putAll(entriesMap)
verifyRetrieveRegion[K,V](regionName, entriesMap)
}
def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = {
val rdd = sc.gemfireRegion(regionName)
val collectedObjs = rdd.collect()
collectedObjs should have length entriesMap.size
import scala.collection.JavaConverters._
matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap)
}
def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = {
assert(map1.size == map2.size)
map1.foreach(e => {
assert(map2.contains(e._1))
assert (e._2 == map2.get(e._1).get)
}
)
}
//Retrieve region for Partitioned Region where some nodes are empty (empty iterator)
//This test has to run first...the rest of the tests always use the same num objects
test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") {
val numObjects = numServers - 1
val entriesMap:JHashMap[String, Int] = new JHashMap()
(0 until numObjects).map(i => entriesMap.put("key_" + i, i))
executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap)
}
//Test for retrieving from region containing string key and int value
def verifyRetrieveStringStringRegion(regionName:String) = {
val entriesMap:JHashMap[String, String] = new JHashMap()
(0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i))
executeTest[String, String](regionName, numObjects, entriesMap)
}
test("Retrieve Region with replicate redundant string string") {
verifyRetrieveStringStringRegion("rr_obj_obj_region")
}
test("Retrieve Region with partitioned string string") {
verifyRetrieveStringStringRegion("pr_obj_obj_region")
}
test("Retrieve Region with partitioned redundant string string") {
verifyRetrieveStringStringRegion("pr_r_obj_obj_region")
}
//Test for retrieving from region containing string key and string value
def verifyRetrieveStringIntRegion(regionName:String) = {
val entriesMap:JHashMap[String, Int] = new JHashMap()
(0 until numObjects).map(i => entriesMap.put("key_" + i, i))
executeTest[String, Int](regionName, numObjects, entriesMap)
}
test("Retrieve Region with replicate string int region") {
verifyRetrieveStringIntRegion("rr_str_int_region")
}
test("Retrieve Region with partitioned string int region") {
verifyRetrieveStringIntRegion("pr_str_int_region")
}
test("Retrieve Region with partitioned redundant string int region") {
verifyRetrieveStringIntRegion("pr_r_str_int_region")
}
//Tests for retrieving from region containing string key and object value
def verifyRetrieveStringObjectRegion(regionName:String) = {
val entriesMap:JHashMap[String, Object] = new JHashMap()
(0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i)))
executeTest[String, Object](regionName, numObjects, entriesMap)
}
test("Retrieve Region with replicate string obj") {
verifyRetrieveStringObjectRegion("rr_obj_obj_region")
}
test("Retrieve Region with partitioned string obj") {
verifyRetrieveStringObjectRegion("pr_obj_obj_region")
}
test("Retrieve Region with partitioned redundant string obj") {
verifyRetrieveStringObjectRegion("pr_r_obj_obj_region")
}
//Test for retrieving from region containing string key and map value
def verifyRetrieveStringMapRegion(regionName:String) = {
val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap()
(0 until numObjects).map(i => {
val hashMap:JHashMap[String, String] = new JHashMap()
hashMap.put("mapKey:" + i, "mapValue:" + i)
entriesMap.put("key_" + i, hashMap)
})
executeTest(regionName, numObjects, entriesMap)
}
test("Retrieve Region with replicate string map region") {
verifyRetrieveStringMapRegion("rr_obj_obj_region")
}
test("Retrieve Region with partitioned string map region") {
verifyRetrieveStringMapRegion("pr_obj_obj_region")
}
test("Retrieve Region with partitioned redundant string map region") {
verifyRetrieveStringMapRegion("pr_r_obj_obj_region")
}
//Test and helpers specific for retrieving from region containing string key and byte[] value
def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = {
//Populate some data in the region
val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
val conn = connConf.getConnection
val rgn: Region[K, Array[Byte]] = conn.getRegionProxy(regionName)
rgn.putAll(entriesMap)
verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap)
}
def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = {
val rdd = sc.gemfireRegion(regionName)
val collectedObjs = rdd.collect()
collectedObjs should have length entriesMap.size
import scala.collection.JavaConverters._
matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap)
}
def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = {
map1.foreach(e => {
assert(map2.contains(e._1))
assert (java.util.Arrays.equals(e._2, map2.get(e._1).get))
}
)
assert(map1.size == map2.size)
}
def verifyRetrieveStringByteArrayRegion(regionName:String) = {
val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap()
(0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte)))
executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap)
}
test("Retrieve Region with replicate region string byte[] region") {
verifyRetrieveStringByteArrayRegion("rr_obj_obj_region")
}
test("Retrieve Region with partition region string byte[] region") {
verifyRetrieveStringByteArrayRegion("pr_obj_obj_region")
}
test("Retrieve Region with partition redundant region string byte[] region") {
verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region")
}
test("Retrieve Region with where clause on partitioned redundant region", FilterTest) {
verifyRetrieveRegionWithWhereClause("pr_r_str_int_region")
}
test("Retrieve Region with where clause on partitioned region", FilterTest) {
verifyRetrieveRegionWithWhereClause("pr_str_int_region")
}
test("Retrieve Region with where clause on replicated region", FilterTest) {
verifyRetrieveRegionWithWhereClause("rr_str_int_region")
}
def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = {
val entriesMap: JHashMap[String, Int] = new JHashMap()
(0 until numObjects).map(i => entriesMap.put("key_" + i, i))
val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
val conn = connConf.getConnection
val rgn: Region[String, Int] = conn.getRegionProxy(regionPath)
rgn.removeAll(rgn.keySetOnServer())
rgn.putAll(entriesMap)
val rdd = sc.gemfireRegion(regionPath).where("value.intValue() < 50")
val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap
val collectedObjs = rdd.collect()
// collectedObjs should have length expectedMap.size
matchMaps[String, Int](expectedMap, collectedObjs.toMap)
}
}