blob: e31186b35c384aad4e49fb94091c7b5e1d56019f [file] [log] [blame]
package io.pivotal.gemfire.spark.connector.internal
import com.gemstone.gemfire.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut}
import com.gemstone.gemfire.cache.execute.{FunctionException, FunctionService}
import com.gemstone.gemfire.cache.query.Query
import com.gemstone.gemfire.cache.{Region, RegionService}
import com.gemstone.gemfire.internal.cache.execute.InternalExecution
import io.pivotal.gemfire.spark.connector.internal.oql.QueryResultCollector
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartition
import org.apache.spark.Logging
import io.pivotal.gemfire.spark.connector.GemFireConnection
import io.pivotal.gemfire.spark.connector.internal.gemfirefunctions._
import java.util.{Set => JSet, List => JList }
/**
* Default GemFireConnection implementation. The instance of this should be
* created by DefaultGemFireConnectionFactory
* @param locators pairs of host/port of locators
* @param gemFireProps The initial gemfire properties to be used.
*/
private[connector] class DefaultGemFireConnection (
locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty)
extends GemFireConnection with Logging {
private val clientCache = initClientCache()
/** a lock object only used by getRegionProxy...() */
private val regionLock = new Object
/** Register GemFire functions to the GemFire cluster */
FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
private def initClientCache() : ClientCache = {
try {
import io.pivotal.gemfire.spark.connector.map2Properties
logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""")
val ccf = new ClientCacheFactory(gemFireProps)
locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) }
ccf.create()
} catch {
case e: Exception =>
logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""")
throw new RuntimeException(e)
}
}
/** close the clientCache */
override def close(): Unit =
if (! clientCache.isClosed) clientCache.close()
/** ----------------------------------------- */
/** implementation of GemFireConnection trait */
/** ----------------------------------------- */
override def getQuery(queryString: String): Query =
clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
override def validateRegion[K, V](regionPath: String): Unit = {
val md = getRegionMetadata[K, V](regionPath)
if (! md.isDefined) throw new RuntimeException(s"The region named $regionPath was not found")
}
def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
import scala.collection.JavaConversions.setAsJavaSet
val region = getRegionProxy[K, V](regionPath)
val set0: JSet[Integer] = Set[Integer](0)
val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
exec.setWaitOnExceptionFlag(true)
try {
val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
logDebug(r.get(0).toString)
Some(r.get(0))
} catch {
case e: FunctionException =>
if (e.getMessage.contains(s"The region named /$regionPath was not found")) None
else throw e
}
}
def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
if (region1 != null) region1
else regionLock.synchronized {
val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
if (region2 != null) region2
else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath)
}
}
override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GemFireRDDPartition): Iterator[(K, V)] = {
val region = getRegionProxy[K, V](regionPath)
val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})"""
val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
val collector = new StructStreamingResultCollector(desc)
// RetrieveRegionResultCollector[(K, V)]
import scala.collection.JavaConversions.setAsJavaSet
val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
.withBucketFilter(split.bucketSet.map(Integer.valueOf))
exec.setWaitOnExceptionFlag(true)
exec.execute(RetrieveRegionFunction.ID)
collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])}
}
override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = {
import scala.collection.JavaConversions.setAsJavaSet
FunctionService.registerFunction(QueryFunction.getInstance())
val collector = new QueryResultCollector
val region = getRegionProxy(regionPath)
val args: Array[String] = Array[String](queryString, bucketSet.toString)
val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
.withBucketFilter(bucketSet.map(Integer.valueOf))
.withArgs(args)
exec.execute(QueryFunction.ID)
collector.getResult
}
}
/** The purpose of this class is making unit test DefaultGemFireConnectionManager easier */
class DefaultGemFireConnectionFactory {
def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) =
new DefaultGemFireConnection(locators, gemFireProps)
}