import java.util.Properties
import com.gemstone.gemfire.cache.query.QueryService
import com.gemstone.gemfire.cache.query.internal.StructImpl
import io.pivotal.gemfire.spark.connector._
import com.gemstone.gemfire.cache.Region
import io.pivotal.gemfire.spark.connector.internal.{RegionMetadata, DefaultGemFireConnectionManager}
import io.pivotal.gemfire.spark.connector.internal.oql.{RDDConverter, QueryRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream}
import org.apache.spark.{SparkContext, SparkConf}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import scala.collection.JavaConversions
import scala.reflect.ClassTag
/** Simple two-field record: a string (`str`) together with an Int (`len`). */
case class Number(str: String, len: Int)
class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GemFireCluster {
var sc: SparkContext = null
/**
 * Suite setup: starts the GemFire cluster through GemFireCluster.start and creates the
 * SparkContext that is stored in `sc`, pointing it at the locator port returned by the cluster.
 */
override def beforeAll() {
// start gemfire cluster, and spark context
val settings = new Properties()
settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml")
settings.setProperty("num-of-servers", "2")
val locatorPort = GemFireCluster.start(settings)
// start spark context in local mode
// NOTE(review): both logger names in the pairs below are empty strings, which looks truncated; confirm the intended names
IOUtils.configTestLog4j("ERROR", "" -> "INFO",
"" -> "DEBUG")
// NOTE(review): no master or application name is set on this SparkConf; presumably supplied elsewhere, confirm
val conf = new SparkConf()
.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
.set(GemFireLocatorPropKey, s"localhost[$locatorPort]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "io.pivotal.gemfire.spark.connector.GemFireKryoRegistrator")
sc = new SparkContext(conf)
/**
 * Suite teardown hook.
 * NOTE(review): no statements follow the comment below; the shutdown calls it describes are not
 * present here, confirm they were not lost.
 */
override def afterAll() {
// stop connection, spark context, and gemfire cluster
/** Copies every key/value pair of `map` into a newly created java.util.Properties and returns it. */
private def map2Props(map: Map[Object, Object]): java.util.Properties = {
  val result = new java.util.Properties
  for ((key, value) <- map) result.put(key, value)
  result
}
// ===========================================================
// DefaultGemFireConnection functional tests
// ===========================================================
/**
 * Checks validateRegion(): it must accept an existing region and raise a RuntimeException
 * whose message names the missing region when the region does not exist.
 */
test("DefaultGemFireConnection.validateRegion()") {
  val conn = GemFireConnectionConf(sc.getConf).getConnection

  // normal exist-region
  conn.validateRegion[String, String]("str_str_region")

  // non-exist region
  val missingRegion = "non_exist_region"
  // intercept is used instead of try/fail/catch: a fail() raised inside the try would itself be
  // caught by the RuntimeException handler and misreported as "wrong exception"
  val error = intercept[RuntimeException] {
    conn.validateRegion[String, String](missingRegion)
  }
  // Option(...) guards against an exception that carries no message
  val mentionsRegion =
    Option(error.getMessage).exists(_.contains(s"The region named $missingRegion was not found"))
  if (!mentionsRegion)
    fail("validateRegion gives wrong exception on non-exist region", error)

  // Note: currently, can't catch type mismatch error
  conn.validateRegion[String, Integer]("str_str_region")
}
// Checks getRegionMetadata() for three existing regions via validateRegionMetadata, and that
// no metadata is returned for a region name that does not exist.
test("DefaultGemFireConnection.getRegionMetadata()") {
val conn = GemFireConnectionConf(sc.getConf).getConnection
// exist region
// arguments after the region name: partitioned, buckets, keyType, valueType, emptyMap
validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false)
validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false)
validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true)
// non-exist region
assert(! conn.getRegionMetadata("no_exist_region").isDefined)
/**
 * Asserts that the metadata reported by `conn` for `regionPath` matches the expected values.
 *
 * @param conn        connection used to look up the region metadata
 * @param regionPath  region name without the leading slash
 * @param partitioned expected value of `isPartitioned`
 * @param buckets     expected value of `getTotalBuckets`
 * @param keyType     expected key type name (may be null)
 * @param valueType   expected value type name (may be null)
 * @param emptyMap    when true the server-bucket map must be null, otherwise it must be non-null
 */
def validateRegionMetadata(
    conn: GemFireConnection, regionPath: String, partitioned: Boolean, buckets: Int,
    keyType: String, valueType: String, emptyMap: Boolean): Unit = {
  // fail with a message naming the region rather than a bare NoSuchElementException from Option.get
  val md = conn.getRegionMetadata(regionPath).getOrElse(
    fail(s"no metadata returned for region $regionPath"))
  assert(md.getRegionPath == s"/$regionPath")
  assert(md.isPartitioned == partitioned)
  assert(md.getKeyTypeName == keyType)
  assert(md.getValueTypeName == valueType)
  assert(md.getTotalBuckets == buckets)
  if (emptyMap) assert(md.getServerBucketMap == null)
  else assert(md.getServerBucketMap != null)
}
/**
 * Checks getRegionProxy(): a proxy for an existing region supports put/get/remove, and a proxy
 * for a missing region is handed out without error but fails on first use.
 */
test("DefaultGemFireConnection.getRegionProxy()") {
  val conn = GemFireConnectionConf(sc.getConf).getConnection

  val region1 = conn.getRegionProxy[String, String]("str_str_region")
  region1.put("1", "One")
  assert(region1.get("1") == "One")
  // the entry has to be removed before the null check below can hold
  region1.remove("1")
  assert(region1.get("1") == null)

  // getRegionProxy doesn't fail when region doesn't exist
  val region2 = conn.getRegionProxy[String, String]("non_exist_region")
  // intercept is used instead of try/fail/catch: a fail() raised inside the try would itself be
  // caught by the Exception handler and misreported as "wrong exception"
  val error = intercept[Exception] {
    region2.put("1", "One")
  }
  val causeMessage = Option(error.getCause).flatMap(cause => Option(cause.getMessage))
  if (!causeMessage.exists(_.contains("Region named /non_exist_region was not found")))
    fail("getRegionProxy gives wrong exception on non-exist region", error)
}
// Note: DefaultGemFireConnection.getQuery() and getRegionData() are covered by
// RetrieveRegionIntegrationTest.scala and the following OQL tests.
// ===========================================================
// OQL functional tests
// ===========================================================
/**
 * Puts seven Portfolio objects into `regionName` under keys "1" to "7".
 * Each portfolio carries an id, a type ("type1".."type4"), a status that alternates between
 * "active" (odd ids) and "inactive" (even ids), and two Position objects.
 */
private def initRegion(regionName: String): Unit = {
//Populate some data in the region
val conn = GemFireConnectionConf(sc.getConf).getConnection
val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
//This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String]
// position1/position2 are reassigned before each portfolio is built
var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42"))
var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29"))
// the portfolio maps mix String and Position values, so they go through the local map2Props helper
val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active",
"position1" -> position1, "position2" -> position2)))
rgn.put("1", portfolio1)
position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925"))
position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972"))
val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive",
"position1" -> position1, "position2" -> position2)))
rgn.put("2", portfolio2)
position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active",
"position1" -> position1, "position2" -> position2)))
rgn.put("3", portfolio3)
position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572"))
position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34"))
val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive",
"position1" -> position1, "position2" -> position2)))
rgn.put("4", portfolio4)
position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572"))
position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34"))
val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active",
"position1" -> position1, "position2" -> position2)))
rgn.put("5", portfolio5)
position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572"))
position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34"))
val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive",
"position1" -> position1, "position2" -> position2)))
rgn.put("6", portfolio6)
// portfolio 7 reuses the same position values as portfolio 3
position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32"))
position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373"))
val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active",
"position1" -> position1, "position2" -> position2)))
//Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug
rgn.put("7", portfolio7)
/**
 * Builds a QueryRDD for the given OQL `query` on the suite's SparkContext.
 *
 * @param query    OQL query text
 * @param connConf connection configuration; defaults to one derived from the SparkContext's conf
 */
