[GEARPUMP-367] Don't use windowing unnecessarily
Author: manuzhang <owenzhang1990@gmail.com>
Closes #226 from manuzhang/simplify_dsl.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
index 6d43f16..2d3d94b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -36,7 +36,7 @@
* * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
* * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops.
*
- * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally
+ * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegate execution to [[org.apache.gearpump.streaming.dsl.window.impl.StreamingOperator]], which internally
* runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with
* [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually
* executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index c37ced6..4a71b08 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -21,12 +21,11 @@
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner}
-import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FlatMapper, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{AndThenOperator, FlatMapOperator, StreamingOperator, WindowOperator}
import org.apache.gearpump.streaming.{Constants, Processor}
import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
import org.apache.gearpump.streaming.task.Task
@@ -69,6 +68,16 @@
}
}
+ def isFlatMapper(runner: FunctionRunner[Any, Any]): Boolean = {
+ runner match {
+ case fm: FlatMapper[Any, Any] =>
+ true
+ case at: AndThen[Any, Any, Any] =>
+ isFlatMapper(at.first) && isFlatMapper(at.second)
+ case _ =>
+ false
+ }
+ }
}
/**
@@ -134,39 +143,59 @@
dataSource: DataSource,
parallelism: Int = 1,
description: String = "source",
- userConfig: UserConfig = UserConfig.empty)
+ userConfig: UserConfig = UserConfig.empty,
+ operator: Option[StreamingOperator[Any, Any]] = None)
extends Op {
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
- case op: WindowTransformOp[_, _] =>
+ case op: WindowTransformOp[Any, Any] =>
+ val chainedRunner =
+ operator.map(AndThenOperator(_, op.operator)).getOrElse(op.operator)
DataSourceOp(
dataSource,
parallelism,
Op.concatenate(description, op.description),
Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
- op.windowRunner),
- op.userConfig))
- case op: TransformOp[_, _] =>
- chain(
- WindowOp(GlobalWindows()).chain(op))
+ chainedRunner),
+ op.userConfig),
+ Some(chainedRunner))
+ case op: TransformOp[Any, Any] =>
+ val runner = op.runner
+ if (Op.isFlatMapper(runner)) {
+ val fm = new FlatMapOperator[Any, Any](runner)
+ val chainedRunner =
+ operator.map(AndThenOperator(_, fm)).getOrElse(fm)
+ DataSourceOp(
+ dataSource,
+ parallelism,
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+ chainedRunner),
+ op.userConfig),
+ Some(chainedRunner)
+ )
+ } else {
+ chain(
+ WindowOp(GlobalWindows()).chain(op))
+ }
case op: WindowOp =>
chain(
op.chain(TransformOp(new DummyRunner[Any]())))
case op: TransformWindowTransformOp[_, _, _] =>
- chain(
- WindowOp(GlobalWindows()).chain(op.transformOp)
- .chain(op.windowTransformOp))
+ chain(op.transformOp).chain(op.windowTransformOp)
case _ =>
throw new OpChainException(this, other)
}
}
override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- Op.withGlobalWindowsDummyRunner(this, userConfig,
+ if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
+ chain(TransformOp(new DummyRunner[Any])).toProcessor
+ } else {
Processor[DataSourceTask[Any, Any]](parallelism, description,
userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
- )
+ }
}
}
@@ -195,10 +224,10 @@
* to another Op to be executed
*/
case class TransformOp[IN, OUT](
- fn: FunctionRunner[IN, OUT],
+ runner: FunctionRunner[IN, OUT],
userConfig: UserConfig = UserConfig.empty) extends Op {
- override def description: String = fn.description
+ override def description: String = runner.description
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
@@ -208,16 +237,16 @@
// => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3))
// => AndThen(AndThen(f1, f2), f3)
TransformOp(
- AndThen(fn, op.fn),
+ AndThen(runner, op.runner),
Op.concatenate(userConfig, op.userConfig))
case op: WindowOp =>
TransformWindowTransformOp(this,
- WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+ WindowTransformOp(new WindowOperator[OUT, OUT](
op.windows, new DummyRunner[OUT]
), op.description, op.userConfig))
case op: TransformWindowTransformOp[OUT, _, _] =>
TransformWindowTransformOp(TransformOp(
- AndThen(fn, op.transformOp.fn),
+ AndThen(runner, op.transformOp.runner),
Op.concatenate(userConfig, op.transformOp.userConfig)
), op.windowTransformOp)
case _ =>
@@ -244,13 +273,13 @@
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
case op: TransformOp[_, _] =>
- WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+ WindowTransformOp(new WindowOperator(windows, op.runner),
Op.concatenate(description, op.description),
Op.concatenate(userConfig, op.userConfig))
case op: WindowOp =>
chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
case op: TransformWindowTransformOp[_, _, _] =>
- WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+ WindowTransformOp(new WindowOperator(windows, op.transformOp.runner),
Op.concatenate(description, op.transformOp.description),
Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp)
case _ =>
@@ -290,7 +319,7 @@
Op.concatenate(description, op.description),
Op.concatenate(
userConfig
- .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+ .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.operator),
userConfig))
case op: WindowOp =>
chain(op.chain(TransformOp(new DummyRunner[Any]())))
@@ -329,7 +358,7 @@
parallelism,
description,
Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
- op.windowRunner),
+ op.operator),
op.userConfig))
case op: WindowOp =>
chain(op.chain(TransformOp(new DummyRunner[Any]())))
@@ -352,7 +381,7 @@
* it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
*/
private case class WindowTransformOp[IN, OUT](
- windowRunner: WindowRunner[IN, OUT],
+ operator: StreamingOperator[IN, OUT],
description: String,
userConfig: UserConfig) extends Op {
@@ -360,7 +389,7 @@
other match {
case op: WindowTransformOp[OUT, _] =>
WindowTransformOp(
- WindowRunnerAT(windowRunner, op.windowRunner),
+ AndThenOperator(operator, op.operator),
Op.concatenate(description, op.description),
Op.concatenate(userConfig, op.userConfig)
)
@@ -372,7 +401,7 @@
override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
// TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
- Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+ Constants.GEARPUMP_STREAMING_OPERATOR, operator))
}
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 2c11238..c638257 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -39,6 +39,7 @@
def description: String
}
+
case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE],
second: FunctionRunner[MIDDLE, OUT])
extends FunctionRunner[IN, OUT] {
@@ -114,10 +115,5 @@
}
}
-class DummyRunner[T] extends FunctionRunner[T, T] {
-
- override def process(value: T): TraversableOnce[T] = Option(value)
-
- override def description: String = ""
-}
+class DummyRunner[T] extends FlatMapper[T, T](FlatMapFunction(Option(_)), "")
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
index b3f3ad2..b615354 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -25,7 +25,7 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator}
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
@@ -44,20 +44,21 @@
)
}
- private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
- new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+ private val groups: UnifiedMap[GROUP, StreamingOperator[IN, OUT]] =
+ new UnifiedMap[GROUP, StreamingOperator[IN, OUT]]
override def onNext(message: Message): Unit = {
val input = message.value.asInstanceOf[IN]
val group = groupBy(input)
if (!groups.containsKey(group)) {
- groups.put(group,
- userConfig.getValue[WindowRunner[IN, OUT]](
- GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+ val operator = userConfig.getValue[StreamingOperator[IN, OUT]](
+ GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get
+ operator.setup()
+ groups.put(group, operator)
}
- groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN],
+ groups.get(group).foreach(TimestampedValue(message.value.asInstanceOf[IN],
message.timestamp))
}
@@ -65,11 +66,19 @@
if (groups.isEmpty && watermark == Watermark.MAX) {
taskContext.updateWatermark(Watermark.MAX)
} else {
- groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
- override def accept(runner: WindowRunner[IN, OUT]): Unit = {
- TaskUtil.trigger(watermark, runner, taskContext)
+ groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] {
+ override def accept(operator: StreamingOperator[IN, OUT]): Unit = {
+ TaskUtil.trigger(watermark, operator, taskContext)
}
})
}
}
+
+ override def onStop(): Unit = {
+ groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] {
+ override def accept(operator: StreamingOperator[IN, OUT]): Unit = {
+ operator.teardown()
+ }
+ })
+ }
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 5ad64fa..6c78e0b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -22,25 +22,33 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{StreamingOperator, TimestampedValue}
import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
class TransformTask[IN, OUT](
- runner: WindowRunner[IN, OUT],
+ operator: StreamingOperator[IN, OUT],
taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
def this(context: TaskContext, conf: UserConfig) = {
this(
- conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+ conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
context, conf
)
}
+ override def onStart(startTime: Instant): Unit = {
+ operator.setup()
+ }
+
override def onNext(msg: Message): Unit = {
- runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
+ operator.foreach(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
}
override def onWatermarkProgress(watermark: Instant): Unit = {
- TaskUtil.trigger(watermark, runner, taskContext)
+ TaskUtil.trigger(watermark, operator, taskContext)
+ }
+
+ override def onStop(): Unit = {
+ operator.teardown()
}
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
similarity index 65%
rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
index ee3c067..4f29c9e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
@@ -23,6 +23,7 @@
import com.gs.collections.api.block.procedure.Procedure
import com.gs.collections.impl.list.mutable.FastList
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
@@ -32,65 +33,127 @@
import scala.collection.mutable.ArrayBuffer
/**
- * Inputs for [[WindowRunner]].
+ * Inputs for [[StreamingOperator]].
*/
-case class TimestampedValue[T](value: T, timestamp: Instant)
+case class TimestampedValue[T](value: T, timestamp: Instant) {
+
+ def this(msg: Message) = {
+ this(msg.value.asInstanceOf[T], msg.timestamp)
+ }
+
+ def toMessage: Message = Message(value, timestamp)
+}
/**
- * Outputs triggered by [[WindowRunner]]
+ * Outputs triggered by [[StreamingOperator]]
*/
case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
watermark: Instant)
+
+trait StreamingOperator[IN, OUT] extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
+ def foreach(tv: TimestampedValue[IN]): Unit
+
+ def flatMap(
+ tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+ foreach(tv)
+ None
+ }
+
+ def trigger(time: Instant): TriggeredOutputs[OUT]
+
+ def teardown(): Unit = {}
+}
+
+/**
+ * A composite StreamingOperator that first executes its left child and feeds
+ * results into its right child.
+ */
+case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE],
+ right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] {
+
+ override def setup(): Unit = {
+ left.setup()
+ right.setup()
+ }
+
+ override def foreach(
+ tv: TimestampedValue[IN]): Unit = {
+ left.flatMap(tv).foreach(right.flatMap)
+ }
+
+ override def flatMap(
+ tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+ left.flatMap(tv).flatMap(right.flatMap)
+ }
+
+ override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+ val lOutputs = left.trigger(time)
+ lOutputs.outputs.foreach(right.foreach)
+ right.trigger(lOutputs.watermark)
+ }
+
+ override def teardown(): Unit = {
+ left.teardown()
+ right.teardown()
+ }
+}
+
+/**
+ * @param runner FlatMapper or chained FlatMappers
+ */
+class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT])
+ extends StreamingOperator[IN, OUT] {
+
+ override def setup(): Unit = {
+ runner.setup()
+ }
+
+ override def foreach(tv: TimestampedValue[IN]): Unit = {
+ throw new UnsupportedOperationException("foreach should not be invoked on FlatMapOperator; " +
+ "please use flatMap instead")
+ }
+
+ override def flatMap(
+ tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+ runner.process(tv.value)
+ .map(TimestampedValue(_, tv.timestamp))
+ }
+
+ override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+ TriggeredOutputs(None, time)
+ }
+
+ override def teardown(): Unit = {
+ runner.teardown()
+ }
+}
+
/**
* This is responsible for executing window calculation.
* 1. Groups elements into windows as defined by window function
* 2. Applies window calculation to each group
* 3. Emits results on triggering
*/
-trait WindowRunner[IN, OUT] extends java.io.Serializable {
-
- def process(timestampedValue: TimestampedValue[IN]): Unit
-
- def trigger(time: Instant): TriggeredOutputs[OUT]
-}
-
-/**
- * A composite WindowRunner that first executes its left child and feeds results
- * into result child.
- */
-case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
- right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
-
- override def process(timestampedValue: TimestampedValue[IN]): Unit = {
- left.process(timestampedValue)
- }
-
- override def trigger(time: Instant): TriggeredOutputs[OUT] = {
- val lOutputs = left.trigger(time)
- lOutputs.outputs.foreach(right.process)
- right.trigger(lOutputs.watermark)
- }
-}
-
-/**
- * Default implementation for [[WindowRunner]].
- */
-class DefaultWindowRunner[IN, OUT](
+class WindowOperator[IN, OUT](
windows: Windows,
- fnRunner: FunctionRunner[IN, OUT])
- extends WindowRunner[IN, OUT] {
+ runner: FunctionRunner[IN, OUT])
+ extends StreamingOperator[IN, OUT] {
private val windowFn = windows.windowFn
private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]]
- private var setup = false
+ private var isSetup = false
private var watermark = Watermark.MIN
- override def process(timestampedValue: TimestampedValue[IN]): Unit = {
+ override def foreach(
+ tv: TimestampedValue[IN]): Unit = {
val wins = windowFn(new Context[IN] {
- override def element: IN = timestampedValue.value
+ override def element: IN = tv.value
- override def timestamp: Instant = timestampedValue.timestamp
+ override def timestamp: Instant = tv.timestamp
})
wins.foreach { win =>
if (windowFn.isNonMerging) {
@@ -98,9 +161,9 @@
val inputs = new FastList[TimestampedValue[IN]]
windowInputs.put(win, inputs)
}
- windowInputs.get(win).add(timestampedValue)
+ windowInputs.get(win).add(tv)
} else {
- merge(windowInputs, win, timestampedValue)
+ merge(windowInputs, win, tv)
}
}
@@ -133,26 +196,26 @@
val firstWin = windowInputs.firstKey
if (!time.isBefore(firstWin.endTime)) {
val inputs = windowInputs.remove(firstWin)
- if (!setup) {
- fnRunner.setup()
- setup = true
+ if (!isSetup) {
+ runner.setup()
+ isSetup = true
}
inputs.forEach(new Procedure[TimestampedValue[IN]] {
override def value(tv: TimestampedValue[IN]): Unit = {
- fnRunner.process(tv.value).foreach {
+ runner.process(tv.value).foreach {
out: OUT => outputs += TimestampedValue(out, tv.timestamp)
}
}
})
- fnRunner.finish().foreach {
+ runner.finish().foreach {
out: OUT =>
outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
}
val newWmk = TaskUtil.max(wmk, firstWin.endTime)
if (windows.accumulationMode == Discarding) {
- fnRunner.teardown()
+ runner.teardown()
// discarding, setup need to be called for each window
- setup = false
+ isSetup = false
}
onTrigger(outputs, newWmk)
} else {
@@ -176,4 +239,8 @@
watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
TriggeredOutputs(triggeredOutputs.outputs, watermark)
}
+
+ override def teardown(): Unit = {
+ runner.teardown()
+ }
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
index dd4c0d3..c471a00 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -18,11 +18,11 @@
package org.apache.gearpump.streaming.source
+
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
-import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, Window, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{FlatMapOperator, StreamingOperator}
import org.apache.gearpump.streaming.{Constants, Processor}
/**
@@ -48,19 +48,9 @@
Processor[DataSourceTask[Any, Any]](parallelism, description,
taskConf
.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
- .withValue[WindowRunner[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
- new DefaultWindowRunner[Any, Any](
- Windows(PerElementWindowFunction, description = "perElementWindows"),
- new DummyRunner[Any])))
- }
-
-
- case object PerElementWindowFunction extends WindowFunction {
- override def apply[T](
- context: WindowFunction.Context[T]): Array[Window] = {
- Array(Window(context.timestamp, context.timestamp.plusMillis(1)))
- }
-
- override def isNonMerging: Boolean = true
+ .withValue[StreamingOperator[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
+ new FlatMapOperator(new DummyRunner[Any])
+ )
+ )
}
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index f93c496..b09ad66 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator}
import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
/**
@@ -40,7 +40,7 @@
*/
class DataSourceTask[IN, OUT] private[source](
source: DataSource,
- windowRunner: WindowRunner[IN, OUT],
+ operator: StreamingOperator[IN, OUT],
context: TaskContext,
conf: UserConfig)
extends Task(context, conf) {
@@ -48,7 +48,7 @@
def this(context: TaskContext, conf: UserConfig) = {
this(
conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
- conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+ conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
context, conf
)
}
@@ -58,27 +58,32 @@
override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at ${startTime.toEpochMilli}")
source.open(context, startTime)
+ operator.setup()
self ! Watermark(source.getWatermark)
}
override def onNext(m: Message): Unit = {
0.until(batchSize).foreach { _ =>
- Option(source.read()).foreach(
- msg => windowRunner.process(
- TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
+ Option(source.read()).foreach(process)
}
self ! Watermark(source.getWatermark)
}
override def onWatermarkProgress(watermark: Instant): Unit = {
- TaskUtil.trigger(watermark, windowRunner, context)
+ TaskUtil.trigger(watermark, operator, context)
}
override def onStop(): Unit = {
LOG.info("closing data source...")
source.close()
+ operator.teardown()
+ }
+
+ private def process(msg: Message): Unit = {
+ operator.flatMap(new TimestampedValue(msg))
+ .foreach { tv => context.output(tv.toMessage) }
}
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
index bd889c4..ed304ce 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
@@ -21,7 +21,7 @@
import java.time.Instant
import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator}
object TaskUtil {
@@ -36,7 +36,7 @@
loader.loadClass(className).asSubclass(classOf[Task])
}
- def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT],
+ def trigger[IN, OUT](watermark: Instant, runner: StreamingOperator[IN, OUT],
context: TaskContext): Unit = {
val triggeredOutputs = runner.trigger(watermark)
context.updateWatermark(triggeredOutputs.watermark)
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index ca0135d..79ef135 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -28,7 +28,7 @@
import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator}
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -66,7 +66,7 @@
val dataSourceOp = DataSourceOp(dataSource)
val transformOp = mock[TransformOp[Any, Any]]
val fn = mock[FunctionRunner[Any, Any]]
- when(transformOp.fn).thenReturn(fn)
+ when(transformOp.runner).thenReturn(fn)
val chainedOp = dataSourceOp.chain(transformOp)
@@ -85,7 +85,7 @@
val processor = dataSourceOp.toProcessor
processor shouldBe a[Processor[_]]
processor.parallelism shouldBe dataSourceOp.parallelism
- processor.description shouldBe s"${dataSourceOp.description}.globalWindows"
+ processor.description shouldBe s"${dataSourceOp.description}"
}
}
@@ -173,9 +173,9 @@
"chain WindowTransformOp" in {
- val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner())
+ val runner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner())
val windowTransformOp = mock[WindowTransformOp[Any, Any]]
- when(windowTransformOp.windowRunner).thenReturn(runner)
+ when(windowTransformOp.operator).thenReturn(runner)
val chainedOp = groupByOp.chain(windowTransformOp)
chainedOp shouldBe a[GroupByOp[_, _]]
@@ -199,9 +199,9 @@
val mergeOp = MergeOp()
"chain WindowTransformOp" in {
- val runner = mock[WindowRunner[Any, Any]]
+ val runner = mock[StreamingOperator[Any, Any]]
val windowTransformOp = mock[WindowTransformOp[Any, Any]]
- when(windowTransformOp.windowRunner).thenReturn(runner)
+ when(windowTransformOp.operator).thenReturn(runner)
val chainedOp = mergeOp.chain(windowTransformOp)
chainedOp shouldBe a [MergeOp]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index be4cc63..f2c9d0e 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -87,8 +87,8 @@
.mapVertex(_.description)
plan.getVertices.toSet should contain theSameElementsAs
- Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
- plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
+ Set("source", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
+ plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe
a[GroupByPartitioner[_, _]]
plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe
a[CoLocationPartitioner]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index 6244224..c92f9c8 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -29,7 +29,7 @@
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.task.TransformTask
import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{Matchers, WordSpec}
@@ -218,11 +218,11 @@
val data = "one two three".split("\\s+")
val dataSource = new CollectionDataSource[String](data)
- val runner1 = new DefaultWindowRunner[String, String](
+ val runner1 = new WindowOperator[String, String](
GlobalWindows(), new DummyRunner[String])
val conf = UserConfig.empty
.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
- .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
+ .withValue[StreamingOperator[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
// Source with no transformer
val source = new DataSourceTask[String, String](
@@ -239,7 +239,7 @@
val anotherTaskContext = MockUtil.mockTaskContext
val double = new FlatMapper[String, String](FlatMapFunction(
word => List(word, word)), "double")
- val runner2 = new DefaultWindowRunner[String, String](
+ val runner2 = new WindowOperator[String, String](
GlobalWindows(), double)
val another = new DataSourceTask(anotherTaskContext,
conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2))
@@ -262,7 +262,7 @@
val conf = UserConfig.empty
val double = new FlatMapper[String, String](FlatMapFunction(
word => List(word, word)), "double")
- val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double)
+ val transform = new WindowOperator[String, String](GlobalWindows(), double)
val task = new TransformTask[String, String](transform, taskContext, conf)
task.onStart(Instant.EPOCH)
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index d43bca0..f701c5e 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -61,9 +61,9 @@
dag.getVertices.size shouldBe 2
dag.getVertices.foreach { processor =>
processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
- if (processor.description == "A.globalWindows") {
+ if (processor.description == "A") {
processor.parallelism shouldBe 2
- } else if (processor.description == "B.globalWindows") {
+ } else if (processor.description == "B") {
processor.parallelism shouldBe 3
} else {
fail(s"undefined source ${processor.description}")
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
index 9e6bf59..62e14f4 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -24,7 +24,7 @@
import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
import org.apache.gearpump.streaming.{Constants, MockUtil}
-import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner
+import org.apache.gearpump.streaming.dsl.window.impl.WindowOperator
import org.apache.gearpump.streaming.source.Watermark
import org.mockito.Mockito._
import org.scalacheck.Gen
@@ -40,7 +40,7 @@
forAll(longGen) { (time: Instant) =>
val groupBy = mock[Any => Int]
- val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any])
+ val windowRunner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner[Any])
val context = MockUtil.mockTaskContext
val config = UserConfig.empty
.withValue(
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index e38c5a3..0009ad5 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,7 +22,7 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator}
import org.mockito.Mockito.{verify, when}
import org.scalacheck.Gen
import org.scalatest.{Matchers, PropSpec}
@@ -36,7 +36,7 @@
val watermarkGen = longGen.map(Instant.ofEpochMilli)
forAll(watermarkGen) { (watermark: Instant) =>
- val windowRunner = mock[WindowRunner[Any, Any]]
+ val windowRunner = mock[StreamingOperator[Any, Any]]
val context = MockUtil.mockTaskContext
val config = UserConfig.empty
val task = new TransformTask[Any, Any](windowRunner, context, config)
@@ -45,7 +45,7 @@
val message = Message(value, time)
task.onNext(message)
- verify(windowRunner).process(TimestampedValue(value, time))
+ verify(windowRunner).foreach(TimestampedValue(value, time))
when(windowRunner.trigger(watermark)).thenReturn(
TriggeredOutputs(Some(TimestampedValue(value, time)), watermark))
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index b23d0ee..1ac7213 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -46,10 +46,10 @@
implicit val system = MockUtil.system
val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
val windows = SessionWindows.apply(Duration.ofMillis(4L))
- val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows,
+ val windowRunner = new WindowOperator[KV, Option[KV]](windows,
new FoldRunner[KV, Option[KV]](reduce, "reduce"))
- data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
+ data.foreach(m => windowRunner.foreach(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe
List(
TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index d62739a..cd2cfa7 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,7 +23,7 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator}
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
@@ -40,7 +40,7 @@
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val runner = mock[WindowRunner[Any, Any]]
+ val runner = mock[StreamingOperator[Any, Any]]
val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
sourceTask.onStart(startTime)
@@ -57,13 +57,17 @@
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val runner = mock[WindowRunner[Any, Any]]
- val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
+ val processor = mock[StreamingOperator[String, String]]
+ val sourceTask = new DataSourceTask[String, String](dataSource, processor,
+ taskContext, config)
val msg = Message(str, timestamp)
when(dataSource.read()).thenReturn(msg)
- when(runner.trigger(Watermark.MAX)).thenReturn(
- TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], timestamp)), Watermark.MAX))
+ when(processor.flatMap(new TimestampedValue[String](msg))).thenReturn(
+ Some(new TimestampedValue[String](msg))
+ )
+ when(processor.trigger(Watermark.MAX)).thenReturn(
+ TriggeredOutputs[String](None, Watermark.MAX))
sourceTask.onNext(Message("next"))
sourceTask.onWatermarkProgress(Watermark.MAX)
@@ -79,7 +83,7 @@
val dataSource = mock[DataSource]
val config = UserConfig.empty
.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
- val runner = mock[WindowRunner[Any, Any]]
+ val runner = mock[StreamingOperator[Any, Any]]
val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
sourceTask.onStop()