blob: 20f753e1d8fecc0b892c85b7e99c1815b65d17e6 [file] [log] [blame]
package unittest.io.pivotal.gemfire.spark.connector.rdd
import com.gemstone.gemfire.distributed.internal.ServerLocation
import io.pivotal.gemfire.spark.connector.internal.RegionMetadata
import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._
import io.pivotal.gemfire.spark.connector.GemFireConnection
import io.pivotal.gemfire.spark.connector.internal.rdd._
import org.apache.spark.Partition
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, FunSuite}
import java.util.{HashSet => JHashSet, HashMap => JHashMap}
import scala.collection.mutable
/**
 * Unit tests for [[GemFireRDDPartitioner]] and its implementations
 * ([[OnePartitionPartitioner]] and [[ServerSplitsPartitioner]]).
 */
class GemFireRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar {

  /** A server-to-bucket map with no entries; used as metadata for a partitioned region without data. */
  val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new JHashMap()

  /**
   * Converts a Scala `(host, port) -> bucket ids` map into the Java map type that
   * `RegionMetadata` is constructed with in these tests.
   *
   * @param map bucket ids keyed by `(host, port)`
   * @return a new Java map from [[ServerLocation]] to a new Java set of boxed bucket ids
   */
  def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = {
    // Explicit `.asJava` conversions; the implicit `JavaConversions` is deprecated.
    import scala.collection.JavaConverters._
    val result = new JHashMap[ServerLocation, JHashSet[Integer]]()
    map.foreach { case ((host, port), buckets) =>
      val boxed: Set[Integer] = buckets.map(b => Integer.valueOf(b))
      result.put(new ServerLocation(host, port), new JHashSet[Integer](boxed.asJava))
    }
    result
  }

  // NOTE(review): no test in this class reads this field; each test declares a local `map`
  // that shadows it. It is kept only so the class's public members do not change.
  val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map(
    ("s0", 1) -> mutable.Set.empty, ("s1", 2) -> mutable.Set(0), ("s2", 3) -> mutable.Set(1, 2),
    ("s3", 4) -> mutable.Set(3, 4, 5))

  // Update this test whenever the default partitioned-region partitioner changes.
  test("default partitioned region partitioner") {
    assert(GemFireRDDPartitioner.defaultPartitionedRegionPartitioner === ServerSplitsPartitioner)
  }

  // Update this test whenever the default replicated-region partitioner changes.
  test("default replicated region partitioner") {
    assert(GemFireRDDPartitioner.defaultReplicatedRegionPartitioner === OnePartitionPartitioner)
  }

  test("GemFireRDDPartitioner.apply method") {
    // `partitioners` (name -> partitioner) comes from the file-level GemFireRDDPartitioner._ import.
    for ((name, partitioner) <- partitioners) assert(GemFireRDDPartitioner(name) === partitioner)
    // An unknown name, or no name at all, resolves to the default partitioned-region partitioner.
    assert(GemFireRDDPartitioner("dummy") === GemFireRDDPartitioner.defaultPartitionedRegionPartitioner)
    assert(GemFireRDDPartitioner() === GemFireRDDPartitioner.defaultPartitionedRegionPartitioner)
  }

  test("OnePartitionPartitioner") {
    val mockConnection = mock[GemFireConnection]
    val partitions = OnePartitionPartitioner.partitions[String, String](mockConnection, null, Map.empty)
    verifySinglePartition(partitions)
  }

  /**
   * Asserts that `partitions` holds exactly one [[GemFireRDDPartition]] with index 0 and an
   * empty bucket set.
   */
  def verifySinglePartition(partitions: Array[Partition]): Unit = {
    assert(partitions.length === 1)
    val only = partitions(0)
    assert(only.index === 0)
    only match {
      case p: GemFireRDDPartition => assert(p.bucketSet.isEmpty)
      case other => fail(s"expected a GemFireRDDPartition but got $other")
    }
  }

