[GEARPUMP-262] Add setup and teardown to user-defined functions

Author: manuzhang <owenzhang1990@gmail.com>

Closes #131 from manuzhang/setup_teardown.
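
This change introduces a serializable base class for all DSL user-defined functions
with empty setup() and teardown() lifecycle hooks, rebases FlatMapFunction,
MapFunction, FilterFunction, ReduceFunction and GroupByFunction on it, replaces the
old Java functional interfaces under javaapi.dsl.functions, and moves the Scala DSL
into org.apache.gearpump.streaming.dsl.scalaapi. The internal wrappers (FlatMapper,
Reducer, AndThen) forward setup() and teardown() to the wrapped functions.

Below is a minimal sketch, not part of this patch, of how a user-defined function
could override the new hooks; the dictionary-loading logic is a hypothetical example:

    import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction

    // Hypothetical tokenizer that keeps only words found in a dictionary.
    class DictionaryTokenizer extends FlatMapFunction[String, String] {

      // Transient so the dictionary is rebuilt in setup() rather than serialized
      // together with the function.
      @transient private var dictionary: Set[String] = _

      // setup(): one-time initialization hook added by this patch.
      override def setup(): Unit = {
        dictionary = Set("this", "is", "a", "good", "start", "bingo")
      }

      // The core transformation: split a line and keep dictionary words.
      override def apply(line: String): TraversableOnce[String] = {
        line.split("\\s+").filter(word => dictionary.contains(word.toLowerCase))
      }

      // teardown(): cleanup hook added by this patch; drop the dictionary reference.
      override def teardown(): Unit = {
        dictionary = null
      }
    }

For a Stream[String], this could be wired in through the new flatMap overload as
stream.flatMap(new DictionaryTokenizer, "tokenize"), mirroring the Split/Ones/Count
classes used in the updated Java WordCount example below.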
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
index 49d3619..cbfe57a 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
@@ -21,8 +21,8 @@
 import java.util.Properties
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
 import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
 import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index d4866ed..2942861 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -25,12 +25,17 @@
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.source.DataSource;
 import org.apache.gearpump.streaming.task.TaskContext;
 import scala.Tuple2;
 
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Iterator;
 
 /** Java version of WordCount with high level DSL API */
 public class WordCount {
@@ -46,15 +51,13 @@
     JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
         1, UserConfig.empty(), "source");
 
-    JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(),
-        "flatMap");
+    JavaStream<String> words = sentence.flatMap(new Split(), "flatMap");
 
-    JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map");
+    JavaStream<Tuple2<String, Integer>> ones = words.map(new Ones(), "map");
 
-    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy");
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy");
 
-    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(
-        (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce");
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new Count(), "reduce");
 
     wordcount.log();
 
@@ -88,4 +91,36 @@
       return Instant.now();
     }
   }
+
+  private static class Split extends FlatMapFunction<String, String> {
+
+    @Override
+    public Iterator<String> apply(String s) {
+      return Arrays.asList(s.split("\\s+")).iterator();
+    }
+  }
+
+  private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> apply(String s) {
+      return new Tuple2<>(s, 1);
+    }
+  }
+
+  private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+      return new Tuple2<>(t1._1(), t1._2() + t2._2());
+    }
+  }
+
+  private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+
+    @Override
+    public String apply(Tuple2<String, Integer> tuple) {
+      return tuple._1();
+    }
+  }
 }
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 4f43fd4..401eac0 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -22,7 +22,7 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
 import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.TaskContext
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 22f597c..1cbfb22 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -20,8 +20,8 @@
 
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.StreamApp._
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
 import org.apache.gearpump.util.AkkaApp
 
 /** Same WordCount with High level DSL syntax */
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 2417763..22efa89 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -18,13 +18,10 @@
 package org.apache.gearpump.external.hbase.dsl
 
 import scala.language.implicitConversions
-
 import org.apache.hadoop.conf.Configuration
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.external.hbase.HBaseSink
-import org.apache.gearpump.streaming.dsl.Stream
-import org.apache.gearpump.streaming.dsl.Stream.Sink
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
 
 /** Create a HBase DSL Sink */
 class HBaseDSLSink[T](stream: Stream[T]) {
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
index f1bb26a..996ae0b 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -21,7 +21,7 @@
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}
 import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
@@ -44,7 +44,7 @@
       parallelism: Int = 1,
       config: UserConfig = UserConfig.empty,
       description: String = "KafkaSource"
-      ): dsl.Stream[T] = {
+      ): Stream[T] = {
     app.source[T](new KafkaSource(topics, properties), parallelism, config, description)
   }
 
@@ -66,19 +66,19 @@
       properties: Properties,
       parallelism: Int = 1,
       config: UserConfig = UserConfig.empty,
-      description: String = "KafkaSource"): dsl.Stream[T] = {
+      description: String = "KafkaSource"): Stream[T] = {
     val source = new KafkaSource(topics, properties)
     source.setCheckpointStore(checkpointStoreFactory)
     app.source[T](source, parallelism, config, description)
   }
 
   import scala.language.implicitConversions
-  implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = {
+  implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = {
     new KafkaDSL[T](stream)
   }
 }
 
-class KafkaDSL[T](stream: dsl.Stream[T]) {
+class KafkaDSL[T](stream: Stream[T]) {
 
   /**
    * Sinks data to Kafka
@@ -94,7 +94,7 @@
       properties: Properties,
       parallelism: Int = 1,
       userConfig: UserConfig = UserConfig.empty,
-      description: String = "KafkaSink"): dsl.Stream[T] = {
+      description: String = "KafkaSink"): Stream[T] = {
     stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description)
   }
 }
\ No newline at end of file
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
deleted file mode 100644
index 9788dd2..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * Function that converts a value of type T to a iterator of values of type R.
- *
- * @param <T> Input value type
- * @param <R> Return value type
- */
-public interface FlatMapFunction<T, R> extends Serializable {
-  Iterator<R> apply(T t);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
deleted file mode 100644
index 6c71280..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * GroupBy function which assign value of type T to groups
- *
- * @param <T> Input value type
- * @param <Group> Group Type
- */
-public interface GroupByFunction<T, Group> extends Serializable {
-  Group apply(T t);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
deleted file mode 100644
index e1fc821..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that map a value of type T to value of type R
- *
- * @param <T> Input value type
- * @param <R> Output value type
- */
-public interface MapFunction<T, R> extends Serializable {
-  R apply(T t);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
deleted file mode 100644
index 2bcac60..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that applies reduce operation
- *
- * @param <T> Input value type
- */
-public interface ReduceFunction<T> extends Serializable {
-  T apply(T t1, T t2);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
similarity index 61%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index f07ceff..e4e7309 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -15,16 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gearpump.streaming.dsl.api.functions
 
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
 
-import java.io.Serializable;
+object FilterFunction {
+
+  def apply[T](fn: T => Boolean): FilterFunction[T] = {
+    new FilterFunction[T] {
+      override def apply(t: T): Boolean = {
+        fn(t)
+      }
+    }
+  }
+}
 
 /**
- * Filter function
+ * Returns true to keep the input and false otherwise.
  *
- * @param <T> Message of type T
+ * @param T Input value type
  */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
+abstract class FilterFunction[T] extends SerializableFunction {
+
+  def apply(t: T): Boolean
+
 }
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
similarity index 60%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index f07ceff..70fe9d4 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -15,16 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gearpump.streaming.dsl.api.functions
 
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
 
-import java.io.Serializable;
+object MapFunction {
+
+  def apply[T, R](fn: T => R): MapFunction[T, R] = {
+    new MapFunction[T, R] {
+      override def apply(t: T): R = {
+        fn(t)
+      }
+    }
+  }
+}
 
 /**
- * Filter function
+ * Transforms an input into an output of possibly different types.
  *
- * @param <T> Message of type T
+ * @param T Input value type
+ * @param R Output value type
  */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
+abstract class MapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): R
+
 }
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
similarity index 60%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
index f07ceff..25b12be 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
@@ -15,16 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gearpump.streaming.dsl.api.functions
 
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
 
-import java.io.Serializable;
+object ReduceFunction {
+
+  def apply[T](fn: (T, T) => T): ReduceFunction[T] = {
+    new ReduceFunction[T] {
+      override def apply(t1: T, t2: T): T = {
+        fn(t1, t2)
+      }
+    }
+  }
+}
 
 /**
- * Filter function
+ * Combines two inputs into one output of the same type.
  *
- * @param <T> Message of type T
+ * @param T Type of both inputs and output
  */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
+abstract class ReduceFunction[T] extends SerializableFunction {
+
+  def apply(t1: T, t2: T): T
+
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index f2654ea..7f3c250 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gearpump.streaming.dsl.javaapi
 
-import scala.collection.JavaConverters._
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
 import org.apache.gearpump.streaming.dsl.window.api.Window
-import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
-import org.apache.gearpump.streaming.javaapi.dsl.functions._
 import org.apache.gearpump.streaming.task.Task
 
 /**
@@ -31,23 +31,23 @@
 class JavaStream[T](val stream: Stream[T]) {
 
   /** FlatMap on stream */
-  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
+  def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), "flatMap"))
   }
 
   /** Map on stream */
   def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.map({ t: T => fn(t) }, description))
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Only keep the messages that FilterFunction returns true.  */
   def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
+    new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Does aggregation on the stream */
   def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description))
+    new JavaStream[T](stream.reduce(fn, description))
   }
 
   def log(): Unit = {
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 82a284e..b8d1f4c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -19,13 +19,14 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import java.util.Collection
-import scala.collection.JavaConverters._
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
 import org.apache.gearpump.streaming.source.DataSource
 
+import scala.collection.JavaConverters._
+
 class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
 
   private val streamApp = StreamApp(name, context, userConfig)
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
similarity index 63%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index f07ceff..85d597d 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -15,16 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
 
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
 
 /**
- * Filter function
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
  *
- * @param <T> Message of type T
+ * @param T Input value type
+ * @param R Output value type
  */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): java.util.Iterator[R]
 }
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
similarity index 72%
rename from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
index f07ceff..7656cba 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -15,16 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
 
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
 
 /**
- * Filter function
+ * Assigns the input value into a group.
  *
- * @param <T> Message of type T
+ * @param T Input value type
+ * @param GROUP Group value type
  */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
-}
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index f15d875..82ea7c7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,7 +22,7 @@
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.TransformTask
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
@@ -134,7 +134,7 @@
     other match {
       case op: ChainableOp[OUT, _] =>
         // TODO: preserve type info
-        ChainableOp(fn.andThen(op.fn))
+        ChainableOp(AndThen(fn, op.fn))
       case _ =>
         throw new OpChainException(this, other)
     }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 5322648..687fd2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -17,23 +17,35 @@
  */
 package org.apache.gearpump.streaming.dsl.plan.functions
 
-trait SingleInputFunction[IN, OUT] extends Serializable {
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Internal function to process single input
+ *
+ * @param IN input value type
+ * @param OUT output value type
+ */
+sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
   def process(value: IN): TraversableOnce[OUT]
-  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
-    AndThen(this, other)
-  }
+
   def finish(): TraversableOnce[OUT] = None
-  def clearState(): Unit = {}
+
+  def teardown(): Unit = {}
+
   def description: String
 }
 
-case class AndThen[IN, MIDDLE, OUT](
-    first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
+    second: SingleInputFunction[MIDDLE, OUT])
   extends SingleInputFunction[IN, OUT] {
 
-  override def andThen[OUTER](
-      other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
-    first.andThen(second.andThen(other))
+  override def setup(): Unit = {
+    first.setup()
+    second.setup()
   }
 
   override def process(value: IN): TraversableOnce[OUT] = {
@@ -49,9 +61,9 @@
     }
   }
 
-  override def clearState(): Unit = {
-    first.clearState()
-    second.clearState()
+  override def teardown(): Unit = {
+    first.teardown()
+    second.teardown()
   }
 
   override def description: String = {
@@ -61,22 +73,31 @@
   }
 }
 
-class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
   extends SingleInputFunction[IN, OUT] {
 
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
   override def process(value: IN): TraversableOnce[OUT] = {
     fn(value)
   }
 
-  override def description: String = descriptionMessage
+  override def teardown(): Unit = {
+    fn.teardown()
+  }
 }
 
-
-class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+class Reducer[T](fn: ReduceFunction[T], val description: String)
   extends SingleInputFunction[T, T] {
 
   private var state: Option[T] = None
 
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
   override def process(value: T): TraversableOnce[T] = {
     if (state.isEmpty) {
       state = Option(value)
@@ -90,23 +111,18 @@
     state
   }
 
-  override def clearState(): Unit = {
+  override def teardown(): Unit = {
     state = None
+    fn.teardown()
   }
-
-  override def description: String = descriptionMessage
 }
 
-class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
 
   override def process(value: T): TraversableOnce[Unit] = {
     emit(value)
     None
   }
 
-  override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
-    throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
-  }
-
   override def description: String = ""
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
similarity index 72%
rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 440a45e..430d795 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions._
 import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl._
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -36,55 +38,95 @@
     private val edge: Option[OpEdge] = None) {
 
   /**
-   * converts a value[T] to a list of value[R]
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
    *
-   * @param fn FlatMap function
+   * @param fn flatMap function
    * @param description The description message for this operation
    * @return A new stream with type [R]
    */
   def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
-    val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
-    graph.addVertex(flatMapOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
-    new Stream[R](graph, flatMapOp)
+    this.flatMap(FlatMapFunction(fn), description)
   }
 
   /**
-   * Maps message of type T message of type R
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
    *
-   * @param fn Function
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+    transform(new FlatMapper[T, R](fn, description))
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
    * @return A new stream with type [R]
    */
   def map[R](fn: T => R, description: String = "map"): Stream[R] = {
-    this.flatMap({ data =>
-      Option(fn(data))
-    }, description)
+    this.map(MapFunction(fn), description)
   }
 
   /**
-   * Keeps records when fun(T) == true
+   * Returns a new stream by applying a map function to each element.
    *
-   * @param fn  the filter
-   * @return  a new stream after filter
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
    */
   def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
-    this.flatMap({ data =>
-      if (fn(data)) Option(data) else None
-    }, description)
+    this.filter(FilterFunction(fn), description)
   }
 
   /**
-   * Reduces operations.
+   * Returns a new Stream keeping the elements that satisfy the filter function.
    *
-   * @param fn  reduction function
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
    * @param description description message for this operator
-   * @return a new stream after reduction
+   * @return a new stream after reduce
    */
   def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
-    val reduceOp = ChainableOp(new ReduceFunction(fn, description))
-    graph.addVertex(reduceOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
-    new Stream(graph, reduceOp)
+    reduce(ReduceFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+    transform(new Reducer[T](fn, description))
+  }
+
+  private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+    val op = ChainableOp(fn)
+    graph.addVertex(op)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+    new Stream(graph, op)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
similarity index 98%
rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index 8116146..d6eed2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -16,11 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
 
 import java.time.Instant
 
 import akka.actor.ActorSystem
+import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.StreamApplication
@@ -28,7 +29,6 @@
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.Graph
-import org.apache.gearpump.Message
 
 import scala.language.implicitConversions
 
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..f10a3db
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
+
+import scala.collection.JavaConverters._
+
+object FlatMapFunction {
+
+  def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[R] = {
+        fn.apply(t).asScala
+      }
+
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+
+  def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+      override def apply(t: T): TraversableOnce[R] = {
+        fn(t)
+      }
+    }
+  }
+
+  def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[R] = {
+        Option(fn(t))
+      }
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+
+  def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = {
+    new FlatMapFunction[T, T] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[T] = {
+        if (fn(t)) {
+          Option(t)
+        } else {
+          None
+        }
+      }
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @param T Input value type
+ * @param R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): TraversableOnce[R]
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
similarity index 65%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
index f07ceff..ab88bf1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
@@ -15,16 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
 
 /**
- * Filter function
- *
- * @param <T> Message of type T
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly
+ * but subclasses like [[FlatMapFunction]].
  */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
+abstract class SerializableFunction extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def teardown(): Unit = {}
+
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index e35f085..c13a4fb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,9 +23,8 @@
 import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
-class TransformTask[IN, OUT](
-    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-    userConf: UserConfig) extends Task(taskContext, userConf) {
+class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+    taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
   def this(taskContext: TaskContext, userConf: UserConfig) = {
     this(userConf.getValue[SingleInputFunction[IN, OUT]](
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index d87a9e4..223a4af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -28,7 +28,7 @@
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
 import com.gs.collections.impl.set.mutable.UnifiedSet
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction}
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.LogUtil
@@ -39,7 +39,6 @@
   def process(message: Message): Unit
 
   def trigger(time: Instant): Unit
-
 }
 
 object DefaultWindowRunner {
@@ -59,7 +58,6 @@
   private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
   private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
 
-
   override def process(message: Message): Unit = {
     val (group, buckets) = groupBy.groupBy(message)
     buckets.foreach { bucket =>
@@ -72,8 +70,11 @@
       inputs.add(message.msg.asInstanceOf[IN])
       windowGroups.put(wg, inputs)
     }
-    groupFns.putIfAbsent(group,
-      userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+    if (!groupFns.containsKey(group)) {
+      val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+      fn.setup()
+      groupFns.put(group, fn)
+    }
   }
 
   override def trigger(time: Instant): Unit = {
@@ -88,8 +89,7 @@
           wgs.forEach(new Procedure[WindowGroup[GROUP]] {
             override def value(each: WindowGroup[GROUP]): Unit = {
               val inputs = windowGroups.remove(each)
-              val reduceFn = groupFns.get(each.group)
-                .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+              val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time)))
               inputs.forEach(new Procedure[IN] {
                 override def value(t: IN): Unit = {
                   // .toList forces eager evaluation
@@ -99,7 +99,7 @@
               // .toList forces eager evaluation
               reduceFn.finish().toList
               if (groupBy.window.accumulationMode == Discarding) {
-                reduceFn.clearState()
+                reduceFn.teardown()
               }
             }
           })
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index 98bf24f..f0920de 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,7 +25,8 @@
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -145,7 +146,6 @@
 
       val chainedOp = chainableOp1.chain(chainableOp2)
 
-      verify(fn1).andThen(fn2)
       chainedOp shouldBe a[ChainableOp[_, _]]
 
       unchainableOps.foreach { op =>
@@ -156,12 +156,9 @@
     }
 
     "get Processor" in {
-      val fn = new SingleInputFunction[Any, Any] {
-        override def process(value: Any): TraversableOnce[Any] = null
-
-        override def description: String = null
-      }
-      val chainableOp = ChainableOp[Any, Any](fn)
+      val fn = mock[FlatMapFunction[Any, Any]]
+      val flatMapper = new FlatMapper(fn, "flatMap")
+      val chainableOp = ChainableOp[Any, Any](flatMapper)
 
       val processor = chainableOp.getProcessor
       processor shouldBe a[Processor[_]]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 1610f0e..3f23fa9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -23,10 +23,12 @@
 import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
 import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -56,8 +58,8 @@
     val graph = Graph.empty[Op, OpEdge]
     val sourceOp = DataSourceOp(new AnySource)
     val groupByOp = GroupByOp(new AnyGroupByFn)
-    val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
-    val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+    val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
+    val reduceOp = ChainableOp[Any, Any](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
     val sinkOp = DataSinkOp(new AnySink)
     val directEdge = Direct
@@ -92,9 +94,10 @@
 object PlannerSpec {
 
   private val anyParallelism = 1
-  private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap")
-  private val anyReduceFunction = new ReduceFunction[Any](
-    (left: Any, right: Any) => (left, right), "reduce")
+  private val anyFlatMapper = new FlatMapper[Any, Any](
+    FlatMapFunction(Option(_)), "flatMap")
+  private val anyReducer = new Reducer[Any](
+    ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
 
   class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
 
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
index ad12e33..2c03e1c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -23,9 +23,11 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.dsl.window.api.CountWindow
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
@@ -77,149 +79,31 @@
       andThen.finish().toList shouldBe List(secondResult)
     }
 
-    "clear both states on clearState" in {
-      andThen.clearState()
+    "set up both functions on setup" in {
+      andThen.setup()
 
-      verify(first).clearState()
-      verify(second).clearState()
+      verify(first).setup()
+      verify(second).setup()
     }
 
-    "return AndThen on andThen" in {
-      val third = mock[SingleInputFunction[T, Any]]
-      when(second.andThen(third)).thenReturn(AndThen(second, third))
+    "tear down both functions on teardown" in {
+      andThen.teardown()
 
-      andThen.andThen[Any](third)
-
-      verify(first).andThen(AndThen(second, third))
-    }
-  }
-
-  "FlatMapFunction" should {
-
-    val flatMap = mock[R => TraversableOnce[S]]
-    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
-
-    "call flatMap function when processing input value" in {
-      val input = mock[R]
-      flatMapFunction.process(input)
-      verify(flatMap).apply(input)
+      verify(first).teardown()
+      verify(second).teardown()
     }
 
-    "return passed in description" in {
-      flatMapFunction.description shouldBe "flatMap"
-    }
-
-    "return None on finish" in {
-      flatMapFunction.finish() shouldBe List.empty[S]
-    }
-
-    "do nothing on clearState" in {
-      flatMapFunction.clearState()
-      verifyZeroInteractions(flatMap)
-    }
-
-    "return AndThen on andThen" in {
-      val other = mock[SingleInputFunction[S, T]]
-      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
-    }
-  }
-
-  "ReduceFunction" should {
-
-
-    "call reduce function when processing input value" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      val input1 = mock[T]
-      val input2 = mock[T]
-      val output = mock[T]
-
-      when(reduce.apply(input1, input2)).thenReturn(output, output)
-
-      reduceFunction.process(input1) shouldBe List.empty[T]
-      reduceFunction.process(input2) shouldBe List.empty[T]
-      reduceFunction.finish() shouldBe List(output)
-
-      reduceFunction.clearState()
-      reduceFunction.process(input1) shouldBe List.empty[T]
-      reduceFunction.clearState()
-      reduceFunction.process(input2) shouldBe List.empty[T]
-      reduceFunction.finish() shouldBe List(input2)
-    }
-
-    "return passed in description" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.description shouldBe "reduce"
-    }
-
-    "return None on finish" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.finish() shouldBe List.empty[T]
-    }
-
-    "do nothing on clearState" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.clearState()
-      verifyZeroInteractions(reduce)
-    }
-
-    "return AndThen on andThen" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      val other = mock[SingleInputFunction[T, Any]]
-      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
-    }
-  }
-
-  "EmitFunction" should {
-
-    val emit = mock[T => Unit]
-    val emitFunction = new EmitFunction[T](emit)
-
-    "emit input value when processing input value" in {
-      val input = mock[T]
-
-      emitFunction.process(input) shouldBe List.empty[Unit]
-
-      verify(emit).apply(input)
-    }
-
-    "return empty description" in {
-      emitFunction.description shouldBe ""
-    }
-
-    "return None on finish" in {
-      emitFunction.finish() shouldBe List.empty[Unit]
-    }
-
-    "do nothing on clearState" in {
-      emitFunction.clearState()
-      verifyZeroInteractions(emit)
-    }
-
-    "throw exception on andThen" in {
-      val other = mock[SingleInputFunction[Unit, Any]]
-      intercept[UnsupportedOperationException] {
-        emitFunction.andThen(other)
-      }
-    }
-  }
-
-  "andThen" should {
     "chain multiple single input function" in {
-      val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
+      val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split")
 
-      val filter = new FlatMapFunction[String, String](word =>
-        if (word.isEmpty) None else Some(word), "filter")
+      val filter = new FlatMapper[String, String](
+        FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter")
 
-      val map = new FlatMapFunction[String, Int](word => Some(1), "map")
+      val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map")
 
-      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+      val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum")
 
-      val all = split.andThen(filter).andThen(map).andThen(sum)
+      val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
 
       assert(all.description == "split.filter.map.sum")
 
@@ -239,6 +123,123 @@
     }
   }
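
The hunk above rewrites the chain as explicit AndThen(...) nesting, and earlier in this spec the AndThen tests verify that teardown() reaches both composed functions. A minimal, self-contained sketch of that forwarding behaviour follows; the names Runner and AndThenSketch are illustrative stand-ins for Gearpump's SingleInputFunction/AndThen, whose actual implementation may differ in detail.

// Illustrative sketch only, not Gearpump's code: a composed runner forwards
// setup() and teardown() to both parts, pipes process() output of the first
// into the second, and joins descriptions with a dot (matching the
// "split.filter.map.sum" assertion above).
trait Runner[IN, OUT] {
  def setup(): Unit = {}
  def process(value: IN): TraversableOnce[OUT]
  def finish(): TraversableOnce[OUT] = List.empty[OUT]
  def teardown(): Unit = {}
  def description: String
}

class AndThenSketch[IN, MID, OUT](first: Runner[IN, MID], second: Runner[MID, OUT])
  extends Runner[IN, OUT] {

  override def setup(): Unit = {
    first.setup()
    second.setup()
  }

  override def process(value: IN): TraversableOnce[OUT] =
    first.process(value).toList.flatMap(m => second.process(m).toList)

  override def finish(): TraversableOnce[OUT] =
    // Flush the first runner through the second, then let the second flush itself.
    first.finish().toList.flatMap(m => second.process(m).toList) ::: second.finish().toList

  override def teardown(): Unit = {
    first.teardown()
    second.teardown()
  }

  override def description: String = s"${first.description}.${second.description}"
}
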
 
+  "FlatMapper" should {
+
+    val flatMapFunction = mock[FlatMapFunction[R, S]]
+    val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap")
+
+    "call flatMap function when processing input value" in {
+      val input = mock[R]
+      flatMapper.process(input)
+      verify(flatMapFunction).apply(input)
+    }
+
+    "return passed in description" in {
+      flatMapper.description shouldBe "flatMap"
+    }
+
+    "return None on finish" in {
+      flatMapper.finish() shouldBe List.empty[S]
+    }
+
+    "set up FlatMapFunction on setup" in {
+      flatMapper.setup()
+
+      verify(flatMapFunction).setup()
+    }
+
+    "tear down FlatMapFunction on teardown" in {
+      flatMapper.teardown()
+
+      verify(flatMapFunction).teardown()
+    }
+  }
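
The FlatMapper wrapper forwards setup() and teardown() to the wrapped FlatMapFunction, which is what lets user-defined functions hold per-task resources. A hedged sketch of such a function follows; the abstract base is written out inline so the example is self-contained and merely stands in for the DSL's FlatMapFunction, whose exact declaration may differ.

import scala.io.Source

// Stand-in base mirroring the lifecycle the tests above verify (apply/setup/teardown).
// The real FlatMapFunction in the Gearpump DSL may declare these slightly differently.
abstract class UserFlatMapFunction[T, R] extends Serializable {
  def setup(): Unit = {}
  def apply(t: T): TraversableOnce[R]
  def teardown(): Unit = {}
}

// Example user function: loads a stop-word list once per task in setup() and
// drops it in teardown(), instead of re-reading the file for every message.
// The file path and class name are purely illustrative.
class StopWordSplitter(stopWordFile: String) extends UserFlatMapFunction[String, String] {
  private var stopWords: Set[String] = Set.empty

  override def setup(): Unit = {
    val src = Source.fromFile(stopWordFile)
    try {
      stopWords = src.getLines().toSet
    } finally {
      src.close()
    }
  }

  override def apply(line: String): TraversableOnce[String] =
    line.split("\\s+").filterNot(stopWords.contains).toList

  override def teardown(): Unit = {
    stopWords = Set.empty
  }
}
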
+
+  "ReduceFunction" should {
+
+    "call reduce function when processing input value" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      val input1 = mock[T]
+      val input2 = mock[T]
+      val output = mock[T]
+
+      when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
+
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(output)
+
+      reducer.teardown()
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.teardown()
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(input2)
+    }
+
+    "return passed in description" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.description shouldBe "reduce"
+    }
+
+    "return None on finish" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.finish() shouldBe List.empty[T]
+    }
+
+    "set up reduce function on setup" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.setup()
+
+      verify(reduceFunction).setup()
+    }
+
+    "tear down reduce function on teardown" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.teardown()
+
+      verify(reduceFunction).teardown()
+    }
+  }
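
The first test above also pins down a behavioural detail: teardown() now clears the accumulated reduce state, taking over the job clearState() used to do. A hypothetical sketch consistent with those expectations (ReducerSketch is an illustrative name, not Gearpump's Reducer):

// process() folds values into internal state and emits nothing; finish() emits the
// accumulated value; teardown() drops the state so a fresh run starts clean.
class ReducerSketch[T](reduce: (T, T) => T, val description: String) {
  private var state: Option[T] = None

  def setup(): Unit = {}

  def process(value: T): TraversableOnce[T] = {
    state = state match {
      case Some(acc) => Some(reduce(acc, value))
      case None => Some(value)
    }
    List.empty[T]
  }

  def finish(): TraversableOnce[T] = state.toList

  def teardown(): Unit = {
    state = None
  }
}
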
+
+  "Emit" should {
+
+    val emitFunction = mock[T => Unit]
+    val emit = new Emit[T](emitFunction)
+
+    "emit input value when processing input value" in {
+      val input = mock[T]
+
+      emit.process(input) shouldBe List.empty[Unit]
+
+      verify(emitFunction).apply(input)
+    }
+
+    "return empty description" in {
+      emit.description shouldBe ""
+    }
+
+    "return None on finish" in {
+      emit.finish() shouldBe List.empty[Unit]
+    }
+
+    "do nothing on setup" in {
+      emit.setup()
+
+      verifyZeroInteractions(emitFunction)
+    }
+
+    "do nothing on teardown" in {
+      emit.teardown()
+
+      verifyZeroInteractions(emitFunction)
+    }
+  }
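
For completeness, the Emit expectations above amount to a pass-through sink adapter with no lifecycle of its own. A tiny illustrative sketch (not Gearpump's Emit):

// process() hands the value to the emit callback and produces nothing downstream;
// setup() and teardown() are deliberate no-ops, as the tests require.
class EmitSketch[T](emit: T => Unit) {
  def setup(): Unit = {}
  def process(value: T): TraversableOnce[Unit] = {
    emit(value)
    List.empty[Unit]
  }
  def finish(): TraversableOnce[Unit] = List.empty[Unit]
  def teardown(): Unit = {}
  def description: String = ""
}
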
+
   "Source" should {
     "iterate over input source and apply attached operator" in {
 
@@ -261,7 +262,8 @@
 
       // Source with transformer
       val anotherTaskContext = MockUtil.mockTaskContext
-      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
       val another = new DataSourceTask(anotherTaskContext,
         conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
       another.onStart(Instant.EPOCH)
@@ -279,9 +281,8 @@
 
       val data = "1 2  2  3 3  3"
 
-      val concat = new ReduceFunction[String]({ (left, right) =>
-        left + right
-      }, "concat")
+      val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+        left + right}), "concat")
 
       implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
       val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
@@ -315,7 +316,8 @@
       // Source with transformer
       val taskContext = MockUtil.mockTaskContext
       val conf = UserConfig.empty
-      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
       val task = new TransformTask[String, String](Some(double), taskContext, conf)
       task.onStart(Instant.EPOCH)
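
The hunks above also show how a transform travels to its task: it is stored in the task's UserConfig under GEARPUMP_STREAMING_OPERATOR. A usage sketch in the same spirit, assuming an implicit ActorSystem is in scope (withValue needs one, as the spec shows) and that conf already carries the data source for the DataSourceTask:

// Usage sketch grounded in the spec above, not standalone code.
val double = new FlatMapper[String, String](
  FlatMapFunction(word => List(word, word)), "double")

// Attach the operator to the task's config and start the task.
val taskConf = conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)
val task = new DataSourceTask(MockUtil.mockTaskContext, taskConf)
task.onStart(Instant.EPOCH)
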
 
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
similarity index 91%
rename from streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
rename to streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index db4db93..5b90a3e 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
 
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.scalaapi
 import org.apache.gearpump.streaming.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.util.Graph
 import org.mockito.Mockito.when
 import org.scalatest._
@@ -49,8 +50,8 @@
     when(context.system).thenReturn(system)
 
     val dsl = StreamApp("dsl", context)
-    dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
-    dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
+    dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]]
+    dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]]
 
     val application = dsl.plan()
     application shouldBe a [StreamApplication]
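
This rename only moves the Scala DSL spec under the new scalaapi package; the calls themselves are unchanged. For call sites, the update is just the import path. A sketch, assuming a ClientContext named context is already available:

import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp

// Same pattern the spec exercises: build the app, declare sources, then plan().
val dsl = StreamApp("dsl", context)
dsl.source(List("A", "B"), 1, "letters")
val application = dsl.plan()
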
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
similarity index 94%
rename from streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
rename to streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 8def61e..62a3bcb 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -16,19 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
 
 import akka.actor._
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
 import org.mockito.Mockito.when
@@ -71,9 +71,10 @@
       map(word => (word, 1)).
       groupBy(_._1, parallelism = 2).
       reduce((left, right) => (left._1, left._2 + right._2)).
-      map[Either[(String, Int), String]](Left(_))
+      map[Either[(String, Int), String]]({t: (String, Int) => Left(t)})
 
-    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
+    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](
+      {s: String => Right(s)})
     stream.merge(query).process[(String, Int)](classOf[Join], 1)
 
     val app: StreamApplication = dsl.plan()
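
The last hunk replaces the point-free Left(_)/Right(_) lambdas with closures whose parameter types are spelled out. A plausible reason, though the patch does not state it, is that map now also accepts a DSL function object alongside a plain Scala closure, so the compiler needs the explicit parameter type to resolve the call. The two spellings side by side:

// Before: parameter type inferred (ambiguous under the overload assumption above).
//   dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))

// After, as the spec now does: the closure's parameter type is explicit.
val query = dsl.source(List("two"), 1, "")
  .map[Either[(String, Int), String]]({ s: String => Right(s) })
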