[GEARPUMP-367] Don't use windowing unnecessarily

Author: manuzhang <owenzhang1990@gmail.com>

Closes #226 from manuzhang/simplify_dsl.
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
index 6d43f16..2d3d94b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -36,7 +36,7 @@
  *     * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
  *     * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops.
  *
- *     All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally
+ *     All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.StreamingOperator]], which internally
  *     runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with
  *     [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually
  *     executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index c37ced6..4a71b08 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -21,12 +21,11 @@
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner}
-import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FlatMapper, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{AndThenOperator, FlatMapOperator, StreamingOperator, WindowOperator}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
 import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
 import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
 import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
 import org.apache.gearpump.streaming.task.Task
@@ -69,6 +68,16 @@
     }
   }
 
+  def isFlatMapper(runner: FunctionRunner[Any, Any]): Boolean = {
+    runner match {
+      case fm: FlatMapper[Any, Any] =>
+        true
+      case at: AndThen[Any, Any, Any] =>
+        isFlatMapper(at.first) && isFlatMapper(at.second)
+      case _ =>
+        false
+    }
+  }
 }
 
 /**
@@ -134,39 +143,59 @@
     dataSource: DataSource,
     parallelism: Int = 1,
     description: String = "source",
-    userConfig: UserConfig = UserConfig.empty)
+    userConfig: UserConfig = UserConfig.empty,
+    operator: Option[StreamingOperator[Any, Any]] = None)
   extends Op {
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
-      case op: WindowTransformOp[_, _] =>
+      case op: WindowTransformOp[Any, Any] =>
+        val chainedRunner =
+          operator.map(AndThenOperator(_, op.operator)).getOrElse(op.operator)
         DataSourceOp(
           dataSource,
           parallelism,
           Op.concatenate(description, op.description),
           Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
-            op.windowRunner),
-            op.userConfig))
-      case op: TransformOp[_, _] =>
-        chain(
-          WindowOp(GlobalWindows()).chain(op))
+            chainedRunner),
+            op.userConfig),
+        Some(chainedRunner))
+      case op: TransformOp[Any, Any] =>
+        val runner = op.runner
+        if (Op.isFlatMapper(runner)) {
+          val fm = new FlatMapOperator[Any, Any](runner)
+          val chainedRunner =
+            operator.map(AndThenOperator(_, fm)).getOrElse(fm)
+          DataSourceOp(
+            dataSource,
+            parallelism,
+            Op.concatenate(description, op.description),
+            Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+              chainedRunner),
+              op.userConfig),
+            Some(chainedRunner)
+          )
+        } else {
+          chain(
+            WindowOp(GlobalWindows()).chain(op))
+        }
       case op: WindowOp =>
         chain(
           op.chain(TransformOp(new DummyRunner[Any]())))
       case op: TransformWindowTransformOp[_, _, _] =>
-        chain(
-          WindowOp(GlobalWindows()).chain(op.transformOp)
-            .chain(op.windowTransformOp))
+        chain(op.transformOp).chain(op.windowTransformOp)
       case _ =>
         throw new OpChainException(this, other)
     }
   }
 
   override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    Op.withGlobalWindowsDummyRunner(this, userConfig,
+    if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
+      chain(TransformOp(new DummyRunner[Any])).toProcessor
+    } else {
       Processor[DataSourceTask[Any, Any]](parallelism, description,
         userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
-    )
+    }
   }
 }
 
@@ -195,10 +224,10 @@
  * to another Op to be executed
  */
 case class TransformOp[IN, OUT](
-    fn: FunctionRunner[IN, OUT],
+    runner: FunctionRunner[IN, OUT],
     userConfig: UserConfig = UserConfig.empty) extends Op {
 
-  override def description: String = fn.description
+  override def description: String = runner.description
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
@@ -208,16 +237,16 @@
         // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3))
         // => AndThen(AndThen(f1, f2), f3)
         TransformOp(
-          AndThen(fn, op.fn),
+          AndThen(runner, op.runner),
           Op.concatenate(userConfig, op.userConfig))
       case op: WindowOp =>
         TransformWindowTransformOp(this,
-          WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+          WindowTransformOp(new WindowOperator[OUT, OUT](
             op.windows, new DummyRunner[OUT]
           ), op.description, op.userConfig))
       case op: TransformWindowTransformOp[OUT, _, _] =>
         TransformWindowTransformOp(TransformOp(
-          AndThen(fn, op.transformOp.fn),
+          AndThen(runner, op.transformOp.runner),
           Op.concatenate(userConfig, op.transformOp.userConfig)
         ), op.windowTransformOp)
       case _ =>
@@ -244,13 +273,13 @@
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
       case op: TransformOp[_, _] =>
-        WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+        WindowTransformOp(new WindowOperator(windows, op.runner),
           Op.concatenate(description, op.description),
           Op.concatenate(userConfig, op.userConfig))
       case op: WindowOp =>
         chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
       case op: TransformWindowTransformOp[_, _, _] =>
-        WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+        WindowTransformOp(new WindowOperator(windows, op.transformOp.runner),
           Op.concatenate(description, op.transformOp.description),
           Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp)
       case _ =>
@@ -290,7 +319,7 @@
           Op.concatenate(description, op.description),
           Op.concatenate(
             userConfig
-              .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+              .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.operator),
             userConfig))
       case op: WindowOp =>
         chain(op.chain(TransformOp(new DummyRunner[Any]())))