  test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") {
    val map: List[(String, mutable.Set[Int])] = List(
      "server1" -> mutable.Set(3, 2, 1, 0), "server2" -> mutable.Set(5, 4))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1)
    verifyPartitions(partitions, List(
      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2"))))
  }

  test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") {
    val map: List[(String, mutable.Set[Int])] = List(
      "server1" -> mutable.Set(3, 2, 1, 0), "server2" -> mutable.Set(5, 4))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1)
    verifyPartitions(partitions, List(
      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2"))))
  }

  test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") {
    val map: List[(String, mutable.Set[Int])] = List(
      "server1" -> mutable.Set(3, 2, 1, 0), "server2" -> mutable.Set(5, 4))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
    verifyPartitions(partitions, List(
      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2"))))
  }

  test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") {
    val map: List[(String, mutable.Set[Int])] = List(
      "server1" -> mutable.Set(3, 2, 1, 0), "server2" -> mutable.Set(5, 4))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1)
    verifyPartitions(partitions, List(
      (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), Seq("server2"))))
  }

  test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, non-continuous IDs") {
    val map: List[(String, mutable.Set[Int])] = List(
      "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5, 7))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
    verifyPartitions(partitions, List(
      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2"))))
  }

  test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 servers have 1, 2, and 3 buckets") {
    val map: List[(String, mutable.Set[Int])] = List(
      "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> mutable.Set(3, 4, 5))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2)
    verifyPartitions(partitions, List(
      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3"))))
  }

  test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 servers have 0, 2, 3, and 4 buckets") {
    val map: List[(String, mutable.Set[Int])] = List(
      "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8))
    val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3)
    verifyPartitions(partitions, List(
      (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), Seq("s2")), (Set(4), Seq("s2")),
      (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3"))))
  }

  test("ServerSplitsPartitioner.partitions(): metadata = None ") {
    val mockConnection = mock[GemFireConnection]
    intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) }
  }

  test("ServerSplitsPartitioner.partitions(): replicated region ") {
    val regionPath = "test"
    val mockConnection = mock[GemFireConnection]
    val md = new RegionMetadata(regionPath, false, 11, null)
    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
    val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty)
    verifySinglePartition(partitions)
  }

  test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") {
    val regionPath = "test"
    val mockConnection = mock[GemFireConnection]
    val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap)
    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
    val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty)
    verifySinglePartition(partitions)
  }

  test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data ") {
    import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey
    val regionPath = "test"
    val mockConnection = mock[GemFireConnection]
    val map: Map[(String, Int), Set[Int]] = Map(
      ("s0", 1) -> Set.empty, ("s1", 2) -> Set(0), ("s2", 3) -> Set(1, 2), ("s3", 4) -> Set(3, 4, 5))
    val md = new RegionMetadata(regionPath, true, 6, toJavaServerBucketMap(map))
    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
    val partitions = ServerSplitsPartitioner.partitions[String, String](
      mockConnection, md, Map(NumberPartitionsPerServerPropKey -> "2"))
    verifyPartitions(partitions, List(
      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3"))))
  }

  /**
   * Asserts that `partitions` matches `expPartitions` regardless of ordering.
   *
   * The order of the partitions is not pre-determined, so partition indices and partition
   * contents are verified separately. Indices must be exactly `0 until n`. The
   * `(bucketSet, locations)` pairs must equal `expPartitions` as a multiset, so duplicates
   * count and no expected entry may be matched twice.
   *
   * @param partitions    partitions produced by a partitioner; each is cast to [[GemFireRDDPartition]]
   * @param expPartitions expected `(bucket ids, preferred locations)` pairs, in any order
   */
  def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = {
    // 1. check size
    assert(partitions.length === expPartitions.size)
    // 2. check IDs are 0 to n-1, in array order
    partitions.zipWithIndex.foreach { case (p, id) => assert(p.index === id) }
    // 3. compare the (bucket set, locations) pairs as multisets; with equal sizes, an empty
    //    `diff` means every expected pair is matched by a distinct actual pair.
    val actual = partitions.toList.map { e =>
      val p = e.asInstanceOf[GemFireRDDPartition]
      (p.bucketSet, p.locations)
    }
    val missing = expPartitions.diff(actual)
    assert(missing.isEmpty, s"expected partitions not found: $missing; actual partitions: $actual")
  }
}