[GEARPUMP-311] refactor state management
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..0cb1185
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.javaapi
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
+import org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.{FlatMapWithStateFunction => JFlatMapWithStateFunction}
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.Stream
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.Windows
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Java DSL
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+ /** FlatMap on stream */
+ def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), "flatMap"))
+ }
+
+ def flatMapWithState[R](fn: JFlatMapWithStateFunction[T, R],
+ description: String): JavaStream[R] = {
+ new JavaStream[R](stream.flatMapWithState(FlatMapWithStateFunction(fn), "flatMapWithState"))
+ }
+
+ /** Map on stream */
+ def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
+ }
+
+ def mapWithState[R](fn: MapWithStateFunction[T, R], description: String): JavaStream[R] = {
+ new JavaStream[R](stream.flatMapWithState(FlatMapWithStateFunction(fn), description))
+ }
+
+ /** Only keep the messages that FilterFunction returns true. */
+ def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
+ new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
+ }
+
+ def fold[A](fn: FoldFunction[T, A], description: String): JavaStream[A] = {
+ new JavaStream[A](stream.fold(fn, description))
+ }
+
+ /** Does aggregation on the stream */
+ def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
+ new JavaStream[T](stream.reduce(fn, description))
+ }
+
+ def log(): Unit = {
+ stream.log()
+ }
+
+ /** Merges streams of same type together */
+ def merge(other: JavaStream[T], parallelism: Int, description: String): JavaStream[T] = {
+ new JavaStream[T](stream.merge(other.stream, parallelism, description))
+ }
+
+ /**
+ * Group by a stream and turns it to a list of sub-streams. Operations chained after
+ * groupBy applies to sub-streams.
+ */
+ def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+ parallelism: Int, description: String): JavaStream[T] = {
+ new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
+ }
+
+ def window(win: Windows): JavaStream[T] = {
+ new JavaStream[T](stream.window(win))
+ }
+
+ /** Add a low level Processor to process messages */
+ def process[R](
+ processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
+ : JavaStream[R] = {
+ new JavaStream[R](stream.process(processor, parallelism, conf, description))
+ }
+}
+
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
new file mode 100644
index 0000000..9744ec6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.plan.functions
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.refactor.dsl.window.impl.{AndThen => WindowRunnerAT}
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.refactor.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
+import org.apache.gearpump.streaming.refactor.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.refactor.source.DataSourceTask
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+object Op {
+
+ def concatenate(desc1: String, desc2: String): String = {
+ if (desc1 == null || desc1.isEmpty) desc2
+ else if (desc2 == null || desc2.isEmpty) desc1
+ else desc1 + "." + desc2
+ }
+
+ def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
+ config1.withConfig(config2)
+ }
+
+ def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
+ processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
+ if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
+ op.chain(
+ WindowOp(GlobalWindows()).chain(TransformOp(new DummyRunner[Any]))
+ ).toProcessor
+ } else {
+ processor
+ }
+ }
+
+}
+
+/**
+ * This is a vertex on the logical plan.
+ */
+sealed trait Op {
+
+ def description: String
+
+ def userConfig: UserConfig
+
+ def chain(op: Op)(implicit system: ActorSystem): Op
+
+ def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+/**
+ * This represents a low level Processor.
+ */
+case class ProcessorOp[T <: Task](
+ processor: Class[T],
+ parallelism: Int,
+ userConfig: UserConfig,
+ description: String)
+ extends Op {
+
+ def this(
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "processor")(implicit classTag: ClassTag[T]) = {
+ this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+ }
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ throw new OpChainException(this, other)
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ DefaultProcessor(parallelism, description, userConfig, processor)
+ }
+}
+
+/**
+ * This represents a DataSource.
+ */
+case class DataSourceOp(
+ dataSource: DataSource,
+ parallelism: Int = 1,
+ description: String = "source",
+ userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[_, _] =>
+ DataSourceOp(
+ dataSource,
+ parallelism,
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+ op.windowRunner),
+ op.userConfig))
+ case op: TransformOp[_, _] =>
+ chain(
+ WindowOp(GlobalWindows()).chain(op))
+ case op: WindowOp =>
+ chain(
+ op.chain(TransformOp(new DummyRunner[Any]())))
+ case op: TransformWindowTransformOp[_, _, _] =>
+ chain(
+ WindowOp(GlobalWindows()).chain(op.transformOp)
+ .chain(op.windowTransformOp))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Op.withGlobalWindowsDummyRunner(this, userConfig,
+ Processor[DataSourceTask[Any, Any]](parallelism, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
+ )
+ }
+}
+
+/**
+ * This represents a DataSink.
+ */
+case class DataSinkOp(
+ dataSink: DataSink,
+ parallelism: Int = 1,
+ description: String = "sink",
+ userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new OpChainException(this, op)
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ DataSinkProcessor(dataSink, parallelism, description)
+ }
+}
+
+/**
+ * This represents operations that can be chained together
+ * (e.g. flatMap, map, filter, reduce) and further chained
+ * to another Op to be used
+ */
+case class TransformOp[IN, OUT](
+ fn: FunctionRunner[IN, OUT],
+ userConfig: UserConfig = UserConfig.empty) extends Op {
+
+ override def description: String = fn.description
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: TransformOp[OUT, _] =>
+ // TODO: preserve type info
+ // f3(f2(f1(in)))
+ // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3))
+ // => AndThen(AndThen(f1, f2), f3)
+ TransformOp(
+ AndThen(fn, op.fn),
+ Op.concatenate(userConfig, op.userConfig))
+ case op: WindowOp =>
+ TransformWindowTransformOp(this,
+ WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+ op.windows, new DummyRunner[OUT]
+ ), op.description, op.userConfig))
+ case op: TransformWindowTransformOp[OUT, _, _] =>
+ TransformWindowTransformOp(TransformOp(
+ AndThen(fn, op.transformOp.fn),
+ Op.concatenate(userConfig, op.transformOp.userConfig)
+ ), op.windowTransformOp)
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ WindowOp(GlobalWindows()).chain(this).toProcessor
+ }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
+ * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
+ * Otherwise, it will be translated to a Processor of TransformTask.
+ */
+case class WindowTransformOp[IN, OUT](
+ windowRunner: WindowRunner[IN, OUT],
+ description: String,
+ userConfig: UserConfig) extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[OUT, _] =>
+ WindowTransformOp(
+ WindowRunnerAT(windowRunner, op.windowRunner),
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig, op.userConfig)
+ )
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+ Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+ Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+ }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
+ * It will later be chained to a WindowOp, which results in two WindowTransformOps.
+ * Finally, they will be chained to a single WindowTransformOp.
+ */
+case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+ transformOp: TransformOp[IN, MIDDLE],
+ windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+ override def description: String = {
+ throw new UnsupportedOperationException(s"description is not supported on $this")
+ }
+
+ override def userConfig: UserConfig = {
+ throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+ }
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new UnsupportedOperationException(s"chain is not supported on $this")
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ WindowOp(GlobalWindows()).chain(this).toProcessor
+ }
+}
+
+/**
+ * This represents a window aggregation, together with a following TransformOp
+ */
+case class WindowOp(
+ windows: Windows,
+ userConfig: UserConfig = UserConfig.empty) extends Op {
+
+ override def description: String = windows.description
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: TransformOp[_, _] =>
+ WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig, op.userConfig))
+ case op: WindowOp =>
+ chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
+ case op: TransformWindowTransformOp[_, _, _] =>
+ WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+ Op.concatenate(description, op.transformOp.description),
+ Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp)
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ chain(TransformOp(new DummyRunner[Any])).toProcessor
+ }
+
+}
+
+/**
+ * This represents a Processor with groupBy and window aggregation
+ */
+case class GroupByOp[IN, GROUP] private(
+ groupBy: IN => GROUP,
+ parallelism: Int = 1,
+ description: String = "groupBy",
+ override val userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[_, _] =>
+ GroupByOp(
+ groupBy,
+ parallelism,
+ Op.concatenate(description, op.description),
+ Op.concatenate(
+ userConfig
+ .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+ userConfig))
+ case op: WindowOp =>
+ chain(op.chain(TransformOp(new DummyRunner[Any]())))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Op.withGlobalWindowsDummyRunner(this, userConfig,
+ Processor[GroupByTask[IN, GROUP, Any]](parallelism, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupBy)))
+ }
+}
+
+/**
+ * This represents a Processor transforming merged streams
+ */
+case class MergeOp(
+ parallelism: Int = 1,
+ description: String = "merge",
+ userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[_, _] =>
+ MergeOp(
+ parallelism,
+ description,
+ Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+ op.windowRunner),
+ op.userConfig))
+ case op: WindowOp =>
+ chain(op.chain(TransformOp(new DummyRunner[Any]())))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Op.withGlobalWindowsDummyRunner(this, userConfig,
+ Processor[TransformTask[Any, Any]](parallelism, description, userConfig))
+ }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP doesn't require network data shuffle.
+ * e.g. TransformOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DOES require network data shuffle.
+ * e.g. GroupByOp
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown on chaining.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 can't be chained by $op2")
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..c0499c9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.refactor.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+class Stream[T](
+ private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+ private val edge: Option[OpEdge] = None,
+ private val windows: Windows = GlobalWindows()) {
+
+ /**
+ * Returns a new stream by applying a flatMap function to each element
+ * and flatten the results.
+ *
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a flatMap function to each element
+ * and flatten the results.
+ *
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+ transform(new FlatMapper[T, R](fn, description))
+ }
+
+ def flatMapWithState[R](fn: FlatMapWithStateFunction[T, R], description: String): Stream[R] = {
+ transform(new FlatMapper[T, R](fn, description))
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: T => R, description: String = "map"): Stream[R] = {
+ this.map(MapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ def mapWithState[R](fn: MapWithStateFunction[T, R], description: String): Stream[R] = {
+ this.flatMapWithState(FlatMapWithStateFunction(fn), description);
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @return a new stream after filter
+ */
+ def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
+ this.filter(FilterFunction(fn), description)
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @return a new stream after filter
+ */
+ def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a fold function over all the elements
+ *
+ * @param fn fold function
+ * @return a new stream after fold
+ */
+ def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
+ transform(new FoldRunner(fn, description))
+ }
+
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+ reduce(ReduceFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+ fold(fn, description).map(_.get)
+ }
+
+ private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
+ val op = TransformOp(fn)
+ graph.addVertex(op)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+ new Stream(graph, op, None, windows)
+ }
+
+ /**
+ * Log to task log file
+ */
+ def log(): Unit = {
+ this.map(msg => {
+ LoggerFactory.getLogger("dsl").info(msg.toString)
+ msg
+ }, "log")
+ }
+
+ /**
+ * Merges data from two stream into one
+ *
+ * @param other the other stream
+ * @return the merged stream
+ */
+ def merge(other: Stream[T], parallelism: Int = 1, description: String = "merge"): Stream[T] = {
+ val mergeOp = MergeOp(parallelism, description, UserConfig.empty)
+ graph.addVertex(mergeOp)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+ graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+ val winOp = Stream.addWindowOp(graph, mergeOp, windows)
+ new Stream[T](graph, winOp, None, windows)
+ }
+
+ /**
+ * Group by function (T => Group)
+ *
+ * For example, we have T type, People(name: String, gender: String, age: Int)
+ * groupBy[People](_.gender) will group the people by gender.
+ *
+ * You can append other combinators after groupBy
+ *
+ * For example,
+ * {{{
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+ * }}}
+ *
+ * @param fn Group by function
+ * @param parallelism Parallelism level
+ * @param description The description
+ * @return the grouped stream
+ */
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+ description: String = "groupBy"): Stream[T] = {
+ val gbOp = GroupByOp(fn, parallelism, description)
+ graph.addVertex(gbOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), gbOp)
+ val winOp = Stream.addWindowOp(graph, gbOp, windows)
+ new Stream(graph, winOp, None, windows)
+ }
+
+ /**
+ * Window function
+ *
+ * @param windows window definition
+ * @return the windowed [[Stream]]
+ */
+ def window(windows: Windows): Stream[T] = {
+ val winOp = Stream.addWindowOp(graph, thisNode, windows)
+ new Stream(graph, winOp, None, windows)
+ }
+
+ /**
+ * Connects with a low level Processor(TaskDescription)
+ *
+ * @param processor a user defined processor
+ * @param parallelism parallelism level
+ * @return new stream after processing with type [R]
+ */
+ def process[R](
+ processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+ description: String = "process"): Stream[R] = {
+ val processorOp = ProcessorOp(processor, parallelism, conf, description)
+ graph.addVertex(processorOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+ new Stream[R](graph, processorOp, Some(Shuffle), windows)
+ }
+
+
+}
+
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+ /**
+ * GroupBy key
+ *
+ * Applies to Stream[Tuple2[K,V]]
+ *
+ * @param parallelism the parallelism for this operation
+ * @return the new KV stream
+ */
+ def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+ stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+ }
+
+ /**
+ * Sum the value of the tuples
+ *
+ * Apply to Stream[Tuple2[K,V]], V must be of type Number
+ *
+ * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+ * @param numeric the numeric operations
+ * @return the sum stream
+ */
+ def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+ stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
+ }
+}
+
+object Stream {
+
+ def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge],
+ windows: Windows): Stream[T] = {
+ new Stream[T](graph, node, edge, windows)
+ }
+
+ def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+ def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+ = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+ def addWindowOp(graph: Graph[Op, OpEdge], op: Op, win: Windows): Op = {
+ val winOp = WindowOp(win)
+ graph.addVertex(winOp)
+ graph.addEdge(op, Direct, winOp)
+ winOp
+ }
+
+ implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+ new KVStream(stream)
+ }
+
+ implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+ def sink(dataSink: DataSink, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+ implicit val sink = DataSinkOp(dataSink, parallelism, description, conf)
+ stream.graph.addVertex(sink)
+ stream.graph.addEdge(stream.thisNode, Shuffle, sink)
+ new Stream[T](stream.graph, sink, None, stream.windows)
+ }
+ }
+}
+
+class LoggerSink[T] extends DataSink {
+ var logger: Logger = _
+
+ override def open(context: TaskContext): Unit = {
+ this.logger = context.logger
+ }
+
+ override def write(message: Message): Unit = {
+ logger.info("logging message " + message.value)
+ }
+
+ override def close(): Unit = Unit
+}
\ No newline at end of file