package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
import com.gemstone.gemfire.DataSerializer
import com.gemstone.gemfire.cache.execute.ResultCollector
import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl
import com.gemstone.gemfire.cache.query.types.StructType
import com.gemstone.gemfire.distributed.DistributedMember
import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput}
import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions.StructStreamingResultSender.{TYPE_CHUNK, DATA_CHUNK, ERROR_CHUNK, SER_DATA, UNSER_DATA, BYTEARR_DATA}
/**
* StructStreamingResultCollector and StructStreamingResultSender are paired
* to transfer the result, a list of `com.gemstone.gemfire.cache.query.Struct`,
* from the GemFire server to the Spark Connector (the client of the GemFire
* server) in a streaming fashion, i.e., while the sender is still sending the
* result, the collector can start processing whatever has arrived without
* waiting for the full result to become available.
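*
* A minimal usage sketch (illustrative; assumes a server-side function backed
* by the matching StructStreamingResultSender and a hypothetical `execution`
* handle for invoking it):
* {{{
*   val collector = new StructStreamingResultCollector("my-query")
*   execution.withCollector(collector).execute(functionId)   // hypothetical invocation
*   val rows: Iterator[Array[Object]] = collector.getResult  // streams as chunks arrive
*   rows.foreach(row => println(row.mkString(", ")))
* }}}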
*/
class StructStreamingResultCollector(desc: String) extends ResultCollector[Array[Byte], Iterator[Array[Object]]] {
/** the auxiliary constructor that provides the default `desc` (description) */
def this() = this("StructStreamingResultCollector")
private val queue: BlockingQueue[Array[Byte]] = new LinkedBlockingQueue[Array[Byte]]()
var structType: StructType = null  // set once a TYPE_CHUNK (or the first DATA_CHUNK) arrives
/** ------------------------------------------ */
/** ResultCollector interface implementations */
/** ------------------------------------------ */
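/** getResult returns the streaming iterator; the iterator itself blocks on the internal queue until chunks arrive */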
override def getResult: Iterator[Array[Object]] = resultIterator
override def getResult(timeout: Long, unit: TimeUnit): Iterator[Array[Object]] =
throw new UnsupportedOperationException()
/** addResult adds a non-empty byte array (chunk) to the queue; a chunk of length <= 1 carries only the chunk-type byte and no payload, so it is ignored */
override def addResult(memberID: DistributedMember, chunk: Array[Byte]): Unit =
if (chunk != null && chunk.length > 1) {
this.queue.add(chunk)
// println(s"""$desc receive from $memberID: ${chunk.mkString(" ")}""")
}
/** endResults adds the special `Array.empty` marker to the queue to signal the end of data */
override def endResults(): Unit = this.queue.add(Array.empty)
override def clearResults(): Unit = this.queue.clear()
/** ------------------------------------------ */
/** Internal methods */
/** ------------------------------------------ */
def getResultType: StructType = {
// trigger lazy resultIterator initialization if necessary
if (structType == null) resultIterator.hasNext
structType
}
/**
* Note: The data is sent in chunks, and each chunk contains multiple
* records. So the result iterator is an iterator (I) over iterators (II):
* go through each chunk (iterator I), and within each chunk, go through
* each record (iterator II).
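*
* Chunk dispatch in `nextIterator`:
*   - TYPE_CHUNK:  remember the StructType, then fetch the next chunk
*   - DATA_CHUNK:  iterate over the records contained in this chunk
*   - ERROR_CHUNK: an iterator that re-throws the sender's exception
*   - empty array: end-of-data marker, yields Iterator.empty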
*/
private lazy val resultIterator = new Iterator[Array[Object]] {
private var currentIterator: Iterator[Array[Object]] = nextIterator()
override def hasNext: Boolean = {
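// advance to the next chunk's iterator unless the end-of-data marker was
// already reached (nextIterator() returns the Iterator.empty singleton)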
if (!currentIterator.hasNext && currentIterator != Iterator.empty) currentIterator = nextIterator()
currentIterator.hasNext
}
/** Note: callers must invoke `hasNext` first so that `currentIterator` is advanced */
override def next(): Array[Object] = currentIterator.next()
}
/** get the iterator for the next chunk of data */
private def nextIterator(): Iterator[Array[Object]] = {
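// Each chunk begins with a 1-byte chunk type; the payload that follows is a
// serialized StructTypeImpl (TYPE_CHUNK), a run of typed field values
// (DATA_CHUNK), or a serialized Exception (ERROR_CHUNK).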
val chunk: Array[Byte] = queue.take
if (chunk.isEmpty) {
Iterator.empty
} else {
val input = new ByteArrayDataInput()
input.initialize(chunk, Version.CURRENT)
val chunkType = input.readByte()
// println(s"chunk type $chunkType")
chunkType match {
case TYPE_CHUNK =>
if (structType == null)
structType = DataSerializer.readObject(input).asInstanceOf[StructTypeImpl]
nextIterator()
case DATA_CHUNK =>
// require(structType != null && structType.getFieldNames.length > 0)
if (structType == null) structType = StructStreamingResultSender.KeyValueType
chunkToIterator(input, structType.getFieldNames.length)
case ERROR_CHUNK =>
val error = DataSerializer.readObject(input).asInstanceOf[Exception]
errorPropagationIterator(error)
case _ => throw new RuntimeException(s"unknown chunk type: $chunkType")
}
}
}
/** creates an iterator that propagates the sender's exception */
private def errorPropagationIterator(ex: Exception) = new Iterator[Array[Object]] {
val re = new RuntimeException(ex)
override def hasNext: Boolean = throw re
override def next(): Array[Object] = throw re
}
/** converts a chunk of data to an iterator over its records */
private def chunkToIterator(input: ByteArrayDataInput, rowSize: Int) = new Iterator[Array[Object]] {
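// Record layout: each record has `rowSize` fields, and each field is a 1-byte
// data-type tag (SER_DATA, UNSER_DATA, or BYTEARR_DATA) followed by the field
// value itself.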
override def hasNext: Boolean = input.available() > 0
val tmpInput = new ByteArrayDataInput()
override def next(): Array[Object] =
(0 until rowSize).map { _ =>
val b = input.readByte()
b match {
case SER_DATA =>
val arr: Array[Byte] = DataSerializer.readByteArray(input)
tmpInput.initialize(arr, Version.CURRENT)
DataSerializer.readObject(tmpInput).asInstanceOf[Object]
case UNSER_DATA =>
DataSerializer.readObject(input).asInstanceOf[Object]
case BYTEARR_DATA =>
DataSerializer.readByteArray(input).asInstanceOf[Object]
case _ =>
throw new RuntimeException(s"unknown data type $b")
}
}.toArray
}
}