[GEARPUMP-308] Fix TransformTask output time
Author: manuzhang <owenzhang1990@gmail.com>
Closes #184 from manuzhang/fix_source_output_time.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 9c5e347..82d6beb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -108,7 +108,11 @@
* @return a new stream after fold
*/
def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
- transform(new FoldRunner(fn, description))
+ if (graph.vertices.exists(_.isInstanceOf[GroupByOp[_, _]])) {
+ transform(new FoldRunner(fn, description))
+ } else {
+ throw new UnsupportedOperationException("fold operation can only be applied on window")
+ }
}
/**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 9571697..572df94 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -29,7 +29,7 @@
object TransformTask {
class Transform[IN, OUT](taskContext: TaskContext,
- operator: Option[FunctionRunner[IN, OUT]],
+ processor: Option[FunctionRunner[IN, OUT]],
private var buffer: Vector[Message] = Vector.empty[Message]) {
def onNext(msg: Message): Unit = {
@@ -37,27 +37,23 @@
}
def onWatermarkProgress(watermark: Instant): Unit = {
- val watermarkTime = watermark.toEpochMilli
var nextBuffer = Vector.empty[Message]
- val processor = operator.map(FunctionRunner.withEmitFn(_,
- (out: OUT) => taskContext.output(Message(out, watermarkTime))))
processor.foreach(_.setup())
- buffer.foreach {
- message: Message =>
- if (message.timestamp.toEpochMilli < watermarkTime) {
- processor match {
- case Some(p) =>
+ buffer.foreach { message: Message =>
+ if (message.timestamp.isBefore(watermark)) {
+ processor match {
+ case Some(p) =>
+ FunctionRunner
+ .withEmitFn(p, (out: OUT) => taskContext.output(Message(out, message.timestamp)))
// .toList forces eager evaluation
- p.process(message.value.asInstanceOf[IN]).toList
- case None =>
- taskContext.output(Message(message.value, watermarkTime))
- }
- } else {
- nextBuffer +:= message
+ .process(message.value.asInstanceOf[IN]).toList
+ case None =>
+ taskContext.output(message)
}
+ } else {
+ nextBuffer +:= message
+ }
}
- // .toList forces eager evaluation
- processor.map(_.finish().toList)
processor.foreach(_.teardown())
buffer = nextBuffer
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 74749b9..f392f70 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -116,7 +116,9 @@
val inputs = windowInputs.remove(firstWin)
if (groupedFnRunners.containsKey(group)) {
val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
- (output: OUT) => taskContext.output(Message(output, time)))
+ (output: OUT) => {
+ taskContext.output(Message(output, time))
+ })
val setup = groupedRunnerSetups.get(group)
if (!setup) {
runner.setup()
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 281d69a..481925a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -69,7 +69,7 @@
task.onWatermarkProgress(Watermark.MAX)
msgs.foreach { msg =>
- verify(taskContext).output(MockitoMatchers.eq(Message(msg.value, Watermark.MAX)))
+ verify(taskContext).output(MockitoMatchers.eq(msg))
}
}
}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
new file mode 100644
index 0000000..fbbee3e
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner
+import org.apache.gearpump.streaming.dsl.window.api.SessionWindows
+import org.apache.gearpump.streaming.source.Watermark
+import org.mockito.Mockito.{times, verify}
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+
+class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
+ with Matchers with MockitoSugar {
+
+ property("DefaultWindowRunner should handle SessionWindow") {
+
+ val data = List(
+ Message(("foo", 1L), Instant.ofEpochMilli(1L)),
+ Message(("bar", 1L), Instant.ofEpochMilli(8L)),
+ Message(("foo", 1L), Instant.ofEpochMilli(15L)),
+ Message(("bar", 1L), Instant.ofEpochMilli(17L)),
+ Message(("bar", 1L), Instant.ofEpochMilli(18L)),
+ Message(("foo", 1L), Instant.ofEpochMilli(25L)),
+ Message(("foo", 1L), Instant.ofEpochMilli(26L)),
+ Message(("bar", 1L), Instant.ofEpochMilli(30L)),
+ Message(("bar", 1L), Instant.ofEpochMilli(31L))
+ )
+
+ type KV = (String, Long)
+ val taskContext = MockUtil.mockTaskContext
+ implicit val system = MockUtil.system
+ val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
+ val operator = new FoldRunner(reduce, "reduce")
+ val userConfig = UserConfig.empty.withValue(
+ Constants.GEARPUMP_STREAMING_OPERATOR, operator)
+ val windows = SessionWindows.apply[KV](Duration.ofMillis(4L))
+ val groupBy = GroupAlsoByWindow[KV, String](_._1, windows)
+ val windowRunner = new DefaultWindowRunner(taskContext, userConfig, groupBy)
+
+ data.foreach(windowRunner.process)
+ windowRunner.trigger(Watermark.MAX)
+
+ verify(taskContext, times(2)).output(Message(Some(("foo", 1)), Watermark.MAX))
+ verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX))
+ verify(taskContext, times(2)).output(Message(Some(("bar", 2)), Watermark.MAX))
+ verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX))
+ }
+
+}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 7db9b15..7651251 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -56,8 +56,8 @@
}
property("DataSourceTask should read from DataSource and transform inputs") {
- forAll(runnerGen, Gen.alphaStr) {
- (runner: Option[FunctionRunner[Any, Any]], str: String) =>
+ forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+ (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
@@ -65,7 +65,7 @@
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
val transform = new Transform[Any, Any](taskContext, runner)
val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
- val msg = Message(str)
+ val msg = Message(str, timestamp)
when(dataSource.read()).thenReturn(msg)
runner.foreach(r => {
when(r.process(str)).thenReturn(Some(str))
@@ -75,7 +75,7 @@
sourceTask.onNext(Message("next"))
sourceTask.onWatermarkProgress(Watermark.MAX)
- verify(taskContext).output(Message(str, Watermark.MAX))
+ verify(taskContext).output(msg)
}
}