[GEARPUMP-249] Force eager evaluation on chained operations
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [x] Make sure tests pass via `sbt clean test`.
- [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
Author: manuzhang <owenzhang1990@gmail.com>
Closes #127 from manuzhang/eager-eval.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index b3ecf2d..d87a9e4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -92,10 +92,12 @@
.andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
inputs.forEach(new Procedure[IN] {
override def value(t: IN): Unit = {
- reduceFn.process(t)
+ // .toList forces eager evaluation
+ reduceFn.process(t).toList
}
})
- reduceFn.finish()
+ // .toList forces eager evaluation
+ reduceFn.finish().toList
if (groupBy.window.accumulationMode == Discarding) {
reduceFn.clearState()
}