blob: 2c64e199a024631d47df7735f4124061ed3fe8de [file] [log] [blame]
package unittest.io.pivotal.gemfire.spark.connector
import org.apache.spark.SparkConf
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, FunSuite}
import io.pivotal.gemfire.spark.connector._
class GemFireConnectionConfTest extends FunSuite with Matchers with MockitoSugar {
test("apply(SparkConf) w/ GemFireLocator property and empty gemfireProps") {
val (host1, port1) = ("host1", 1234)
val (host2, port2) = ("host2", 5678)
val conf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]")
val connConf = GemFireConnectionConf(conf)
assert(connConf.locators == Seq((host1, port1),(host2, port2)))
assert(connConf.gemfireProps.isEmpty)
}
test("apply(SparkConf) w/ GemFireLocator property and gemfire properties") {
val (host1, port1) = ("host1", 1234)
val (host2, port2) = ("host2", 5678)
val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
val (propK2, propV2) = ("ack-wait-threshold", "10")
val conf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]")
.set(s"spark.gemfire.$propK1", propV1).set(s"spark.gemfire.$propK2", propV2)
val connConf = GemFireConnectionConf(conf)
assert(connConf.locators == Seq((host1, port1),(host2, port2)))
assert(connConf.gemfireProps == Map(propK1 -> propV1, propK2 -> propV2))
}
test("apply(SparkConf) w/o GemFireLocator property") {
intercept[RuntimeException] { GemFireConnectionConf(new SparkConf()) }
}
test("apply(SparkConf) w/ invalid GemFireLocator property") {
val conf = new SparkConf().set(GemFireLocatorPropKey, "local^host:1234")
intercept[Exception] { GemFireConnectionConf(conf) }
}
test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non gemfireProps") {
val (host1, port1) = ("host1", 1234)
val connConf = GemFireConnectionConf(s"$host1:$port1")
assert(connConf.locators == Seq((host1, port1)))
assert(connConf.gemfireProps.isEmpty)
}
test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non-empty gemfireProps") {
val (host1, port1) = ("host1", 1234)
val (host2, port2) = ("host2", 5678)
val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
val (propK2, propV2) = ("ack-wait-threshold", "10")
val props = Map(propK1 -> propV1, propK2 -> propV2)
val connConf = GemFireConnectionConf(s"$host1:$port1,$host2:$port2", props)
assert(connConf.locators == Seq((host1, port1),(host2, port2)))
assert(connConf.gemfireProps == props)
}
test("apply(locatorStr, gemfireProps) w/ invalid locatorStr") {
intercept[Exception] { GemFireConnectionConf("local~host:4321") }
}
test("constructor w/ empty (host,port) pairs") {
intercept[IllegalArgumentException] { new GemFireConnectionConf(Seq.empty) }
}
test("getConnection() normal") {
implicit val mockFactory = mock[GemFireConnectionManager]
val mockConnection = mock[GemFireConnection]
when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenReturn(mockConnection)
val connConf = GemFireConnectionConf("localhost:1234")
assert(connConf.getConnection == mockConnection)
verify(mockFactory).getConnection(connConf)
}
test("getConnection() failure") {
implicit val mockFactory = mock[GemFireConnectionManager]
when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenThrow(new RuntimeException)
val connConf = GemFireConnectionConf("localhost:1234")
intercept[RuntimeException] { connConf.getConnection }
verify(mockFactory).getConnection(connConf)
}
}