[GEARPUMP-290] Setup FunctionRunner properly
Author: manuzhang <owenzhang1990@gmail.com>
Closes #168 from manuzhang/GEARPUMP-290.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 44d724d..42d50e2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -55,6 +55,7 @@
private val windowFn = groupBy.window.windowFn
private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
+ private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
override def process(message: Message): Unit = {
val input = message.msg.asInstanceOf[IN]
@@ -76,9 +77,9 @@
}
if (!groupedFnRunners.containsKey(group)) {
- val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
- fn.setup()
- groupedFnRunners.put(group, fn)
+ val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+ groupedFnRunners.put(group, runner)
+ groupedRunnerSetups.put(group, false)
}
def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
@@ -114,20 +115,30 @@
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
if (groupedFnRunners.containsKey(group)) {
- val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
+ val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
(output: OUT) => taskContext.output(Message(output, time)))
+ val setup = groupedRunnerSetups.get(group)
+ if (!setup) {
+ runner.setup()
+ groupedRunnerSetups.put(group, true)
+ }
inputs.forEach(new Procedure[IN] {
override def value(t: IN): Unit = {
// .toList forces eager evaluation
- reduceFn.process(t).toList
+ runner.process(t).toList
}
})
// .toList forces eager evaluation
- reduceFn.finish().toList
+ runner.finish().toList
if (groupBy.window.accumulationMode == Discarding) {
- reduceFn.teardown()
+ runner.teardown()
+ groupedRunnerSetups.put(group, false)
+ // dicarding, setup need to be called for each window
+ onTrigger(group, windowInputs)
+ } else {
+ // accumulating, setup is only called for the first window
+ onTrigger(group, windowInputs)
}
- onTrigger(group, windowInputs)
} else {
throw new RuntimeException(s"FunctionRunner not found for group $group")
}