blob: 04294e7c13cac25c99dfa7c2358c33ab791a51af [file] [log] [blame]
package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
import com.gemstone.gemfire.DataSerializer
import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender}
import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl}
import com.gemstone.gemfire.cache.query.types.ObjectType
import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream}
import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory}
import org.scalatest.{BeforeAndAfter, FunSuite}
import scala.collection.JavaConversions._
import scala.concurrent.{Await, ExecutionContext, Future}
import ExecutionContext.Implicits.global
import scala.concurrent.duration._
class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter {
/**
* A test ResultSender that connects struct ResultSender and ResultCollector
* Note: this test ResultSender has to copy the data (byte array) since the
* StructStreamingResultSender will reuse the byte array.
*/
class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] {
var finishedNum = 0
override def sendResult(result: Object): Unit =
collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
/** exception should be sent via lastResult() */
override def sendException(throwable: Throwable): Unit =
throw new UnsupportedOperationException("sendException is not supported.")
override def lastResult(result: Object): Unit = {
collector.addResult(null, result.asInstanceOf[Array[Byte]].clone())
this.synchronized {
finishedNum += 1
if (finishedNum == num)
collector.endResults()
}
}
}
/** common variables */
var collector: StructStreamingResultCollector = _
var baseSender: LocalResultSender = _
/** common types */
val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType]
val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType))
val OneColType = new StructTypeImpl(Array("value"), Array(objType))
before {
collector = new StructStreamingResultCollector
baseSender = new LocalResultSender(collector, 1)
}
test("transfer simple data") {
verifySimpleTransfer(sendDataType = true)
}
test("transfer simple data with no type info") {
verifySimpleTransfer(sendDataType = false)
}
def verifySimpleTransfer(sendDataType: Boolean): Unit = {
val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator
val dataType = if (sendDataType) TwoColType else null
new StructStreamingResultSender(baseSender, dataType , iter).send()
// println("type: " + collector.getResultType.toString)
assert(TwoColType.equals(collector.getResultType))
val iter2 = collector.getResult
(0 to 9).foreach { i =>
assert(iter2.hasNext)
val o = iter2.next()
assert(o.size == 2)
assert(o(0).asInstanceOf[Int] == i)
assert(o(1).asInstanceOf[String] == i.toString * 5)
}
assert(! iter2.hasNext)
}
/**
* A test iterator that generate integer data
* @param start the 1st value
* @param n number of integers generated
* @param genExcp generate Exception if true. This is used to test exception handling.
*/
def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = {
new Iterator[Array[Object]] {
val max = if (genExcp) start + n else start + n - 1
var index: Int = start - 1
override def hasNext: Boolean = if (index < max) true else false
override def next(): Array[Object] =
if (index < (start + n - 1)) {
index += 1
Array(index.asInstanceOf[Object])
} else throw new RuntimeException("simulated error")
}
}
test("transfer data with 0 row") {
new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send()
// println("type: " + collector.getResultType.toString)
assert(collector.getResultType == null)
val iter = collector.getResult
assert(! iter.hasNext)
}
test("transfer data with 10K rows") {
new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send()
// println("type: " + collector.getResultType.toString)
assert(OneColType.equals(collector.getResultType))
val iter = collector.getResult
// println(iter.toList.map(list => list.mkString(",")).mkString("; "))
(1 to 10000).foreach { i =>
assert(iter.hasNext)
val o = iter.next()
assert(o.size == 1)
assert(o(0).asInstanceOf[Int] == i)
}
assert(! iter.hasNext)
}
test("transfer data with 10K rows with 2 sender") {
baseSender = new LocalResultSender(collector, 2)
val total = 300
val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()}
Await.result(sender1, 1.seconds)
Await.result(sender2, 1.seconds)
// println("type: " + collector.getResultType.toString)
assert(OneColType.equals(collector.getResultType))
val iter = collector.getResult
// println(iter.toList.map(list => list.mkString(",")).mkString("; "))
val set = scala.collection.mutable.Set[Int]()
(1 to total).foreach { i =>
assert(iter.hasNext)
val o = iter.next()
assert(o.size == 1)
assert(! set.contains(o(0).asInstanceOf[Int]))
set.add(o(0).asInstanceOf[Int])
}
assert(! iter.hasNext)
}
test("transfer data with 10K rows with 2 sender with error") {
baseSender = new LocalResultSender(collector, 2)
val total = 1000
val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()}
val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()}
Await.result(sender1, 1 seconds)
Await.result(sender2, 1 seconds)
// println("type: " + collector.getResultType.toString)
assert(OneColType.equals(collector.getResultType))
val iter = collector.getResult
// println(iter.toList.map(list => list.mkString(",")).mkString("; "))
val set = scala.collection.mutable.Set[Int]()
intercept[RuntimeException] {
(1 to total).foreach { i =>
assert(iter.hasNext)
val o = iter.next()
assert(o.size == 1)
assert(! set.contains(o(0).asInstanceOf[Int]))
set.add(o(0).asInstanceOf[Int])
}
}
// println(s"rows received: ${set.size}")
}
test("transfer data with Exception") {
new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send()
// println("type: " + collector.getResultType.toString)
val iter = collector.getResult
intercept[RuntimeException] ( iter.foreach(_.mkString(",")) )
}
def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] =
intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}"))
test("transfer string pair data with 200 rows") {
new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send()
// println("type: " + collector.getResultType.toString)
assert(TwoColType.equals(collector.getResultType))
val iter = collector.getResult
// println(iter.toList.map(list => list.mkString(",")).mkString("; "))
(1 to 1000).foreach { i =>
assert(iter.hasNext)
val o = iter.next()
assert(o.size == 2)
assert(o(0) == s"key-$i")
assert(o(1) == s"value-$i")
}
assert(! iter.hasNext)
}
/**
* Usage notes: There are 3 kinds of data to transfer:
* (1) object, (2) byte array of serialized object, and (3) byte array
* this test shows how to handle all of them.
*/
test("DataSerializer usage") {
val outBuf = new HeapDataOutputStream(1024, null)
val inBuf = new ByteArrayDataInput()
// 1. a regular object
val hello = "Hello World!" * 30
// serialize the data
DataSerializer.writeObject(hello, outBuf)
val bytesHello = outBuf.toByteArray.clone()
// de-serialize the data
inBuf.initialize(bytesHello, Version.CURRENT)
val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
assert(hello == hello2)
// 2. byte array of serialized object
// serialize: byte array from `CachedDeserializable`
val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello)
outBuf.reset()
DataSerializer.writeByteArray(cd.getSerializedValue, outBuf)
// de-serialize the data in 2 steps
inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf)
inBuf.initialize(bytesHello2, Version.CURRENT)
val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object]
assert(hello == hello3)
// 3. byte array
outBuf.reset()
DataSerializer.writeByteArray(bytesHello, outBuf)
inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT)
val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf)
assert(bytesHello sameElements bytesHello3)
}
}