private def getQueryRDD[T: ClassTag](
    query: String,
    connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)): QueryRDD[T] = {
  val queryRDD = new QueryRDD[T](sc, query, connConf)
  queryRDD
}
// NOTE(review): no body is present for these two test registrations; presumably each should
// call simpleQuery with a partitioned / replicated region name. Confirm.
test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") {
test("Run GemFire OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") {
/**
 * Writes three String entries into `regionName`, reads them back through an OQL QueryRDD and
 * verifies the values three ways: on the RDD itself, through a DataFrame built from `Number`
 * rows, and through a DataFrame produced by RDDConverter. The entries are removed afterwards.
 *
 * @param regionName name of the region to populate and query (without leading slash)
 */
private def simpleQuery(regionName: String): Unit = {
  //Populate some data in the region
  val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
  val conn = connConf.getConnection
  val rgn: Region[String, String] = conn.getRegionProxy(regionName)
  rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))

  try {
    //Create QueryRDD using OQL
    val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName")

    //verify the QueryRDD
    val oqlRS: Array[String] = OQLResult.collect()
    oqlRS should have length 3
    oqlRS should contain theSameElementsAs List("one", "two", "three")

    //Convert QueryRDD to DataFrame
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // this is used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._
    val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF()
    //Register dataFrame as a table of two columns of type String and Int respectively
    dataFrame.registerTempTable("numberTable")

    //Issue SQL query against the table
    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
    //Verify the SQL query result, r(0) mean column 0
    val sqlRS: Array[Any] = SQLResult.collect().map(r => r(0))
    sqlRS should have length 3
    sqlRS should contain theSameElementsAs List("one", "two", "three")

    //Convert QueryRDD to DataFrame using RDDConverter
    val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
    //Register dataFrame2 as a table
    dataFrame2.registerTempTable("numberTable2")

    //Issue SQL query against the table
    val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2")
    //Verify the SQL query result, r(0) mean column 0
    val sqlRS2: Array[Any] = SQLResult2.collect().map(r => r(0))
    sqlRS2 should have length 3
    sqlRS2 should contain theSameElementsAs List("one", "two", "three")
  } finally {
    //Remove the region entries, because other tests might use the same region as well;
    //done in finally so a failed assertion does not leave them behind
    List("1", "2", "3").foreach(rgn.remove)
  }
}
// NOTE(review): no body is present for these two test registrations; presumably each should
// call simpleQueryDataFrame with a partitioned / replicated region name. Confirm.
test("Run GemFire OQL query and directly return DataFrame: Partitioned Region") {
test("Run GemFire OQL query and directly return DataFrame: Replicated Region") {
/**
 * Writes three String entries into `regionName`, obtains a DataFrame directly from
 * `sqlContext.gemfireOQL`, queries it through Spark SQL and verifies the values.
 * The entries are removed afterwards.
 *
 * @param regionName name of the region to populate and query (without leading slash)
 */
private def simpleQueryDataFrame(regionName: String): Unit = {
  //Populate some data in the region
  val conn = GemFireConnectionConf(sc.getConf).getConnection
  val rgn: Region[String, String] = conn.getRegionProxy(regionName)
  rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three")))

  try {
    //Create DataFrame using GemFire OQL
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val dataFrame = sqlContext.gemfireOQL(s"select * from /$regionName")
    //The SQL below refers to the DataFrame by table name, so it has to be registered first
    dataFrame.registerTempTable("numberTable")

    //Issue SQL query against the table
    val SQLResult = sqlContext.sql("SELECT * FROM numberTable")
    //Verify the SQL query result, r(0) mean column 0
    val sqlRS: Array[Any] = SQLResult.collect().map(r => r(0))
    sqlRS should have length 3
    sqlRS should contain theSameElementsAs List("one", "two", "three")
  } finally {
    //Remove the region entries, because other tests might use the same region as well;
    //done in finally so a failed assertion does not leave them behind
    List("1", "2", "3").foreach(rgn.remove)
  }
}
// NOTE(review): no body is present for these two test registrations; presumably each should
// call queryUDT with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with UDT: Partitioned Region") {
test("GemFire OQL query with UDT: Replicated Region") {
/**
 * Writes two Employee objects into `regionName`, projects `name, age` with an OQL QueryRDD,
 * verifies the struct results, then converts the RDD to a DataFrame with RDDConverter and
 * verifies the names through Spark SQL. The entries are removed afterwards.
 *
 * @param regionName name of the region to populate and query (without leading slash)
 */
private def queryUDT(regionName: String): Unit = {
  //Populate some data in the region
  val conn = GemFireConnectionConf(sc.getConf).getConnection
  val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
  val e1: Employee = new Employee("hello", 123)
  val e2: Employee = new Employee("world", 456)
  rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))

  try {
    //Create QueryRDD using OQL
    val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName")

    //verify the QueryRDD
    val oqlRS: Array[Object] = OQLResult.collect()
    oqlRS should have length 2
    // field index 1 is the second projected column (age)
    oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456)

    //Convert QueryRDD to DataFrame
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    //Convert QueryRDD to DataFrame using RDDConverter
    val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
    //The SQL below refers to the DataFrame by table name, so it has to be registered first
    dataFrame.registerTempTable("employee")
    val SQLResult = sqlContext.sql("SELECT * FROM employee")

    //Verify the SQL query result
    val sqlRS = SQLResult.collect().map(r => r(0))
    sqlRS should have length 2
    sqlRS should contain theSameElementsAs List("hello", "world")
  } finally {
    // clean up even when an assertion fails, so the entries do not leak into later tests
    List("1", "2").foreach(rgn.remove)
  }
}
// NOTE(review): no body is present for these two test registrations; presumably each should
// call queryUDTDataFrame with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with UDT and directly return DataFrame: Partitioned Region") {
test("GemFire OQL query with UDT and directly return DataFrame: Replicated Region") {
/**
 * Writes two Employee objects into `regionName`, obtains a DataFrame of `name, age` directly
 * from `sqlContext.gemfireOQL`, and verifies the names through Spark SQL.
 * The entries are removed afterwards.
 *
 * @param regionName name of the region to populate and query (without leading slash)
 */
private def queryUDTDataFrame(regionName: String): Unit = {
  //Populate some data in the region
  val conn = GemFireConnectionConf(sc.getConf).getConnection
  val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
  val e1: Employee = new Employee("hello", 123)
  val e2: Employee = new Employee("world", 456)
  rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2)))

  try {
    //Create DataFrame using GemFire OQL
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val dataFrame = sqlContext.gemfireOQL(s"select name, age from /$regionName")
    //The SQL below refers to the DataFrame by table name, so it has to be registered first
    dataFrame.registerTempTable("employee")
    val SQLResult = sqlContext.sql("SELECT * FROM employee")

    //Verify the SQL query result
    val sqlRS = SQLResult.collect().map(r => r(0))
    sqlRS should have length 2
    sqlRS should contain theSameElementsAs List("hello", "world")
  } finally {
    // clean up even when an assertion fails, so the entries do not leak into later tests
    List("1", "2").foreach(rgn.remove)
  }
}
// NOTE(review): no body is present for these two test registrations; presumably each should
// call complexUDT with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with more complex UDT: Partitioned Region") {
test("GemFire OQL query with more complex UDT: Replicated Region") {
/**
 * Queries the active Portfolio objects of `regionName` with an OQL QueryRDD, checks their ids,
 * then converts the RDD to a DataFrame with RDDConverter and checks the portfolio types
 * returned by a Spark SQL query.
 * NOTE(review): this block does not populate the region itself; the expected ids/types match
 * what initRegion writes, so presumably initRegion must have run for `regionName`. Confirm.
 */
private def complexUDT(regionName: String) {
//Create QueryRDD using OQL
val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
//verify the QueryRDD
val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId)
oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
//Convert QueryRDD to DataFrame
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
//Convert QueryRDD to DataFrame using RDDConverter
val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
// NOTE(review): `dataFrame` is not registered under the table name "Portfolio" anywhere in this block; confirm a registration step is not missing
val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
//Verify the SQL query result
val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
// NOTE(review): no body is present for these two test registrations; presumably each should
// call complexUDTDataFrame with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with more complex UDT and directly return DataFrame: Partitioned Region") {
test("GemFire OQL query with more complex UDT and directly return DataFrame: Replicated Region") {
/**
 * Obtains a DataFrame of the active Portfolio objects of `regionName` directly from
 * `sqlContext.gemfireOQL` and checks the portfolio types returned by a Spark SQL query.
 * NOTE(review): this block does not populate the region itself; the expected types match what
 * initRegion writes, so presumably initRegion must have run for `regionName`. Confirm.
 */
private def complexUDTDataFrame(regionName: String) {
//Create DataFrame using GemFire OQL
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dataFrame = sqlContext.gemfireOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'")
// NOTE(review): `dataFrame` is not registered under the table name "Portfolio" anywhere in this block; confirm a registration step is not missing
val SQLResult = sqlContext.sql("SELECT * FROM Portfolio")
//Verify the SQL query result
val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType)
sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4")
// NOTE(review): no body is present for these two test registrations; presumably each should
// call queryComplexUDTProjection with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with more complex UDT with Projection: Partitioned Region") {
test("GemFire OQL query with more complex UDT with Projection: Replicated Region") {
/**
 * Projects id, type, positions and status of the active portfolios in `regionName` with an OQL
 * QueryRDD, checks the ids, then converts the RDD to a DataFrame with RDDConverter and checks
 * that a Spark SQL filter on type 'type3' yields id 3.
 * NOTE(review): this block does not populate the region itself; the expected ids match what
 * initRegion writes, so presumably initRegion must have run for `regionName`. Confirm.
 */
private def queryComplexUDTProjection(regionName: String) {
//Create QueryRDD using OQL
val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
//verify the QueryRDD
// field index 0 is the first projected column (id)
val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int])
oqlRS should contain theSameElementsAs List(1, 3, 5, 7)
//Convert QueryRDD to DataFrame
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
//Convert QueryRDD to DataFrame using RDDConverter
val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext)
// NOTE(review): `dataFrame` is not registered under the table name "Portfolio" anywhere in this block; confirm a registration step is not missing
val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
//Verify the SQL query result
val sqlRS = SQLResult.collect().map(r => r(0))
sqlRS should contain theSameElementsAs List(3)
// NOTE(review): no body is present for these two test registrations; presumably each should
// call queryComplexUDTProjectionDataFrame with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") {
test("GemFire OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") {
/**
 * Obtains a DataFrame of id, type, positions and status for the active portfolios in
 * `regionName` directly from `sqlContext.gemfireOQL`, and checks that a Spark SQL filter on
 * type 'type3' yields id 3.
 * NOTE(review): this block does not populate the region itself; the expected id matches what
 * initRegion writes, so presumably initRegion must have run for `regionName`. Confirm.
 */
private def queryComplexUDTProjectionDataFrame(regionName: String) {
//Create DataFrame using GemFire OQL
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dataFrame = sqlContext.gemfireOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""")
// NOTE(review): `dataFrame` is not registered under the table name "Portfolio" anywhere in this block; confirm a registration step is not missing
val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
//Verify the SQL query result
val sqlRS = SQLResult.collect().map(r => r(0))
sqlRS should contain theSameElementsAs List(3)
// NOTE(review): no body is present for these two test registrations; presumably each should
// call queryComplexUDTNestProjectionDataFrame with a partitioned / replicated region name. Confirm.
test("GemFire OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") {
test("GemFire OQL query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") {
/**
 * Obtains a DataFrame from an OQL query that joins each active portfolio in `regionName` with
 * its position values and keeps those holding secId 'MSFT', then checks that a Spark SQL
 * filter on type 'type3' yields id 3.
 * NOTE(review): this block does not populate the region itself; the expected id matches what
 * initRegion writes, so presumably initRegion must have run for `regionName`. Confirm.
 *
 * @param regionName name of the region to query (without leading slash)
 */
private def queryComplexUDTNestProjectionDataFrame(regionName: String): Unit = {
  //Create DataFrame using GemFire OQL
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  // the projection must start with r.id: the SQL below selects the `id` column
  val dataFrame = sqlContext.gemfireOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""")
  //The SQL below refers to the DataFrame by table name, so it has to be registered first
  dataFrame.registerTempTable("Portfolio")
  val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'")
  //Verify the SQL query result
  val sqlRS = SQLResult.collect().map(r => r(0))
  sqlRS should contain theSameElementsAs List(3)
}
// NOTE(review): no body is present for these two test registrations; presumably each should
// call undefinedInstanceDeserialization with a partitioned / replicated region name. Confirm.
test("Undefined instance deserialization: Partitioned Region") {
test("Undefined instance deserialization: Replicated Region") {
/**
 * Puts one String entry into `regionName`, queries two columns that the entry does not have,
 * and checks that both come back equal to QueryService.UNDEFINED and are the same object.
 * NOTE(review): the entry under key "1" is not removed in this block; confirm whether cleanup
 * is expected here.
 */
private def undefinedInstanceDeserialization(regionName: String) {
val conn = GemFireConnectionConf(sc.getConf).getConnection
val rgn: Region[Object, Object] = conn.getRegionProxy(regionName)
//Put some new data
rgn.put("1", "one")
//Query some non-existent columns, which should return UNDEFINED
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dataFrame = sqlContext.gemfireOQL(s"SELECT col100, col200 FROM /$regionName")
// first() is evaluated once per column
val col1 = dataFrame.first().apply(0)
val col2 = dataFrame.first().apply(1)
assert(col1 == QueryService.UNDEFINED)
assert(col2 == QueryService.UNDEFINED)
//Verify that col1 and col2 refer to the same Undefined object
// `eq` is reference identity, as opposed to the `==` checks above
assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef])
// Builds an RDD of six (key, value) String pairs and checks that region "str_str_region"
// holds exactly those keys and values on the server.
test("RDD.saveToGemFire") {
val regionName = "str_str_region"
// generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66))
// e is a String here, so e*2 repeats it ("1" -> "11")
val data = (1 to 6).map(_.toString).map(e=> (e, e*2))
val rdd = sc.parallelize(data)
// NOTE(review): nothing in this block writes `rdd` to the region before the checks below; confirm a save call is not missing
// verify
val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName)
println("region key set on server: " + region.keySetOnServer())
assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
(1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e)))
// ===========================================================
// DStream.saveToGemfire() functional tests
// ===========================================================
/**
 * Feeds three batches of Ints through a TestInputDStream, saves each element to
 * "str_int_region" as (element.toString, element), advances the manual clock by one batch
 * interval per input batch, waits until all batches have completed, and verifies the region.
 */
