[GEARPUMP-261] Translate ChainableOp to Processor of TransformTask
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [x] Make sure tests pass via `sbt clean test`.
- [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
Author: manuzhang <owenzhang1990@gmail.com>
Closes #130 from manuzhang/chainable_op.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 744976b..f15d875 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -141,7 +141,8 @@
}
override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+ Processor[TransformTask[Any, Any]](1, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
}
}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index bf52abc..98bf24f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -155,12 +155,17 @@
}
}
- "throw exception on getProcessor" in {
- val fn1 = mock[SingleInputFunction[Any, Any]]
- val chainableOp1 = ChainableOp[Any, Any](fn1)
- intercept[UnsupportedOperationException] {
- chainableOp1.getProcessor
+ "get Processor" in {
+ val fn = new SingleInputFunction[Any, Any] {
+ override def process(value: Any): TraversableOnce[Any] = null
+
+ override def description: String = null
}
+ val chainableOp = ChainableOp[Any, Any](fn)
+
+ val processor = chainableOp.getProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe 1
}
}