@@ -329,7 +358,7 @@
           parallelism,
           description,
           Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
-            op.windowRunner),
+            op.operator),
             op.userConfig))
       case op: WindowOp =>
         chain(op.chain(TransformOp(new DummyRunner[Any]())))
@@ -352,7 +381,7 @@
  * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
  */
 private case class WindowTransformOp[IN, OUT](
-    windowRunner: WindowRunner[IN, OUT],
+    operator: StreamingOperator[IN, OUT],
     description: String,
     userConfig: UserConfig) extends Op {
 
@@ -360,7 +389,7 @@
     other match {
       case op: WindowTransformOp[OUT, _] =>
         WindowTransformOp(
-          WindowRunnerAT(windowRunner, op.windowRunner),
+          AndThenOperator(operator, op.operator),
           Op.concatenate(description, op.description),
           Op.concatenate(userConfig, op.userConfig)
         )
@@ -372,7 +401,7 @@
   override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
     // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
     Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
-      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+      Constants.GEARPUMP_STREAMING_OPERATOR, operator))
   }
 }
 
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 2c11238..c638257 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -39,6 +39,7 @@
   def description: String
 }
 
+
 case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE],
     second: FunctionRunner[MIDDLE, OUT])
   extends FunctionRunner[IN, OUT] {
@@ -114,10 +115,5 @@
   }
 }
 
-class DummyRunner[T] extends FunctionRunner[T, T] {
-
-  override def process(value: T): TraversableOnce[T] = Option(value)
-
-  override def description: String = ""
-}
+class DummyRunner[T] extends FlatMapper[T, T](FlatMapFunction(Option(_)), "")
 
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
index b3f3ad2..b615354 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -25,7 +25,7 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator}
 import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
@@ -44,20 +44,21 @@
     )
   }
 
-  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
-    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+  private val groups: UnifiedMap[GROUP, StreamingOperator[IN, OUT]] =
+    new UnifiedMap[GROUP, StreamingOperator[IN, OUT]]
 
   override def onNext(message: Message): Unit = {
     val input = message.value.asInstanceOf[IN]
     val group = groupBy(input)
 
     if (!groups.containsKey(group)) {
-      groups.put(group,
-        userConfig.getValue[WindowRunner[IN, OUT]](
-          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+      val operator = userConfig.getValue[StreamingOperator[IN, OUT]](
+        GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get
+      operator.setup()
+      groups.put(group, operator)
     }
 
-    groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN],
+    groups.get(group).foreach(TimestampedValue(message.value.asInstanceOf[IN],
       message.timestamp))
   }
 
@@ -65,11 +66,19 @@
     if (groups.isEmpty && watermark == Watermark.MAX) {
       taskContext.updateWatermark(Watermark.MAX)
     } else {
-      groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
-        override def accept(runner: WindowRunner[IN, OUT]): Unit = {
-          TaskUtil.trigger(watermark, runner, taskContext)
+      groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] {
+        override def accept(operator: StreamingOperator[IN, OUT]): Unit = {
+          TaskUtil.trigger(watermark, operator, taskContext)
         }
       })
     }
   }
