| package unittest.io.pivotal.gemfire.spark.connector |
| |
| import com.gemstone.gemfire.cache.Region |
| import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf} |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.streaming.dstream.DStream |
| import org.mockito.Mockito._ |
| import org.scalatest.mock.MockitoSugar |
| import org.scalatest.{Matchers, FunSuite} |
| import org.mockito.Matchers.{eq => mockEq, any => mockAny} |
| |
| import scala.reflect.ClassTag |
| |
| class GemFireDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar { |
| |
| test("test GemFirePairDStreamFunctions Implicit") { |
| import io.pivotal.gemfire.spark.connector.streaming._ |
| val mockDStream = mock[DStream[(Int, String)]] |
| // the implicit make the following line valid |
| val pairDStream: GemFirePairDStreamFunctions[Int, String] = mockDStream |
| pairDStream shouldBe a[GemFirePairDStreamFunctions[_, _]] |
| } |
| |
| test("test GemFireDStreamFunctions Implicit") { |
| import io.pivotal.gemfire.spark.connector.streaming._ |
| val mockDStream = mock[DStream[String]] |
| // the implicit make the following line valid |
| val dstream: GemFireDStreamFunctions[String] = mockDStream |
| dstream shouldBe a[GemFireDStreamFunctions[_]] |
| } |
| |
| def createMocks[K, V](regionPath: String) |
| (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) |
| : (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = { |
| val mockConnection = mock[GemFireConnection] |
| val mockConnConf = mock[GemFireConnectionConf] |
| val mockRegion = mock[Region[K, V]] |
| when(mockConnConf.getConnection).thenReturn(mockConnection) |
| when(mockConnConf.locators).thenReturn(Seq.empty) |
| (regionPath, mockConnConf, mockConnection, mockRegion) |
| } |
| |
| test("test GemFirePairDStreamFunctions.saveToGemfire()") { |
| import io.pivotal.gemfire.spark.connector.streaming._ |
| val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") |
| val mockDStream = mock[DStream[(String, String)]] |
| mockDStream.saveToGemfire(regionPath, mockConnConf) |
| verify(mockConnConf).getConnection |
| verify(mockConnection).validateRegion[String, String](regionPath) |
| verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit]) |
| } |
| |
| test("test GemFireDStreamFunctions.saveToGemfire()") { |
| import io.pivotal.gemfire.spark.connector.streaming._ |
| val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, Int]("test") |
| val mockDStream = mock[DStream[String]] |
| mockDStream.saveToGemfire[String, Int](regionPath, (s: String) => (s, s.length), mockConnConf) |
| verify(mockConnConf).getConnection |
| verify(mockConnection).validateRegion[String, String](regionPath) |
| verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit]) |
| } |
| |
| } |