[GEARPUMP-290] Setup FunctionRunner properly

Author: manuzhang <owenzhang1990@gmail.com>

Closes #168 from manuzhang/GEARPUMP-290.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 44d724d..42d50e2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -55,6 +55,7 @@
   private val windowFn = groupBy.window.windowFn
   private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
   private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
+  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
 
   override def process(message: Message): Unit = {
     val input = message.msg.asInstanceOf[IN]
@@ -76,9 +77,9 @@
     }
 
     if (!groupedFnRunners.containsKey(group)) {
-      val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
-      fn.setup()
-      groupedFnRunners.put(group, fn)
+      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+      groupedFnRunners.put(group, runner)
+      groupedRunnerSetups.put(group, false)
     }
 
     def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
@@ -114,20 +115,30 @@
         if (!time.isBefore(firstWin.endTime)) {
           val inputs = windowInputs.remove(firstWin)
           if (groupedFnRunners.containsKey(group)) {
-            val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
+            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
               (output: OUT) => taskContext.output(Message(output, time)))
+            val setup = groupedRunnerSetups.get(group)
+            if (!setup) {
+              runner.setup()
+              groupedRunnerSetups.put(group, true)
+            }
             inputs.forEach(new Procedure[IN] {
               override def value(t: IN): Unit = {
                 // .toList forces eager evaluation
-                reduceFn.process(t).toList
+                runner.process(t).toList
               }
             })
             // .toList forces eager evaluation
-            reduceFn.finish().toList
+            runner.finish().toList
             if (groupBy.window.accumulationMode == Discarding) {
-              reduceFn.teardown()
+              runner.teardown()
+              groupedRunnerSetups.put(group, false)
+              // dicarding, setup need to be called for each window
+              onTrigger(group, windowInputs)
+            } else {
+              // accumulating, setup is only called for the first window
+              onTrigger(group, windowInputs)
             }
-            onTrigger(group, windowInputs)
           } else {
             throw new RuntimeException(s"FunctionRunner not found for group $group")
           }