+
+  override def onStop(): Unit = {
+    groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] {
+      override def accept(operator: StreamingOperator[IN, OUT]): Unit = {
+        operator.teardown()
+      }
+    })
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 5ad64fa..6c78e0b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -22,25 +22,33 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{StreamingOperator, TimestampedValue}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 class TransformTask[IN, OUT](
-    runner: WindowRunner[IN, OUT],
+    operator: StreamingOperator[IN, OUT],
     taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
   def this(context: TaskContext, conf: UserConfig) = {
     this(
-      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
       context, conf
     )
   }
 
+  override def onStart(startTime: Instant): Unit = {
+    operator.setup()
+  }
+
   override def onNext(msg: Message): Unit = {
-    runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
+    operator.foreach(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    TaskUtil.trigger(watermark, runner, taskContext)
+    TaskUtil.trigger(watermark, operator, taskContext)
+  }
+
+  override def onStop(): Unit = {
+    operator.teardown()
   }
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
similarity index 65%
rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
index ee3c067..4f29c9e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
@@ -23,6 +23,7 @@
 import com.gs.collections.api.block.procedure.Procedure
 import com.gs.collections.impl.list.mutable.FastList
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.Message
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
 import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
@@ -32,65 +33,127 @@
 import scala.collection.mutable.ArrayBuffer
 
 /**
- * Inputs for [[WindowRunner]].
+ * Inputs for [[StreamingOperator]].
  */
-case class TimestampedValue[T](value: T, timestamp: Instant)
+case class TimestampedValue[T](value: T, timestamp: Instant) {
+
+  def this(msg: Message) = {
+    this(msg.value.asInstanceOf[T], msg.timestamp)
+  }
+
+  def toMessage: Message = Message(value, timestamp)
+}
 
 /**
- * Outputs triggered by [[WindowRunner]]
+ * Outputs triggered by [[StreamingOperator]]
  */
 case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
     watermark: Instant)
 
+
+trait StreamingOperator[IN, OUT] extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def foreach(tv: TimestampedValue[IN]): Unit
+
+  def flatMap(
+      tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+    foreach(tv)
+    None
+  }
+
+  def trigger(time: Instant): TriggeredOutputs[OUT]
+
+  def teardown(): Unit = {}
+}
+
+/**
+ * A composite WindowRunner that first executes its left child and feeds results
+ * into result child.
+ */
+case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE],
+    right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] {
+
+  override def setup(): Unit = {
+    left.setup()
+    right.setup()
+  }
+
+  override def foreach(
+      tv: TimestampedValue[IN]): Unit = {
+    left.flatMap(tv).foreach(right.flatMap)
+  }
+
+  override def flatMap(
+      tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+    left.flatMap(tv).flatMap(right.flatMap)
+  }
+
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+    val lOutputs = left.trigger(time)
+    lOutputs.outputs.foreach(right.foreach)
+    right.trigger(lOutputs.watermark)
+  }
+
+  override def teardown(): Unit = {
+    left.teardown()
+    right.teardown()
+  }
+}
+
+/**
+ * @param runner FlatMapper or chained FlatMappers
+ */
+class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT])
+  extends StreamingOperator[IN, OUT] {
+
+  override def setup(): Unit = {
+    runner.setup()
+  }
+
+  override def foreach(tv: TimestampedValue[IN]): Unit = {
+    throw new UnsupportedOperationException("foreach should not be invoked on FlatMapOperator; " +
+      "please use flatMap instead")
+  }
+
+  override def flatMap(
+      tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+    runner.process(tv.value)
+      .map(TimestampedValue(_, tv.timestamp))
+  }
+
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+    TriggeredOutputs(None, time)
+  }
+
+  override def teardown(): Unit = {
+    runner.teardown()
+  }
+}
+
 /**
  * This is responsible for executing window calculation.
  *   1. Groups elements into windows as defined by window function
  *   2. Applies window calculation to each group
  *   3. Emits results on triggering
  */
