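[GEARPUMP-308] Fix TransformTask output time

TransformTask now emits each output with the timestamp of the input message that
produced it instead of the current watermark. Stream.fold now throws
UnsupportedOperationException unless the graph contains a GroupByOp vertex, and a
DefaultWindowRunnerSpec covering session windows is added.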

Author: manuzhang <owenzhang1990@gmail.com>

Closes #184 from manuzhang/fix_source_output_time.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 9c5e347..82d6beb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -108,7 +108,11 @@
    * @return a new stream after fold
    */
   def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
-    transform(new FoldRunner(fn, description))
+    if (graph.vertices.exists(_.isInstanceOf[GroupByOp[_, _]])) {
+      transform(new FoldRunner(fn, description))
+    } else {
+      throw new UnsupportedOperationException("fold operation can only be applied on window")
+    }
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 9571697..572df94 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -29,7 +29,7 @@
 object TransformTask {
 
   class Transform[IN, OUT](taskContext: TaskContext,
-      operator: Option[FunctionRunner[IN, OUT]],
+      processor: Option[FunctionRunner[IN, OUT]],
       private var buffer: Vector[Message] = Vector.empty[Message]) {
 
     def onNext(msg: Message): Unit = {
@@ -37,27 +37,23 @@
     }
 
     def onWatermarkProgress(watermark: Instant): Unit = {
-      val watermarkTime = watermark.toEpochMilli
       var nextBuffer = Vector.empty[Message]
-      val processor = operator.map(FunctionRunner.withEmitFn(_,
-        (out: OUT) => taskContext.output(Message(out, watermarkTime))))
       processor.foreach(_.setup())
-      buffer.foreach {
-        message: Message =>
-          if (message.timestamp.toEpochMilli < watermarkTime) {
-            processor match {
-              case Some(p) =>
+      buffer.foreach { message: Message =>
+        if (message.timestamp.isBefore(watermark)) {
+          processor match {
+            case Some(p) =>
+              FunctionRunner
+                .withEmitFn(p, (out: OUT) => taskContext.output(Message(out, message.timestamp)))
                 // .toList forces eager evaluation
-                p.process(message.value.asInstanceOf[IN]).toList
-              case None =>
-                taskContext.output(Message(message.value, watermarkTime))
-            }
-          } else {
-            nextBuffer +:= message
+                .process(message.value.asInstanceOf[IN]).toList
+            case None =>
+              taskContext.output(message)
           }
+        } else {
+          nextBuffer +:= message
+        }
       }
-      // .toList forces eager evaluation
-      processor.map(_.finish().toList)
       processor.foreach(_.teardown())
       buffer = nextBuffer
     }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 74749b9..f392f70 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -116,7 +116,9 @@
           val inputs = windowInputs.remove(firstWin)
           if (groupedFnRunners.containsKey(group)) {
             val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
-              (output: OUT) => taskContext.output(Message(output, time)))
+              (output: OUT) => {
+                taskContext.output(Message(output, time))
+              })
             val setup = groupedRunnerSetups.get(group)
             if (!setup) {
               runner.setup()
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 281d69a..481925a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -69,7 +69,7 @@
         task.onWatermarkProgress(Watermark.MAX)
 
         msgs.foreach { msg =>
-          verify(taskContext).output(MockitoMatchers.eq(Message(msg.value, Watermark.MAX)))
+          verify(taskContext).output(MockitoMatchers.eq(msg))
         }
     }
   }
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
new file mode 100644
index 0000000..fbbee3e
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner
+import org.apache.gearpump.streaming.dsl.window.api.SessionWindows
+import org.apache.gearpump.streaming.source.Watermark
+import org.mockito.Mockito.{times, verify}
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/** Exercises DefaultWindowRunner with session windows over two interleaved keys. */
+class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("DefaultWindowRunner should handle SessionWindow") {
+    // Input: nine (key, 1L) messages for keys "foo" and "bar", timestamps between 1 and 31 ms.
+    val data = List(
+      Message(("foo", 1L), Instant.ofEpochMilli(1L)),
+      Message(("bar", 1L), Instant.ofEpochMilli(8L)),
+      Message(("foo", 1L), Instant.ofEpochMilli(15L)),
+      Message(("bar", 1L), Instant.ofEpochMilli(17L)),
+      Message(("bar", 1L), Instant.ofEpochMilli(18L)),
+      Message(("foo", 1L), Instant.ofEpochMilli(25L)),
+      Message(("foo", 1L), Instant.ofEpochMilli(26L)),
+      Message(("bar", 1L), Instant.ofEpochMilli(30L)),
+      Message(("bar", 1L), Instant.ofEpochMilli(31L))
+    )
+    // Fixture: a FoldRunner that sums the counts, grouped by key over SessionWindows(4 ms).
+    type KV = (String, Long)
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
+    val operator = new FoldRunner(reduce, "reduce")
+    val userConfig = UserConfig.empty.withValue(
+      Constants.GEARPUMP_STREAMING_OPERATOR, operator)
+    val windows = SessionWindows.apply[KV](Duration.ofMillis(4L))
+    val groupBy = GroupAlsoByWindow[KV, String](_._1, windows)
+    val windowRunner = new DefaultWindowRunner(taskContext, userConfig, groupBy)
+    // Feed every message to the runner, then trigger it once at Watermark.MAX.
+    data.foreach(windowRunner.process)
+    windowRunner.trigger(Watermark.MAX)
+    // Assuming 4 ms is the session gap: foo -> {1},{15},{25,26}; bar -> {8},{17,18},{30,31}.
+    verify(taskContext, times(2)).output(Message(Some(("foo", 1)), Watermark.MAX))
+    verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX))
+    verify(taskContext, times(2)).output(Message(Some(("bar", 2)), Watermark.MAX))
+    verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX))
+  }
+
+}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 7db9b15..7651251 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -56,8 +56,8 @@
   }
 
   property("DataSourceTask should read from DataSource and transform inputs") {
-    forAll(runnerGen, Gen.alphaStr) {
-      (runner: Option[FunctionRunner[Any, Any]], str: String) =>
+    forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+      (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: Instant) =>
         val taskContext = MockUtil.mockTaskContext
         implicit val system = MockUtil.system
         val dataSource = mock[DataSource]
@@ -65,7 +65,7 @@
           .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
         val transform = new Transform[Any, Any](taskContext, runner)
         val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
-        val msg = Message(str)
+        val msg = Message(str, timestamp)
         when(dataSource.read()).thenReturn(msg)
         runner.foreach(r => {
           when(r.process(str)).thenReturn(Some(str))
@@ -75,7 +75,7 @@
         sourceTask.onNext(Message("next"))
         sourceTask.onWatermarkProgress(Watermark.MAX)
 
-        verify(taskContext).output(Message(str, Watermark.MAX))
+        verify(taskContext).output(msg)
     }
   }