[GEARPUMP-311] refactor state management
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala
new file mode 100644
index 0000000..0cb1185
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStream.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.javaapi
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
+import org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.{FlatMapWithStateFunction => JFlatMapWithStateFunction}
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.Stream
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.Windows
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Java DSL: a thin wrapper exposing the Scala [[Stream]] operators to Java callers.
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  /** FlatMap on stream; `description` is forwarded as the operator description. */
+  def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
+  }
+
+  /** Stateful flatMap on stream; `description` is forwarded as the operator description. */
+  def flatMapWithState[R](fn: JFlatMapWithStateFunction[T, R],
+      description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMapWithState(FlatMapWithStateFunction(fn), description))
+  }
+
+  /** Map on stream */
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
+  }
+
+  /** Stateful map on stream, run as a stateful flatMap. */
+  def mapWithState[R](fn: MapWithStateFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMapWithState(FlatMapWithStateFunction(fn), description))
+  }
+
+  /** Only keep the messages that FilterFunction returns true.  */
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
+  }
+
+  /** Folds the stream with the given FoldFunction */
+  def fold[A](fn: FoldFunction[T, A], description: String): JavaStream[A] = {
+    new JavaStream[A](stream.fold(fn, description))
+  }
+
+  /** Does aggregation on the stream */
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.reduce(fn, description))
+  }
+
+  /** Delegates to the wrapped stream's `log()` */
+  def log(): Unit = stream.log()
+
+  /** Merges streams of same type together */
+  def merge(other: JavaStream[T], parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.merge(other.stream, parallelism, description))
+  }
+
+  /**
+   * Group by a stream and turns it to a list of sub-streams. Operations chained after
+   * groupBy applies to sub-streams.
+   */
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+      parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
+  }
+
+  /** Applies the given window definition to the stream */
+  def window(win: Windows): JavaStream[T] = new JavaStream[T](stream.window(win))
+
+  /** Add a low level Processor to process messages */
+  def process[R](processor: Class[_ <: Task], parallelism: Int, conf: UserConfig,
+      description: String): JavaStream[R] = {
+    new JavaStream[R](stream.process(processor, parallelism, conf, description))
+  }
+}
+
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
new file mode 100644
index 0000000..9744ec6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/OP.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.plan.functions
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.refactor.dsl.window.impl.{AndThen => WindowRunnerAT}
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.refactor.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
+import org.apache.gearpump.streaming.refactor.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.refactor.source.DataSourceTask
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+object Op {
+
+  /** Joins two descriptions with "."; a null or empty side yields the other one. */
+  def concatenate(desc1: String, desc2: String): String = {
+    val firstBlank = desc1 == null || desc1.isEmpty
+    val secondBlank = desc2 == null || desc2.isEmpty
+    if (firstBlank) desc2 else if (secondBlank) desc1 else desc1 + "." + desc2
+  }
+
+  /** Merges `config2` into `config1` by delegating to `UserConfig.withConfig`. */
+  def concatenate(config1: UserConfig, config2: UserConfig): UserConfig =
+    config1.withConfig(config2)
+
+  /**
+   * Returns `processor` when `userConfig` holds a value under GEARPUMP_STREAMING_OPERATOR;
+   * otherwise chains a global window with a DummyRunner onto `op` and converts that.
+   */
+  def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
+      processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val missingOperator = userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty
+    if (!missingOperator) processor
+    else op.chain(WindowOp(GlobalWindows()).chain(TransformOp(new DummyRunner[Any]))).toProcessor
+  }
+}
+
+/** A vertex on the logical plan. */
+sealed trait Op {
+
+  /** Combines this vertex with `op` and yields the resulting vertex. */
+  def chain(op: Op)(implicit system: ActorSystem): Op
+
+  /** Translates this vertex into a Processor. */
+  def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+
+  /** Description and user configuration attached to this vertex. */
+  def description: String
+  def userConfig: UserConfig
+}
+
+/** Logical-plan vertex for a low level Processor, identified by its task class. */
+case class ProcessorOp[T <: Task](
+    processor: Class[T],
+    parallelism: Int,
+    userConfig: UserConfig,
+    description: String)
+  extends Op {
+
+  /** Auxiliary constructor taking the task class from the implicit `ClassTag`. */
+  def this(
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = "processor")(implicit classTag: ClassTag[T]) = {
+    // runtimeClass is typed Class[_], hence the cast to Class[T]
+    this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+  }
+
+  /** Always fails: no op can be chained onto a ProcessorOp. */
+  override def chain(other: Op)(implicit system: ActorSystem): Op =
+    throw new OpChainException(this, other)
+
+  /** Builds a DefaultProcessor from the task class and the other settings. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] =
+    DefaultProcessor(parallelism, description, userConfig, processor)
+}
+
+/** Logical-plan vertex for a DataSource. */
+case class DataSourceOp(
+    dataSource: DataSource,
+    parallelism: Int = 1,
+    description: String = "source",
+    userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  /**
+   * Absorbs `other` into this source. Only a WindowTransformOp is merged directly; every
+   * other supported op is first rewritten and chained again. Unsupported ops throw.
+   */
+  override def chain(other: Op)(implicit system: ActorSystem): Op = other match {
+    case windowed: WindowTransformOp[_, _] =>
+      // store the window runner in the config, then append description and config
+      val withRunner =
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, windowed.windowRunner)
+      DataSourceOp(
+        dataSource,
+        parallelism,
+        Op.concatenate(description, windowed.description),
+        Op.concatenate(withRunner, windowed.userConfig))
+    case transform: TransformOp[_, _] =>
+      // a bare transform is put inside a global window first
+      chain(WindowOp(GlobalWindows()).chain(transform))
+    case window: WindowOp =>
+      // a bare window gets a DummyRunner as its transform
+      chain(window.chain(TransformOp(new DummyRunner[Any]())))
+    case combined: TransformWindowTransformOp[_, _, _] =>
+      val leading = WindowOp(GlobalWindows()).chain(combined.transformOp)
+      chain(leading.chain(combined.windowTransformOp))
+    case _ =>
+      throw new OpChainException(this, other)
+  }
+
+  /** Builds a DataSourceTask processor and passes it to Op.withGlobalWindowsDummyRunner. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val sourceConf = userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
+    val processor = Processor[DataSourceTask[Any, Any]](parallelism, description, sourceConf)
+    Op.withGlobalWindowsDummyRunner(this, userConfig, processor)
+  }
+}
+
+/**
+ * Logical-plan vertex for a DataSink; nothing can be chained after it.
+ */
+case class DataSinkOp(
+    dataSink: DataSink,
+    parallelism: Int = 1,
+    description: String = "sink",
+    userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  /** Always fails: no op can be chained onto a sink. */
+  override def chain(op: Op)(implicit system: ActorSystem): Op =
+    throw new OpChainException(this, op)
+
+  // NOTE(review): `userConfig` is not handed to DataSinkProcessor here - confirm intended.
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] =
+    DataSinkProcessor(dataSink, parallelism, description)
+}
+
+/**
+ * A chainable transformation (e.g. flatMap, map, filter, reduce), wrapping the
+ * FunctionRunner that performs it.
+ */
+case class TransformOp[IN, OUT](
+    fn: FunctionRunner[IN, OUT],
+    userConfig: UserConfig = UserConfig.empty) extends Op {
+
+  override def description: String = fn.description
+
+  /** Fuses `other` into this transform; unsupported ops raise OpChainException. */
+  override def chain(other: Op)(implicit system: ActorSystem): Op = other match {
+    case next: TransformOp[OUT, _] =>
+      // TODO: preserve type info
+      // f3(f2(f1(in)))
+      // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3))
+      // => AndThen(AndThen(f1, f2), f3)
+      TransformOp(AndThen(fn, next.fn), Op.concatenate(userConfig, next.userConfig))
+    case window: WindowOp =>
+      // keep this transform and follow it with a window stage running a DummyRunner
+      TransformWindowTransformOp(
+        this,
+        WindowTransformOp(
+          new DefaultWindowRunner[OUT, OUT](window.windows, new DummyRunner[OUT]),
+          window.description, window.userConfig))
+    case next: TransformWindowTransformOp[OUT, _, _] =>
+      // fuse with the leading transform of `next`; its window stage is kept as is
+      TransformWindowTransformOp(
+        TransformOp(
+          AndThen(fn, next.transformOp.fn),
+          Op.concatenate(userConfig, next.transformOp.userConfig)),
+        next.windowTransformOp)
+    case _ =>
+      throw new OpChainException(this, other)
+  }
+
+  /** Chains this transform onto a global WindowOp and converts the result. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] =
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+}
+
+/**
+ * Intermediate op produced by chaining a WindowOp with a TransformOp.
+ * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
+ * Otherwise, it will be translated to a Processor of TransformTask.
+ */
+case class WindowTransformOp[IN, OUT](
+    windowRunner: WindowRunner[IN, OUT],
+    description: String,
+    userConfig: UserConfig) extends Op {
+
+  /** Composes with another WindowTransformOp; anything else raises OpChainException. */
+  override def chain(other: Op)(implicit system: ActorSystem): Op = other match {
+    case next: WindowTransformOp[OUT, _] =>
+      // both window runners are combined, this op's runner being the first argument
+      WindowTransformOp(
+        WindowRunnerAT(windowRunner, next.windowRunner),
+        Op.concatenate(description, next.description),
+        Op.concatenate(userConfig, next.userConfig))
+    case _ =>
+      throw new OpChainException(this, other)
+  }
+
+  /** Translates to a TransformTask processor (parallelism 1) carrying the window runner. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+    val taskConf = userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)
+    Processor[TransformTask[Any, Any]](1, description, taskConf)
+  }
+}
+
+/**
+ * Intermediate op pairing a TransformOp with the WindowTransformOp that follows it.
+ * It will later be chained to a WindowOp, which results in two WindowTransformOps.
+ * Finally, they will be chained to a single WindowTransformOp.
+ */
+case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+    transformOp: TransformOp[IN, MIDDLE],
+    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+  /** Not supported on this intermediate op: always throws. */
+  override def description: String =
+    throw new UnsupportedOperationException(s"description is not supported on $this")
+
+  /** Not supported on this intermediate op: always throws. */
+  override def userConfig: UserConfig =
+    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+
+  /** Not supported on this intermediate op: always throws. */
+  override def chain(op: Op)(implicit system: ActorSystem): Op =
+    throw new UnsupportedOperationException(s"chain is not supported on $this")
+
+  /** Chains this op onto a global WindowOp and converts the result. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] =
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+}
+
+/** A window aggregation, to be chained with the TransformOp that follows it. */
+case class WindowOp(
+    windows: Windows,
+    userConfig: UserConfig = UserConfig.empty) extends Op {
+
+  override def description: String = windows.description
+
+  /** Chains a transform-like op or another window; other ops raise OpChainException. */
+  override def chain(other: Op)(implicit system: ActorSystem): Op = other match {
+    case transform: TransformOp[_, _] =>
+      WindowTransformOp(
+        new DefaultWindowRunner(windows, transform.fn),
+        Op.concatenate(description, transform.description),
+        Op.concatenate(userConfig, transform.userConfig))
+    case window: WindowOp =>
+      val first = chain(TransformOp(new DummyRunner[Any]))
+      first.chain(window.chain(TransformOp(new DummyRunner[Any])))
+    case combined: TransformWindowTransformOp[_, _, _] =>
+      WindowTransformOp(
+        new DefaultWindowRunner(windows, combined.transformOp.fn),
+        Op.concatenate(description, combined.transformOp.description),
+        Op.concatenate(userConfig, combined.transformOp.userConfig)
+      ).chain(combined.windowTransformOp)
+    case _ =>
+      throw new OpChainException(this, other)
+  }
+
+  /** A window on its own is converted with a DummyRunner as its transform. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] =
+    chain(TransformOp(new DummyRunner[Any])).toProcessor
+}
+
+/** This represents a Processor with groupBy and window aggregation */
+case class GroupByOp[IN, GROUP] private(
+    groupBy: IN => GROUP,
+    parallelism: Int = 1,
+    description: String = "groupBy",
+    override val userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  /** Absorbs a WindowTransformOp or a WindowOp; other ops raise OpChainException. */
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: WindowTransformOp[_, _] =>
+        GroupByOp(
+          groupBy,
+          parallelism,
+          Op.concatenate(description, op.description),
+          // merge in the chained op's config, as DataSourceOp and MergeOp do
+          Op.concatenate(
+            userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+            op.userConfig))
+      case op: WindowOp =>
+        chain(op.chain(TransformOp(new DummyRunner[Any]())))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  /** Builds a GroupByTask processor carrying the groupBy function in its config. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[GroupByTask[IN, GROUP, Any]](parallelism, description,
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupBy)))
+  }
+}
+
+/** Logical-plan vertex for a Processor transforming merged streams. */
+case class MergeOp(
+    parallelism: Int = 1,
+    description: String = "merge",
+    userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  /** Absorbs a WindowTransformOp or a WindowOp; other ops raise OpChainException. */
+  override def chain(other: Op)(implicit system: ActorSystem): Op = other match {
+    case windowed: WindowTransformOp[_, _] =>
+      val withRunner =
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, windowed.windowRunner)
+      // NOTE(review): the description is kept as is, whereas DataSourceOp and GroupByOp
+      // concatenate it with the chained op's description - confirm this is intended.
+      MergeOp(
+        parallelism,
+        description,
+        Op.concatenate(withRunner, windowed.userConfig))
+    case window: WindowOp =>
+      // a bare window is first chained with a DummyRunner transform
+      chain(window.chain(TransformOp(new DummyRunner[Any]())))
+    case _ =>
+      throw new OpChainException(this, other)
+  }
+
+  /** Builds a TransformTask processor and passes it to Op.withGlobalWindowsDummyRunner. */
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val processor = Processor[TransformTask[Any, Any]](parallelism, description, userConfig)
+    Op.withGlobalWindowsDummyRunner(this, userConfig, processor)
+  }
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream Op and downstream Op don't require network data shuffle.
+ * e.g. TransformOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream Op and downstream Op DO require network data shuffle.
+ * e.g. GroupByOp
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown on chaining; the message names both ops involved.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 can't be chained by $op2")
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..c0499c9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/Stream.scala
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.refactor.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+/**
+ * A handle on one node (`thisNode`) of the logical-plan `graph`. Each operator adds
+ * vertices and edges to that graph and returns a stream positioned on the new node.
+ */
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None,
+    private val windows: Windows = GlobalWindows()) {
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] =
+    this.flatMap(FlatMapFunction(fn), description)
+
+  /**
+   * Returns a new stream by applying a [[FlatMapFunction]] to each element
+   * and flatten the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] =
+    transform(new FlatMapper[T, R](fn, description))
+
+  /** Stateful variant of flatMap taking a [[FlatMapWithStateFunction]]. */
+  def flatMapWithState[R](fn: FlatMapWithStateFunction[T, R], description: String): Stream[R] =
+    transform(new FlatMapper[T, R](fn, description))
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: T => R, description: String = "map"): Stream[R] =
+    this.map(MapFunction(fn), description)
+
+  /**
+   * Returns a new stream by applying a [[MapFunction]] to each element.
+   *
+   * @param fn map function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: MapFunction[T, R], description: String): Stream[R] =
+    this.flatMap(FlatMapFunction(fn), description)
+
+  /** Stateful variant of map, run through flatMapWithState. */
+  def mapWithState[R](fn: MapWithStateFunction[T, R], description: String): Stream[R] =
+    this.flatMapWithState(FlatMapWithStateFunction(fn), description)
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter function.
+   *
+   * @param fn filter function
+   * @param description The description message for this operation
+   * @return a new stream after filter
+   */
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] =
+    this.filter(FilterFunction(fn), description)
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the [[FilterFunction]].
+   *
+   * @param fn filter function
+   * @param description The description message for this operation
+   * @return a new stream after filter
+   */
+  def filter(fn: FilterFunction[T], description: String): Stream[T] =
+    this.flatMap(FlatMapFunction(fn), description)
+
+  /**
+   * Returns a new stream by applying a fold function over all the elements
+   *
+   * @param fn fold function
+   * @param description The description message for this operation
+   * @return a new stream after fold
+   */
+  def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] =
+    transform(new FoldRunner(fn, description))
+
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] =
+    reduce(ReduceFunction(fn), description)
+
+  /**
+   * Returns a new stream by applying a [[ReduceFunction]] over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: ReduceFunction[T], description: String): Stream[T] =
+    fold(fn, description).map(_.get)
+
+  /** Adds a TransformOp for `fn` after this node and returns a stream on it. */
+  private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
+    val transformOp = TransformOp(fn)
+    graph.addVertex(transformOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), transformOp)
+    new Stream(graph, transformOp, None, windows)
+  }
+
+  /**
+   * Log to task log file
+   */
+  def log(): Unit = {
+    val logged: T => T = { msg =>
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }
+    this.map(logged, "log")
+  }
+
+  /**
+   * Merges data from two stream into one
+   *
+   * @param other the other stream
+   * @param parallelism parallelism of the merge operator
+   * @param description description message for this operator
+   * @return  the merged stream
+   */
+  def merge(other: Stream[T], parallelism: Int = 1, description: String = "merge"): Stream[T] = {
+    val merged = MergeOp(parallelism, description, UserConfig.empty)
+    graph.addVertex(merged)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), merged)
+    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), merged)
+    new Stream[T](graph, Stream.addWindowOp(graph, merged, windows), None, windows)
+  }
+
+  /**
+   * Group by function (T => Group)
+   *
+   * For example, we have T type, People(name: String, gender: String, age: Int)
+   * groupBy[People](_.gender) will group the people by gender.
+   *
+   * You can append other combinators after groupBy
+   *
+   * For example,
+   * {{{
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+   * }}}
+   *
+   * @param fn  Group by function
+   * @param parallelism  Parallelism level
+   * @param description  The description
+   * @return the grouped stream
+   */
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val grouped = GroupByOp(fn, parallelism, description)
+    graph.addVertex(grouped)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), grouped)
+    new Stream(graph, Stream.addWindowOp(graph, grouped, windows), None, windows)
+  }
+
+  /**
+   * Window function
+   *
+   * @param windows window definition
+   * @return the windowed [[Stream]]
+   */
+  def window(windows: Windows): Stream[T] =
+    new Stream(graph, Stream.addWindowOp(graph, thisNode, windows), None, windows)
+
+  /**
+   * Connects with a low level Processor(TaskDescription)
+   *
+   * @param processor  a user defined processor
+   * @param parallelism  parallelism level
+   * @param conf  user configuration for the processor
+   * @param description  description message for this operator
+   * @return  new stream after processing with type [R]
+   */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
+    graph.addVertex(processorOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+    new Stream[R](graph, processorOp, Some(Shuffle), windows)
+  }
+}
+
+/** Operators available on streams of key-value tuples. */
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+  /**
+   * Groups a Stream[Tuple2[K,V]] by the key (first element) of each tuple.
+   *
+   * @param parallelism  the parallelism for this operation
+   * @return  the new KV stream
+   */
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    val keyOf: Tuple2[K, V] => K = Stream.getTupleKey[K, V]
+    stream.groupBy(keyOf, parallelism, "groupByKey")
+  }
+
+  /**
+   * Sums the values of the tuples of a Stream[Tuple2[K,V]].
+   *
+   * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+   *
+   * @param numeric  the numeric operations for V
+   * @return  the sum stream
+   */
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+    val addValues = Stream.sumByKey[K, V](numeric)
+    stream.reduce(addValues, "sum")
+  }
+}
+
+object Stream {
+  /** Creates a stream positioned on `node` of `graph`. */
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge],
+      windows: Windows): Stream[T] = new Stream[T](graph, node, edge, windows)
+
+  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  /** Builds a function adding the values of two tuples, keeping the first tuple's key. */
+  def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+  = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+  /** Adds a WindowOp for `win` after `op` (Direct edge) and returns it. */
+  def addWindowOp(graph: Graph[Op, OpEdge], op: Op, win: Windows): Op = {
+    val winOp = WindowOp(win)
+    graph.addVertex(winOp)
+    graph.addEdge(op, Direct, winOp)
+    winOp
+  }
+
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] =
+    new KVStream(stream)
+
+  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+    /** Appends a DataSinkOp reached through a Shuffle edge and returns a stream on it. */
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+      val sinkOp = DataSinkOp(dataSink, parallelism, description, conf)
+      stream.graph.addVertex(sinkOp)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
+      new Stream[T](stream.graph, sinkOp, None, stream.windows)
+    }
+  }
+}
+
+/** A DataSink that writes every message to the logger of the task context. */
+class LoggerSink[T] extends DataSink {
+  var logger: Logger = _ // assigned in open(); null before that
+
+  override def open(context: TaskContext): Unit = {
+    this.logger = context.logger
+  }
+
+  override def write(message: Message): Unit =
+    logger.info("logging message " + message.value)
+
+  override def close(): Unit = () // nothing to release
+}
\ No newline at end of file