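[GEARPUMP-307] Fix TransformTask$Transform invocation

Run the operator's setup() and teardown() inside
Transform.onWatermarkProgress, around each flush of the buffer, instead of
in the removed Transform.onStart/onStop hooks; TransformTask and
DataSourceTask no longer forward onStart/onStop to Transform. A buffered
message is now processed only when its timestamp is strictly less than the
watermark, and the result of finish() is forced with .toList.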

Author: manuzhang <owenzhang1990@gmail.com>

Closes #182 from manuzhang/fix_transform.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index ed48dc7..86ac933 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -32,25 +32,18 @@
       operator: Option[FunctionRunner[IN, OUT]],
       private var buffer: Vector[Message] = Vector.empty[Message]) {
 
-    def onStart(startTime: Instant): Unit = {
-      operator.foreach(_.setup())
-    }
-
     def onNext(msg: Message): Unit = {
       buffer +:= msg
     }
 
-    def onStop(): Unit = {
-      operator.foreach(_.teardown())
-    }
-
     def onWatermarkProgress(watermark: Instant): Unit = {
       val watermarkTime = watermark.toEpochMilli
       var nextBuffer = Vector.empty[Message]
       val processor = operator.map(FunctionRunner.withEmitFn(_,
         (out: OUT) => taskContext.output(Message(out, watermarkTime))))
+      processor.foreach(_.setup())
       buffer.foreach { case message@Message(in, time) =>
-        if (time <= watermarkTime) {
+        if (time < watermarkTime) {
           processor match {
             case Some(p) =>
               // .toList forces eager evaluation
@@ -63,7 +56,8 @@
         }
       }
       // .toList forces eager evaluation
-      processor.map(_.finish())
+      processor.map(_.finish().toList)
+      processor.foreach(_.teardown())
       buffer = nextBuffer
     }
   }
@@ -78,18 +72,10 @@
       GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf)
   }
 
-  override def onStart(startTime: Instant): Unit = {
-    transform.onStart(startTime)
-  }
-
   override def onNext(msg: Message): Unit = {
     transform.onNext(msg)
   }
 
-  override def onStop(): Unit = {
-    transform.onStop()
-  }
-
   override def onWatermarkProgress(watermark: Instant): Unit = {
     transform.onWatermarkProgress(watermark)
   }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 3fceb1a..ff1b2d4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -59,7 +59,6 @@
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
-    transform.onStart(startTime)
 
     self ! Watermark(source.getWatermark)
   }
@@ -78,7 +77,6 @@
 
   override def onStop(): Unit = {
     LOG.info("closing data source...")
-    transform.onStop()
     source.close()
   }
 
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 67fa375..f0bccd7 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -25,7 +25,8 @@
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
 import org.apache.gearpump.streaming.source.Watermark
-import org.mockito.Mockito.{verify, when}
+import org.mockito.{Matchers => MockitoMatchers}
+import org.mockito.Mockito.{times, verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.mock.MockitoSugar
@@ -33,50 +34,44 @@
 
 class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  property("TransformTask should setup functions") {
-    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
-      val taskContext = MockUtil.mockTaskContext
-      implicit val system = MockUtil.system
-      val config = UserConfig.empty
-      val operator = mock[FunctionRunner[Any, Any]]
-      val transform = new Transform[Any, Any](taskContext, Some(operator))
-      val sourceTask = new TransformTask[Any, Any](transform, taskContext, config)
+  private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli,
+    Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli)
+  private val runnerGen = {
+    val runner = mock[FunctionRunner[Any, Any]]
+    Gen.oneOf(Some(runner), None)
+  }
 
-      sourceTask.onStart(startTime)
+  property("TransformTask should emit on watermark") {
+    val msgGen = for {
+      str <- Gen.alphaStr.suchThat(!_.isEmpty)
+      t <- timeGen
+    } yield Message(s"$str:$t", t)
+    val msgsGen = Gen.listOfN(10, msgGen)
 
-      verify(operator).setup()
+    forAll(runnerGen, msgsGen) {
+      (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) =>
+        val taskContext = MockUtil.mockTaskContext
+        implicit val system = MockUtil.system
+        val config = UserConfig.empty
+        val transform = new Transform[Any, Any](taskContext, runner)
+        val task = new TransformTask[Any, Any](transform, taskContext, config)
+
+        msgs.foreach(task.onNext)
+
+        runner.foreach(r => when(r.finish()).thenReturn(None))
+        task.onWatermarkProgress(Watermark.MIN)
+        verify(taskContext, times(0)).output(MockitoMatchers.any[Message])
+
+        msgs.foreach { msg =>
+          runner.foreach(r =>
+            when(r.process(msg.msg)).thenReturn(Some(msg.msg)))
+        }
+        task.onWatermarkProgress(Watermark.MAX)
+
+        msgs.foreach { msg =>
+          verify(taskContext).output(MockitoMatchers.eq(Message(msg.msg, Watermark.MAX)))
+        }
     }
   }
 
-  property("TransformTask should process inputs") {
-    forAll(Gen.alphaStr) { (str: String) =>
-      val taskContext = MockUtil.mockTaskContext
-      implicit val system = MockUtil.system
-      val config = UserConfig.empty
-      val operator = mock[FunctionRunner[Any, Any]]
-      val transform = new Transform[Any, Any](taskContext, Some(operator))
-      val task = new TransformTask[Any, Any](transform, taskContext, config)
-      val msg = Message(str)
-      when(operator.process(str)).thenReturn(Some(str))
-      when(operator.finish()).thenReturn(None)
-
-      task.onNext(msg)
-      task.onWatermarkProgress(Watermark.MAX)
-
-      verify(taskContext).output(Message(str, Watermark.MAX))
-    }
-  }
-
-  property("TransformTask should teardown functions") {
-    val taskContext = MockUtil.mockTaskContext
-    implicit val system = MockUtil.system
-    val config = UserConfig.empty
-    val operator = mock[FunctionRunner[Any, Any]]
-    val transform = new Transform[Any, Any](taskContext, Some(operator))
-    val task = new TransformTask[Any, Any](transform, taskContext, config)
-
-    task.onStop()
-
-    verify(operator).teardown()
-  }
 }
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 3c44c4c..7db9b15 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -33,55 +33,65 @@
 
 class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  property("DataSourceTask should setup data source and Transform") {
-    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
+  private val runnerGen = {
+    val runner = mock[FunctionRunner[Any, Any]]
+    Gen.oneOf(Some(runner), None)
+  }
+
+  property("DataSourceTask should setup data source") {
+    forAll(runnerGen, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+      (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val operator = mock[FunctionRunner[Any, Any]]
-      val transform = new Transform[Any, Any](taskContext, Some(operator))
+      val transform = new Transform[Any, Any](taskContext, runner)
       val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
 
       sourceTask.onStart(startTime)
 
       verify(dataSource).open(taskContext, startTime)
-      verify(operator).setup()
     }
   }
 
   property("DataSourceTask should read from DataSource and transform inputs") {
-    forAll(Gen.alphaStr) { (str: String) =>
+    forAll(runnerGen, Gen.alphaStr) {
+      (runner: Option[FunctionRunner[Any, Any]], str: String) =>
+        val taskContext = MockUtil.mockTaskContext
+        implicit val system = MockUtil.system
+        val dataSource = mock[DataSource]
+        val config = UserConfig.empty
+          .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+        val transform = new Transform[Any, Any](taskContext, runner)
+        val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+        val msg = Message(str)
+        when(dataSource.read()).thenReturn(msg)
+        runner.foreach(r => {
+          when(r.process(str)).thenReturn(Some(str))
+          when(r.finish()).thenReturn(None)
+        })
+
+        sourceTask.onNext(Message("next"))
+        sourceTask.onWatermarkProgress(Watermark.MAX)
+
+        verify(taskContext).output(Message(str, Watermark.MAX))
+    }
+  }
+
+  property("DataSourceTask should teardown DataSource") {
+    forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val transform = new Transform[Any, Any](taskContext, None)
+      val transform = new Transform[Any, Any](taskContext, runner)
       val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
-      val msg = Message(str)
-      when(dataSource.read()).thenReturn(msg)
 
-      sourceTask.onNext(Message("next"))
-      sourceTask.onWatermarkProgress(Watermark.MAX)
-      verify(taskContext).output(Message(str, Watermark.MAX))
+      sourceTask.onStop()
+
+      verify(dataSource).close()
     }
   }
-
-  property("DataSourceTask should teardown DataSource and Transform") {
-    val taskContext = MockUtil.mockTaskContext
-    implicit val system = MockUtil.system
-    val dataSource = mock[DataSource]
-    val config = UserConfig.empty
-      .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val operator = mock[FunctionRunner[Any, Any]]
-    val transform = new Transform[Any, Any](taskContext, Some(operator))
-    val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
-
-    sourceTask.onStop()
-
-    verify(dataSource).close()
-    verify(operator).teardown()
-  }
 }