[GEARPUMP-307] Fix TransformTask$Transform invocation
Author: manuzhang <owenzhang1990@gmail.com>
Closes #182 from manuzhang/fix_transform.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index ed48dc7..86ac933 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -32,25 +32,18 @@
operator: Option[FunctionRunner[IN, OUT]],
private var buffer: Vector[Message] = Vector.empty[Message]) {
- def onStart(startTime: Instant): Unit = {
- operator.foreach(_.setup())
- }
-
def onNext(msg: Message): Unit = {
buffer +:= msg
}
- def onStop(): Unit = {
- operator.foreach(_.teardown())
- }
-
def onWatermarkProgress(watermark: Instant): Unit = {
val watermarkTime = watermark.toEpochMilli
var nextBuffer = Vector.empty[Message]
val processor = operator.map(FunctionRunner.withEmitFn(_,
(out: OUT) => taskContext.output(Message(out, watermarkTime))))
+ processor.foreach(_.setup())
buffer.foreach { case message@Message(in, time) =>
- if (time <= watermarkTime) {
+ if (time < watermarkTime) {
processor match {
case Some(p) =>
// .toList forces eager evaluation
@@ -63,7 +56,8 @@
}
}
// .toList forces eager evaluation
- processor.map(_.finish())
+ processor.map(_.finish().toList)
+ processor.foreach(_.teardown())
buffer = nextBuffer
}
}
@@ -78,18 +72,10 @@
GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf)
}
- override def onStart(startTime: Instant): Unit = {
- transform.onStart(startTime)
- }
-
override def onNext(msg: Message): Unit = {
transform.onNext(msg)
}
- override def onStop(): Unit = {
- transform.onStop()
- }
-
override def onWatermarkProgress(watermark: Instant): Unit = {
transform.onWatermarkProgress(watermark)
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 3fceb1a..ff1b2d4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -59,7 +59,6 @@
override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at $startTime")
source.open(context, startTime)
- transform.onStart(startTime)
self ! Watermark(source.getWatermark)
}
@@ -78,7 +77,6 @@
override def onStop(): Unit = {
LOG.info("closing data source...")
- transform.onStop()
source.close()
}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 67fa375..f0bccd7 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -25,7 +25,8 @@
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
import org.apache.gearpump.streaming.source.Watermark
-import org.mockito.Mockito.{verify, when}
+import org.mockito.{Matchers => MockitoMatchers}
+import org.mockito.Mockito.{times, verify, when}
import org.scalacheck.Gen
import org.scalatest.{Matchers, PropSpec}
import org.scalatest.mock.MockitoSugar
@@ -33,50 +34,44 @@
class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- property("TransformTask should setup functions") {
- forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
- val taskContext = MockUtil.mockTaskContext
- implicit val system = MockUtil.system
- val config = UserConfig.empty
- val operator = mock[FunctionRunner[Any, Any]]
- val transform = new Transform[Any, Any](taskContext, Some(operator))
- val sourceTask = new TransformTask[Any, Any](transform, taskContext, config)
+ private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli,
+ Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli)
+ private val runnerGen = {
+ val runner = mock[FunctionRunner[Any, Any]]
+ Gen.oneOf(Some(runner), None)
+ }
- sourceTask.onStart(startTime)
+ property("TransformTask should emit on watermark") {
+ val msgGen = for {
+ str <- Gen.alphaStr.suchThat(!_.isEmpty)
+ t <- timeGen
+ } yield Message(s"$str:$t", t)
+ val msgsGen = Gen.listOfN(10, msgGen)
- verify(operator).setup()
+ forAll(runnerGen, msgsGen) {
+ (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) =>
+ val taskContext = MockUtil.mockTaskContext
+ implicit val system = MockUtil.system
+ val config = UserConfig.empty
+ val transform = new Transform[Any, Any](taskContext, runner)
+ val task = new TransformTask[Any, Any](transform, taskContext, config)
+
+ msgs.foreach(task.onNext)
+
+ runner.foreach(r => when(r.finish()).thenReturn(None))
+ task.onWatermarkProgress(Watermark.MIN)
+ verify(taskContext, times(0)).output(MockitoMatchers.any[Message])
+
+ msgs.foreach { msg =>
+ runner.foreach(r =>
+ when(r.process(msg.msg)).thenReturn(Some(msg.msg)))
+ }
+ task.onWatermarkProgress(Watermark.MAX)
+
+ msgs.foreach { msg =>
+ verify(taskContext).output(MockitoMatchers.eq(Message(msg.msg, Watermark.MAX)))
+ }
}
}
- property("TransformTask should process inputs") {
- forAll(Gen.alphaStr) { (str: String) =>
- val taskContext = MockUtil.mockTaskContext
- implicit val system = MockUtil.system
- val config = UserConfig.empty
- val operator = mock[FunctionRunner[Any, Any]]
- val transform = new Transform[Any, Any](taskContext, Some(operator))
- val task = new TransformTask[Any, Any](transform, taskContext, config)
- val msg = Message(str)
- when(operator.process(str)).thenReturn(Some(str))
- when(operator.finish()).thenReturn(None)
-
- task.onNext(msg)
- task.onWatermarkProgress(Watermark.MAX)
-
- verify(taskContext).output(Message(str, Watermark.MAX))
- }
- }
-
- property("TransformTask should teardown functions") {
- val taskContext = MockUtil.mockTaskContext
- implicit val system = MockUtil.system
- val config = UserConfig.empty
- val operator = mock[FunctionRunner[Any, Any]]
- val transform = new Transform[Any, Any](taskContext, Some(operator))
- val task = new TransformTask[Any, Any](transform, taskContext, config)
-
- task.onStop()
-
- verify(operator).teardown()
- }
}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 3c44c4c..7db9b15 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -33,55 +33,65 @@
class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
- property("DataSourceTask should setup data source and Transform") {
- forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
+ private val runnerGen = {
+ val runner = mock[FunctionRunner[Any, Any]]
+ Gen.oneOf(Some(runner), None)
+ }
+
+ property("DataSourceTask should setup data source") {
+ forAll(runnerGen, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+ (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val operator = mock[FunctionRunner[Any, Any]]
- val transform = new Transform[Any, Any](taskContext, Some(operator))
+ val transform = new Transform[Any, Any](taskContext, runner)
val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
sourceTask.onStart(startTime)
verify(dataSource).open(taskContext, startTime)
- verify(operator).setup()
}
}
property("DataSourceTask should read from DataSource and transform inputs") {
- forAll(Gen.alphaStr) { (str: String) =>
+ forAll(runnerGen, Gen.alphaStr) {
+ (runner: Option[FunctionRunner[Any, Any]], str: String) =>
+ val taskContext = MockUtil.mockTaskContext
+ implicit val system = MockUtil.system
+ val dataSource = mock[DataSource]
+ val config = UserConfig.empty
+ .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+ val transform = new Transform[Any, Any](taskContext, runner)
+ val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+ val msg = Message(str)
+ when(dataSource.read()).thenReturn(msg)
+ runner.foreach(r => {
+ when(r.process(str)).thenReturn(Some(str))
+ when(r.finish()).thenReturn(None)
+ })
+
+ sourceTask.onNext(Message("next"))
+ sourceTask.onWatermarkProgress(Watermark.MAX)
+
+ verify(taskContext).output(Message(str, Watermark.MAX))
+ }
+ }
+
+ property("DataSourceTask should teardown DataSource") {
+ forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) =>
val taskContext = MockUtil.mockTaskContext
implicit val system = MockUtil.system
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val transform = new Transform[Any, Any](taskContext, None)
+ val transform = new Transform[Any, Any](taskContext, runner)
val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
- val msg = Message(str)
- when(dataSource.read()).thenReturn(msg)
- sourceTask.onNext(Message("next"))
- sourceTask.onWatermarkProgress(Watermark.MAX)
- verify(taskContext).output(Message(str, Watermark.MAX))
+ sourceTask.onStop()
+
+ verify(dataSource).close()
}
}
-
- property("DataSourceTask should teardown DataSource and Transform") {
- val taskContext = MockUtil.mockTaskContext
- implicit val system = MockUtil.system
- val dataSource = mock[DataSource]
- val config = UserConfig.empty
- .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val operator = mock[FunctionRunner[Any, Any]]
- val transform = new Transform[Any, Any](taskContext, Some(operator))
- val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
-
- sourceTask.onStop()
-
- verify(dataSource).close()
- verify(operator).teardown()
- }
}