-trait WindowRunner[IN, OUT] extends java.io.Serializable {
-
-  def process(timestampedValue: TimestampedValue[IN]): Unit
-
-  def trigger(time: Instant): TriggeredOutputs[OUT]
-}
-
-/**
- * A composite WindowRunner that first executes its left child and feeds results
- * into result child.
- */
-case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
-    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
-
-  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
-    left.process(timestampedValue)
-  }
-
-  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
-    val lOutputs = left.trigger(time)
-    lOutputs.outputs.foreach(right.process)
-    right.trigger(lOutputs.watermark)
-  }
-}
-
-/**
- * Default implementation for [[WindowRunner]].
- */
-class DefaultWindowRunner[IN, OUT](
+class WindowOperator[IN, OUT](
     windows: Windows,
-    fnRunner: FunctionRunner[IN, OUT])
-  extends WindowRunner[IN, OUT] {
+    runner: FunctionRunner[IN, OUT])
+  extends StreamingOperator[IN, OUT] {
 
   private val windowFn = windows.windowFn
   private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]]
-  private var setup = false
+  private var isSetup = false
   private var watermark = Watermark.MIN
 
-  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
+  override def foreach(
+      tv: TimestampedValue[IN]): Unit = {
     val wins = windowFn(new Context[IN] {
-      override def element: IN = timestampedValue.value
+      override def element: IN = tv.value
 
-      override def timestamp: Instant = timestampedValue.timestamp
+      override def timestamp: Instant = tv.timestamp
     })
     wins.foreach { win =>
       if (windowFn.isNonMerging) {
@@ -98,9 +161,9 @@
           val inputs = new FastList[TimestampedValue[IN]]
           windowInputs.put(win, inputs)
         }
-        windowInputs.get(win).add(timestampedValue)
+        windowInputs.get(win).add(tv)
       } else {
-        merge(windowInputs, win, timestampedValue)
+        merge(windowInputs, win, tv)
       }
     }
 
@@ -133,26 +196,26 @@
         val firstWin = windowInputs.firstKey
         if (!time.isBefore(firstWin.endTime)) {
           val inputs = windowInputs.remove(firstWin)
-          if (!setup) {
-            fnRunner.setup()
-            setup = true
+          if (!isSetup) {
+            runner.setup()
+            isSetup = true
           }
           inputs.forEach(new Procedure[TimestampedValue[IN]] {
             override def value(tv: TimestampedValue[IN]): Unit = {
-              fnRunner.process(tv.value).foreach {
+              runner.process(tv.value).foreach {
                 out: OUT => outputs += TimestampedValue(out, tv.timestamp)
               }
             }
           })
-          fnRunner.finish().foreach {
+          runner.finish().foreach {
             out: OUT =>
               outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
           }
           val newWmk = TaskUtil.max(wmk, firstWin.endTime)
           if (windows.accumulationMode == Discarding) {
-            fnRunner.teardown()
+            runner.teardown()
             // discarding, setup need to be called for each window
-            setup = false
+            isSetup = false
           }
           onTrigger(outputs, newWmk)
         } else {
@@ -176,4 +239,8 @@
     watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
     TriggeredOutputs(triggeredOutputs.outputs, watermark)
   }
+
+  override def teardown(): Unit = {
+    runner.teardown()
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
index dd4c0d3..c471a00 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -18,11 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
-import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, Window, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{FlatMapOperator, StreamingOperator}
 import org.apache.gearpump.streaming.{Constants, Processor}
 
 /**
@@ -48,19 +48,9 @@
     Processor[DataSourceTask[Any, Any]](parallelism, description,
       taskConf
         .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
-        .withValue[WindowRunner[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
-        new DefaultWindowRunner[Any, Any](
-          Windows(PerElementWindowFunction, description = "perElementWindows"),
-          new DummyRunner[Any])))
-  }
-
-
-  case object PerElementWindowFunction extends WindowFunction {
-    override def apply[T](
-        context: WindowFunction.Context[T]): Array[Window] = {
-      Array(Window(context.timestamp, context.timestamp.plusMillis(1)))
-    }
-
-    override def isNonMerging: Boolean = true
+        .withValue[StreamingOperator[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
+        new FlatMapOperator(new DummyRunner[Any])
+      )
+    )
   }
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index f93c496..b09ad66 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 /**
@@ -40,7 +40,7 @@
  */
 class DataSourceTask[IN, OUT] private[source](
     source: DataSource,
-    windowRunner: WindowRunner[IN, OUT],
+    operator: StreamingOperator[IN, OUT],
     context: TaskContext,
     conf: UserConfig)
   extends Task(context, conf) {
@@ -48,7 +48,7 @@
   def this(context: TaskContext, conf: UserConfig) = {
     this(
       conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
-      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
       context, conf
     )
   }
@@ -58,27 +58,32 @@
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at ${startTime.toEpochMilli}")
     source.open(context, startTime)
+    operator.setup()
 
     self ! Watermark(source.getWatermark)
   }
 
   override def onNext(m: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(
-        msg => windowRunner.process(
-          TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
+      Option(source.read()).foreach(process)
     }
 
     self ! Watermark(source.getWatermark)
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    TaskUtil.trigger(watermark, windowRunner, context)
+    TaskUtil.trigger(watermark, operator, context)
   }
 
   override def onStop(): Unit = {
     LOG.info("closing data source...")
     source.close()
+    operator.teardown()
+  }
+
+  private def process(msg: Message): Unit = {
+    operator.flatMap(new TimestampedValue(msg))
+      .foreach { tv => context.output(tv.toMessage) }
   }
 
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
index bd889c4..ed304ce 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
@@ -21,7 +21,7 @@
 import java.time.Instant
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator}
 
 object TaskUtil {
 
@@ -36,7 +36,7 @@
     loader.loadClass(className).asSubclass(classOf[Task])
   }
 
-  def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT],
+  def trigger[IN, OUT](watermark: Instant, runner: StreamingOperator[IN, OUT],
       context: TaskContext): Unit = {
     val triggeredOutputs = runner.trigger(watermark)
     context.updateWatermark(triggeredOutputs.watermark)
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index ca0135d..79ef135 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -28,7 +28,7 @@
 import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator}
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -66,7 +66,7 @@
       val dataSourceOp = DataSourceOp(dataSource)
       val transformOp = mock[TransformOp[Any, Any]]
       val fn = mock[FunctionRunner[Any, Any]]
-      when(transformOp.fn).thenReturn(fn)
+      when(transformOp.runner).thenReturn(fn)
 
       val chainedOp = dataSourceOp.chain(transformOp)
 
@@ -85,7 +85,7 @@
       val processor = dataSourceOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe dataSourceOp.parallelism
-      processor.description shouldBe s"${dataSourceOp.description}.globalWindows"
+      processor.description shouldBe s"${dataSourceOp.description}"
     }
   }
 
@@ -173,9 +173,9 @@
 
     "chain WindowTransformOp" in {
 
-      val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner())
+      val runner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner())
       val windowTransformOp = mock[WindowTransformOp[Any, Any]]