test("Basic DStream test") {
  import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
  import io.pivotal.gemfire.spark.connector.streaming._
  import org.apache.spark.streaming.ManualClockHelper

  // counts completed batches so the test can tell when all input has been processed
  class TestStreamListener extends StreamingListener {
    var count = 0
    override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = count += 1
  }

  def batchDuration = Seconds(1)
  val ssc = new StreamingContext(sc, batchDuration)
  val input = Seq(1 to 4, 5 to 8, 9 to 12)
  val dstream = new TestInputDStream(ssc, input, 2)
  dstream.saveToGemfire[String, Int]("str_int_region", (e: Int) => (e.toString, e))
  try {
    val listener = new TestStreamListener
    // the listener must be registered and the context started, otherwise `count` never
    // changes and the wait loop below cannot terminate
    ssc.addStreamingListener(listener)
    ssc.start()
    ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length)
    while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50)
  } catch {
    case e: Exception => e.printStackTrace(); throw e
    // } finally {
    //   ssc.stop()
    // NOTE(review): stop() is left disabled as before; presumably because stopping the streaming
    // context would also stop the shared SparkContext. Confirm.
  }

  val connConf: GemFireConnectionConf = GemFireConnectionConf(sc.getConf)
  val conn = connConf.getConnection
  val region: Region[String, Int] = conn.getRegionProxy("str_int_region")

  // verify gemfire region contents
  println("region key set on server: " + region.keySetOnServer())
  assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer()))
  (1 to 12).foreach(e => assert(e == region.get(e.toString)))
}