package io.pivotal.gemfire.spark.connector.internal.rdd

import io.pivotal.gemfire.spark.connector.GemFireConnection
import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey
import org.apache.spark.Partition
import scala.collection.JavaConversions._
import scala.collection.immutable.SortedSet
import scala.collection.mutable
import scala.reflect.ClassTag

/** This partitioner maps the whole region to a single GemFireRDDPartition */
object OnePartitionPartitioner extends GemFireRDDPartitioner {
override val name = "OnePartition"
override def partitions[K: ClassTag, V: ClassTag]
(conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] =
Array[Partition](new GemFireRDDPartition(0, Set.empty))
}

/**
 * This partitioner maps the whole region to N * M GemFire RDD partitions, where M is the number
 * of GemFire servers that contain data for the given region and N is the number of partitions
 * per server (set via NumberPartitionsPerServerPropKey; the default value of N is 2). For
 * example, a region whose data lives on 3 servers yields 6 partitions with the default N.
 */
object ServerSplitsPartitioner extends GemFireRDDPartitioner {
override val name = "ServerSplits"
override def partitions[K: ClassTag, V: ClassTag]
(conn: GemFireConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = {
if (md == null) throw new RuntimeException("RegionMetadata is null")
val n = env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt
if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty)
Array[Partition](new GemFireRDDPartition(0, Set.empty))
else {
val map = mapAsScalaMap(md.getServerBucketMap)
.map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList
.map { case (srv, set) => (srv.getHostName, set) }
doPartitions(map, md.getTotalBuckets, n)
}
}

  /** Converts a list of (server, bucket ID set) pairs into an array of RDD partitions */
def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int)
: Array[Partition] = {
// method that calculates the group size for splitting "k" items into "g" groups
def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt
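    // e.g. groupSize(4, 2) == 2 and groupSize(5, 2) == 3 (ceiling division)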
// 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs
val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) }
    // 2. split each server's bucket set into n splits when possible, and wrap each server in a Seq
    val srvToSplitBucketSet = srvToSortedBucketSet.flatMap { case (host, set) =>
      if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) }
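    // e.g. with n = 2, a hypothetical ("host1", SortedSet(0, 1, 2, 3)) becomes
    //      (Seq("host1"), {0, 1}) and (Seq("host1"), {2, 3})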
    // 3. calculate the empty bucket IDs by removing every server's bucket set from the full bucket ID range
    val emptyIDs = SortedSet[Int]() ++ srvToSortedBucketSet.foldLeft((0 until totalBuckets).toSet) {
      case (remaining, (_, assigned)) => remaining &~ assigned }
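    // e.g. if the servers together hold buckets 0-5 and totalBuckets is 8, emptyIDs == SortedSet(6, 7)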
    // 4. distribute the empty bucket IDs evenly across all partitions.
    //    The empty buckets contain no data when the partitions are created, but they may contain
    //    data by the time the RDD is materialized, so their IDs must be included in the partitions.
    val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitBucketSet
      else srvToSplitBucketSet.zipAll(
        emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitBucketSet.size)).toList, (Nil, Set.empty), Set.empty).map
        { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) }
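    // e.g. continuing the example above: with four splits and emptyIDs {6, 7}, the first two
    //      splits pick up one empty ID each, giving {0, 1, 6} and {2, 3, 7}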
    // 5. create the array of partitions with 0-based indices
    srvToFinalBucketSet.zipWithIndex.map
      { case ((srv, set), i) => new GemFireRDDPartition(i, set, srv) }.toArray
}
}
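
/*
 * Illustrative sketch only (not part of the connector API): exercises doPartitions above with
 * made-up host names and bucket assignments to show how per-server splits and empty buckets
 * are distributed. Nothing else in the connector references this object.
 */
object ServerSplitsPartitionerDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical layout: host1 holds buckets 0-3, host2 holds buckets 4-5, buckets 6-7 are empty
    val serverBucketMap = List(
      ("host1", mutable.Set(0, 1, 2, 3)),
      ("host2", mutable.Set(4, 5)))
    val parts = ServerSplitsPartitioner.doPartitions(serverBucketMap, totalBuckets = 8, n = 2)
    // expected: 4 partitions -- host1 gets {0, 1, 6} and {2, 3, 7}, host2 gets {4} and {5}
    parts.foreach(p => println(s"partition ${p.index}"))
  }
}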