-      when(windowTransformOp.windowRunner).thenReturn(runner)
+      when(windowTransformOp.operator).thenReturn(runner)
 
       val chainedOp = groupByOp.chain(windowTransformOp)
       chainedOp shouldBe a[GroupByOp[_, _]]
@@ -199,9 +199,9 @@
     val mergeOp = MergeOp()
 
     "chain WindowTransformOp" in {
-      val runner = mock[WindowRunner[Any, Any]]
+      val runner = mock[StreamingOperator[Any, Any]]
       val windowTransformOp = mock[WindowTransformOp[Any, Any]]
-      when(windowTransformOp.windowRunner).thenReturn(runner)
+      when(windowTransformOp.operator).thenReturn(runner)
 
       val chainedOp = mergeOp.chain(windowTransformOp)
       chainedOp shouldBe a [MergeOp]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index be4cc63..f2c9d0e 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -87,8 +87,8 @@
       .mapVertex(_.description)
 
     plan.getVertices.toSet should contain theSameElementsAs
-      Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
-    plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
+      Set("source", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
+    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe
       a[GroupByPartitioner[_, _]]
     plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe
       a[CoLocationPartitioner]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index 6244224..c92f9c8 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -29,7 +29,7 @@
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.task.TransformTask
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
@@ -218,11 +218,11 @@
 
       val data = "one two three".split("\\s+")
       val dataSource = new CollectionDataSource[String](data)
-      val runner1 = new DefaultWindowRunner[String, String](
+      val runner1 = new WindowOperator[String, String](
         GlobalWindows(), new DummyRunner[String])
       val conf = UserConfig.empty
         .withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
-        .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
+        .withValue[StreamingOperator[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
 
       // Source with no transformer
       val source = new DataSourceTask[String, String](
@@ -239,7 +239,7 @@
       val anotherTaskContext = MockUtil.mockTaskContext
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val runner2 = new DefaultWindowRunner[String, String](
+      val runner2 = new WindowOperator[String, String](
         GlobalWindows(), double)
       val another = new DataSourceTask(anotherTaskContext,
         conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2))
@@ -262,7 +262,7 @@
       val conf = UserConfig.empty
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double)
+      val transform = new WindowOperator[String, String](GlobalWindows(), double)
       val task = new TransformTask[String, String](transform, taskContext, conf)
       task.onStart(Instant.EPOCH)
 
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index d43bca0..f701c5e 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -61,9 +61,9 @@
     dag.getVertices.size shouldBe 2
     dag.getVertices.foreach { processor =>
       processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
-      if (processor.description == "A.globalWindows") {
+      if (processor.description == "A") {
         processor.parallelism shouldBe 2
-      } else if (processor.description == "B.globalWindows") {
+      } else if (processor.description == "B") {
         processor.parallelism shouldBe 3
       } else {
         fail(s"undefined source ${processor.description}")
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
index 9e6bf59..62e14f4 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -24,7 +24,7 @@
 import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
 import org.apache.gearpump.streaming.{Constants, MockUtil}
-import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner
+import org.apache.gearpump.streaming.dsl.window.impl.WindowOperator
 import org.apache.gearpump.streaming.source.Watermark
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -40,7 +40,7 @@
 
     forAll(longGen) { (time: Instant) =>
       val groupBy = mock[Any => Int]
-      val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any])
+      val windowRunner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner[Any])
       val context = MockUtil.mockTaskContext
       val config = UserConfig.empty
         .withValue(
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index e38c5a3..0009ad5 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,7 +22,7 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator}
 import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
@@ -36,7 +36,7 @@
     val watermarkGen = longGen.map(Instant.ofEpochMilli)
 
     forAll(watermarkGen) { (watermark: Instant) =>
-      val windowRunner = mock[WindowRunner[Any, Any]]
+      val windowRunner = mock[StreamingOperator[Any, Any]]
       val context = MockUtil.mockTaskContext
       val config = UserConfig.empty
       val task = new TransformTask[Any, Any](windowRunner, context, config)
@@ -45,7 +45,7 @@
       val message = Message(value, time)
 
       task.onNext(message)
-      verify(windowRunner).process(TimestampedValue(value, time))
+      verify(windowRunner).foreach(TimestampedValue(value, time))
 
       when(windowRunner.trigger(watermark)).thenReturn(
         TriggeredOutputs(Some(TimestampedValue(value, time)), watermark))
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index b23d0ee..1ac7213 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -46,10 +46,10 @@
     implicit val system = MockUtil.system
     val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
     val windows = SessionWindows.apply(Duration.ofMillis(4L))
-    val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows,
+    val windowRunner = new WindowOperator[KV, Option[KV]](windows,
       new FoldRunner[KV, Option[KV]](reduce, "reduce"))
 
-    data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
+    data.foreach(m => windowRunner.foreach(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
     windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe
       List(
         TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index d62739a..cd2cfa7 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,7 +23,7 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -40,7 +40,7 @@
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-        val runner = mock[WindowRunner[Any, Any]]
+        val runner = mock[StreamingOperator[Any, Any]]
       val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
 
       sourceTask.onStart(startTime)
@@ -57,13 +57,17 @@
         val dataSource = mock[DataSource]
         val config = UserConfig.empty
           .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-        val runner = mock[WindowRunner[Any, Any]]
-        val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
+        val processor = mock[StreamingOperator[String, String]]
+        val sourceTask = new DataSourceTask[String, String](dataSource, processor,
+          taskContext, config)
         val msg = Message(str, timestamp)
         when(dataSource.read()).thenReturn(msg)
 
-        when(runner.trigger(Watermark.MAX)).thenReturn(
-          TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], timestamp)), Watermark.MAX))
+        when(processor.flatMap(new TimestampedValue[String](msg))).thenReturn(
+          Some(new TimestampedValue[String](msg))
+        )
+        when(processor.trigger(Watermark.MAX)).thenReturn(
+          TriggeredOutputs[String](None, Watermark.MAX))
 
         sourceTask.onNext(Message("next"))
         sourceTask.onWatermarkProgress(Watermark.MAX)
@@ -79,7 +83,7 @@
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val runner = mock[WindowRunner[Any, Any]]
+    val runner = mock[StreamingOperator[Any, Any]]
     val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
 
     sourceTask.onStop()