[GERAPUMP-22] Merge akka-streams branch into master

Author: manuzhang <owenzhang1990@gmail.com>
Author: Kam Kasravi <kamkasravi@yahoo.com>

Closes #137 from manuzhang/akka-streams.
diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md
index 7c9a316..fe04554 100644
--- a/experiments/akkastream/README.md
+++ b/experiments/akkastream/README.md
@@ -1,4 +1,2 @@
 Akka Stream 
-=========
-
-TODO: This directory is obsolte. Working on updating it to Akka 2.4.3.
+=========
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf
index 626a1dc..8584511 100644
--- a/experiments/akkastream/src/main/resources/geardefault.conf
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -2,4 +2,4 @@
   "akka.stream.gearpump.example.WikipediaApp$WikidataElement" = ""
   "scala.collection.immutable.Map$Map1" = ""
   "scala.collection.immutable.Map$Map2" = ""
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
deleted file mode 100644
index d2b328d..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream
-
-import scala.concurrent.ExecutionContextExecutor
-
-/**
- * [[BaseMaterializer]] is a extension to [[akka.stream.Materializer]].
- *
- * Compared with [[akka.stream.Materializer]], the difference is that
- * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph.
- *
- * @see [[ModuleGraph]] for the difference between RunnableGraph and
- *      [[ModuleGraph]]
- *
- */
-abstract class BaseMaterializer extends akka.stream.Materializer {
-
-  override def withNamePrefix(name: String): Materializer = throw new UnsupportedOperationException()
-
-  override implicit def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException()
-
-  def materialize[Mat](graph: ModuleGraph[Mat]): Mat
-
-  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
-    val graph = ModuleGraph(runnableGraph)
-    materialize(graph)
-  }
-
-  def shutdown(): Unit
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
deleted file mode 100644
index 48d06f7..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This code is similar to MaterializerSession and will be deprecated in the next release.
- */
-
-package akka.stream
-
-import scala.collection.mutable
-
-import akka.stream.Attributes.Attribute
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.StreamLayout._
-import akka.stream.impl._
-import akka.stream.{Graph => AkkaGraph}
-
-import _root_.org.apache.gearpump.util
-import _root_.org.apache.gearpump.util.Graph
-
-/**
- *
- * ModuleGraph is a transformation on [[akka.stream.scaladsl.RunnableGraph]].
- * It carries all the information of [[akka.stream.scaladsl.RunnableGraph]], but
- * represents it in a different way.
- *
- * Here is the difference:
- *
- * RunnableGraph
- * ==============================
- * [[akka.stream.scaladsl.RunnableGraph]] is represented as a [[Module]] tree:
- *             TopLevelModule
- *                   |
- *          -------------------
- *          |                 |
- *      SubModule1         SubModule2
- *          |                    |
- *   ----------------           ----------
- *   |              |                    |
- * AtomicModule1 AtomicModule2  AtomicModule3
- *
- * ModuleGraph
- * ==============================
- * [[ModuleGraph]] is represented as a [[util.Graph]] of Atomic [[Module]]:
- *
- *        AtomicModule2 -> AtomicModule4
- *         /|                    \
- *       /                        \
- *     /                           \|
- * AtomicModule1           AtomicModule5
- *     \                   /|
- *      \                 /
- *       \|              /
- *         AtomicModule3
- *
- * Each vertex in the Graph is a [[Module]], each [[Edge]] in the Graph is a tuple
- * ([[OutPort]], [[InPort]]). [[OutPort]] is one of upstream Atomic Module
- * output ports. [[InPort]] is one of downstream Atomic Module input ports.
- *
- *
- * Why use [[ModuleGraph]] instead of [[akka.stream.scaladsl.RunnableGraph]]?
- * =========================
- * There are several good reasons:):
- * 1. [[ModuleGraph]] outlines explicitly the upstream/downstream relation.
- *   Each [[Edge]] of [[ModuleGraph]] represent a upstream/downstream pair.
- *   It is easier for user to understand the overall data flow.
- *
- * 2. It is easier for performance optimization.
- *  For the above Graph, if we want to fuse AtomicModule2 and AtomicModule3
- *  together, it can be done within [[ModuleGraph]]. We only need
- *  to substitute Pair(AtomicModule2, AtomicModule4) with a unified Module.
- *
- * 3. It avoids module duplication.
- *  In [[akka.stream.scaladsl.RunnableGraph]], composite Module can be re-used.
- *  It is possible that there can be duplicate Modules.
- *  The duplication problem causes big headache when doing materialization.
- *
- *  [[ModuleGraph]] doesn't have thjis problem. [[ModuleGraph]] does a transformation on the Module
- *  Tree to make sure each Atomic Module [[ModuleGraph]] is unique.
- *
- *
- * @param graph a Graph of Atomic modules.
- * @param mat is a function of:
- *             input => materialized value of each Atomic module
- *             output => final materialized value.
- * @tparam Mat
- */
-class ModuleGraph[Mat](val graph: util.Graph[Module, Edge], val mat: MaterializedValueNode) {
-
-  def resolve(materializedValues: Map[Module, Any]): Mat = {
-    MaterializedValueOps(mat).resolve[Mat](materializedValues)
-  }
-}
-
-object ModuleGraph {
-
-  def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] = {
-    val topLevel = runnableGraph.module
-    val factory = new ModuleGraphFactory(topLevel)
-    val (graph, mat) = factory.create()
-    new ModuleGraph(graph, mat)
-  }
-
-  /**
-   *
-   * @param from outport of upstream module
-   * @param to inport of downstream module
-   */
-  case class Edge(from: OutPort, to: InPort)
-
-  private class ModuleGraphFactory(val topLevel: StreamLayout.Module) {
-
-    private var subscribersStack: List[mutable.Map[InPort, (InPort, Module)]] =
-      mutable.Map.empty[InPort, (InPort, Module)].withDefaultValue(null) :: Nil
-    private var publishersStack: List[mutable.Map[OutPort, (OutPort, Module)]] =
-      mutable.Map.empty[OutPort, (OutPort, Module)].withDefaultValue(null) :: Nil
-
-    /*
-     * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
-     * itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but
-     * not elsewhere. For this reason they are just simply passed as parameters to those methods.
-     *
-     * The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class
-     * from passing the current scope around or even knowing about it.
-     */
-    private var moduleStack: List[Module] = topLevel :: Nil
-
-    private def subscribers: mutable.Map[InPort, (InPort, Module)] = subscribersStack.head
-    private def publishers: mutable.Map[OutPort, (OutPort, Module)] = publishersStack.head
-    private def currentLayout: Module = moduleStack.head
-
-    private val graph = Graph.empty[Module, Edge]
-
-    private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
-      val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
-      module.withAttributes(currentAttributes).asInstanceOf[T]
-    }
-
-    private def materializeAtomic(atomic: Module, parentAttributes: Attributes): MaterializedValueNode = {
-      val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
-      val copied = copyAtomicModule(atomic, parentAttributes)
-
-      for ((in, id) <- inputs.zipWithIndex) {
-        val inPort = inPortMapping(atomic, copied)(in)
-        assignPort(in, (inPort, copied))
-      }
-
-      for ((out, id) <- outputs.zipWithIndex) {
-        val outPort = outPortMapping(atomic, copied)(out)
-        assignPort(out, (outPort, copied))
-      }
-
-      graph.addVertex(copied)
-      Atomic(copied)
-    }
-
-    def create(): (util.Graph[Module, Edge], MaterializedValueNode) = {
-      val mat = materializeModule(topLevel, Attributes.none)
-      (graph, mat)
-    }
-
-    private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
-      from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
-    }
-
-    private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
-      from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
-    }
-
-    private def materializeModule(module: Module, parentAttributes: Attributes): MaterializedValueNode = {
-
-      val materializedValues = collection.mutable.HashMap.empty[Module, MaterializedValueNode]
-      val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
-
-      var materializedValueSources = List.empty[MaterializedValueSource[_]]
-
-      for (submodule <- module.subModules) {
-        submodule match {
-          case mv: MaterializedValueSource[_] =>
-            materializedValueSources :+= mv
-          case atomic if atomic.isAtomic =>
-            materializedValues.put(atomic, materializeAtomic(atomic, currentAttributes))
-          case copied: CopiedModule =>
-            enterScope(copied)
-            materializedValues.put(copied, materializeModule(copied, currentAttributes))
-            exitScope(copied)
-          case composite =>
-            materializedValues.put(composite, materializeComposite(composite, currentAttributes))
-        }
-      }
-
-      val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
-
-      materializedValueSources.foreach { module =>
-        val matAttribute = new MaterializedValueSourceAttribute(mat)
-        val copied = copyAtomicModule(module, parentAttributes and Attributes(matAttribute))
-        assignPort(module.shape.outlet, (copied.shape.outlet, copied))
-        graph.addVertex(copied)
-        materializedValues.put(copied, Atomic(copied))
-      }
-      mat
-    }
-
-    private def materializeComposite(composite: Module, effectiveAttributes: Attributes): MaterializedValueNode = {
-      materializeModule(composite, effectiveAttributes)
-    }
-
-    private def mergeAttributes(parent: Attributes, current: Attributes): Attributes = {
-      parent and current
-    }
-
-    private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, MaterializedValueNode]): MaterializedValueNode = matNode match {
-      case Atomic(m) => materializedValues(m)
-      case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
-      case Transform(f, d) => Transform(f, resolveMaterialized(d, materializedValues))
-      case Ignore => Ignore
-    }
-
-    final protected def assignPort(in: InPort, subscriber: (InPort, Module)): Unit = {
-      addVertex(subscriber._2)
-      subscribers(in) = subscriber
-      // Interface (unconnected) ports of the current scope will be wired when exiting the scope
-      if (!currentLayout.inPorts(in)) {
-        val out = currentLayout.upstreams(in)
-        val publisher = publishers(out)
-        if (publisher ne null) addEdge(publisher, subscriber)
-      }
-    }
-
-    final protected def assignPort(out: OutPort, publisher: (OutPort, Module)): Unit = {
-      addVertex(publisher._2)
-      publishers(out) = publisher
-      // Interface (unconnected) ports of the current scope will be wired when exiting the scope
-      if (!currentLayout.outPorts(out)) {
-        val in = currentLayout.downstreams(out)
-        val subscriber = subscribers(in)
-        if (subscriber ne null) addEdge(publisher, subscriber)
-      }
-    }
-
-    // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
-    // of the same module.
-    // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
-    private def enterScope(enclosing: CopiedModule): Unit = {
-      subscribersStack ::= mutable.Map.empty.withDefaultValue(null)
-      publishersStack ::= mutable.Map.empty.withDefaultValue(null)
-      moduleStack ::= enclosing.copyOf
-    }
-
-    // Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning
-    // them to the copied ports instead of the original ones (since there might be multiple copies of the same module
-    // leading to port identity collisions)
-    // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
-    private def exitScope(enclosing: CopiedModule): Unit = {
-      val scopeSubscribers = subscribers
-      val scopePublishers = publishers
-      subscribersStack = subscribersStack.tail
-      publishersStack = publishersStack.tail
-      moduleStack = moduleStack.tail
-
-      // When we exit the scope of a copied module,  pick up the Subscribers/Publishers belonging to exposed ports of
-      // the original module and assign them to the copy ports in the outer scope that we will return to
-      enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
-        case (original, exposed) => assignPort(exposed, scopeSubscribers(original))
-      }
-
-      enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach {
-        case (original, exposed) => assignPort(exposed, scopePublishers(original))
-      }
-    }
-
-    private def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
-      graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
-    }
-
-    private def addVertex(module: Module): Unit = {
-      graph.addVertex(module)
-    }
-  }
-
-  final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
deleted file mode 100644
index a11d7cb..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump
-
-import akka.actor.ActorSystem
-import akka.stream._
-import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer
-import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer
-import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, SubGraphMaterializer}
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
- * streaming application. If some module cannot be rendered remotely in Gearpump
- * Cluster, then it uses local Actor materializer as fallback to materialize
- * the module locally.
- *
- * User can custom a [[Strategy]] to determinie which module should be rendered
- * remotely, and which module should be rendered locally.
- *
- * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two parts,
- *      and materialize them seperately.
- *
- * @param system
- * @param strategy
- * @param useLocalCluster whether to use built-in in-process local cluster
- */
-class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter.AllRemoteStrategy,
-    useLocalCluster: Boolean = true)
-  extends BaseMaterializer {
-
-  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
-    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
-    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
-  )
-
-  override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = {
-    val subGraphs = new GraphCutter(strategy).cut(graph)
-    val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, subGraph) =>
-      val materializer = subMaterializers(subGraph.getClass)
-      map ++ materializer.materialize(subGraph, map)
-    }
-    graph.resolve(matValues)
-  }
-
-  override def shutdown(): Unit = {
-    subMaterializers.values.foreach(_.shutdown())
-  }
-}
-
-object GearpumpMaterializer {
-  def apply(system: ActorSystem): GearpumpMaterializer = new GearpumpMaterializer(system)
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
deleted file mode 100644
index 7808b52..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Sink, Source}
-
-/**
- * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph
- * to different runtime.
- *
- * In this test, source module and sink module are materialized locally,
- * Other transformation module are materialized remotely in Gearpump
- * streaming Application.
- *
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- *
- *
- */
-object Test {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-    val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant",
-      "blue sky"))
-    source.filter(_.startsWith("red")).fold("Items:") { (a, b) =>
-      a + "|" + b
-    }.map("I want to order item: " + _).runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        // scalastyle:off println
-        println("Confirm received: " + any)
-      // scalastyle:on println
-    }
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
deleted file mode 100644
index 2426f5f..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.ActorMaterializer
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.{GearSink, GearSource}
-import akka.stream.scaladsl.{Flow, Sink, Source}
-
-/**
- *
- * This tests how different Materializers can be used together in an explicit way.
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- *
- */
-object Test2 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test2...")
-    implicit val system = ActorSystem("akka-test")
-    val materializer = new GearpumpMaterializer(system)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val source = GearSource.bridge[String, String]
-    val sink = GearSink.bridge[String, String]
-
-    val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _)
-    val (entry, exit) = flow.runWith(source, sink)(materializer)
-
-    val actorMaterializer = ActorMaterializer()
-
-    val externalSource = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
-    val externalSink = Sink.actorRef(echo, "COMPLETE")
-
-    val graph = FlowGraph.closed() { implicit b =>
-      externalSource ~> Sink(entry)
-      Source(exit) ~> externalSink
-    }
-    graph.run()(actorMaterializer)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
deleted file mode 100644
index 976b1e6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSource
-import akka.stream.scaladsl.Sink
-
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-
-/**
- * read from remote and write to local
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- */
-object Test3 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-    val sourceData = new CollectionDataSource(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
-    val source = GearSource.from[String](sourceData)
-    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
deleted file mode 100644
index 7b80b7b..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSink
-import akka.stream.scaladsl.Source
-
-import org.apache.gearpump.streaming.dsl.LoggerSink
-
-/**
- * read from local and write to remote
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- */
-object Test4 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system)
-
-    val sink = GearSink.to(new LoggerSink[String])
-    val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
-    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
deleted file mode 100644
index 052c018..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Sink, Source, Unzip}
-
-/**
-test fanout
-  */
-object Test5 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-
-    val source = Source(List(("male", "24"), ("female", "23")))
-
-    val graph = FlowGraph.closed() { implicit b =>
-      val unzip = b.add(Unzip[String, String]())
-
-      val sink1 = Sink.actorRef(echo, "COMPLETE")
-      val sink2 = Sink.actorRef(echo, "COMPLETE")
-
-      source ~> unzip.in
-      unzip.out0 ~> sink1
-      unzip.out1 ~> sink1
-    }
-
-    graph.run()
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
deleted file mode 100644
index 0fccd30..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSource
-import akka.stream.scaladsl.Sink
-
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-
-/**
- * WordCount example
- * Test GroupBy
- */
-
-import akka.stream.gearpump.scaladsl.Implicits._
-
-object Test6 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-    val sourceData = new CollectionDataSource(List("this is a good start", "this is a good time", "time to start", "congratulations", "green plant", "blue sky"))
-    val source = GearSource.from[String](sourceData)
-    source.mapConcat { line =>
-      line.split(" ").toList
-    }.groupBy2(x => x).map(word => (word, 1))
-      .reduce({ (a, b) =>
-        (a._1, a._2 + b._2)
-      }).log("word-count").runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
deleted file mode 100644
index 6ef8598..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.graph
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.materializer.LocalMaterializer
-import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.DefaultAttributes
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{PublisherSource, SubscriberSink}
-import akka.stream.{Outlet, SinkShape, SourceShape}
-import org.reactivestreams.{Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- *
- * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- * contain module that can be materialized in local JVM.
- *
- * @param graph
- */
-class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
-
-object LocalGraph {
-
-  /**
-   * materialize LocalGraph in local JVM
-   * @param system
-   */
-  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
-
-    // Creates a local materializer
-    val materializer = LocalMaterializer()(system)
-
-    /**
-     *
-     * @param matValues Materialized Values for each module before materialization
-     * @return Materialized Values for each Module after the materialization.
-     */
-    override def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] = {
-      val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module =>
-        module match {
-          case source: SourceBridgeModule[AnyRef, AnyRef] =>
-            val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]]
-            val shape = SinkShape(source.inPort)
-            new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
-          case sink: SinkBridgeModule[AnyRef, AnyRef] =>
-            val publisher = matValues(sink).asInstanceOf[Publisher[AnyRef]]
-            val shape = SourceShape(sink.outPort.asInstanceOf[Outlet[AnyRef]])
-            new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
-          case other =>
-            other
-        }
-      }
-      materializer.materialize(newGraph, matValues)
-    }
-
-    override def shutdown(): Unit = {
-      materializer.shutdown()
-    }
-  }
-}
-
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
deleted file mode 100644
index a5c6e48..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import scala.concurrent.{Await, ExecutionContextExecutor}
-
-import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, PoisonPill, Props, RepointableActorRef}
-import akka.dispatch.Dispatchers
-import akka.pattern.ask
-import akka.stream.ModuleGraph.Edge
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.StreamSupervisor
-import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * [[LocalMaterializer]] will use local actor to materialize the graph
- * Use LocalMaterializer.apply to construct the LocalMaterializer.
- *
- * It is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-abstract class LocalMaterializer(
-    val system: ActorSystem,
-    override val settings: ActorMaterializerSettings,
-    dispatchers: Dispatchers,
-    val supervisor: ActorRef,
-    val haveShutDown: AtomicBoolean,
-    flowNameCounter: AtomicLong,
-    namePrefix: String,
-    optimizations: Optimizations) extends ActorMaterializer {
-
-  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
-    import ActorAttributes._
-    import Attributes._
-    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
-      attr match {
-        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
-        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
-        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
-        case l: LogLevels => s
-        case Name(_) => s
-        case other => s
-      }
-    }
-  }
-
-  override def shutdown(): Unit =
-    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
-
-  override def isShutdown: Boolean = haveShutDown.get()
-
-  private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = {
-    supervisor match {
-      case ref: LocalActorRef =>
-        ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false)
-      case ref: RepointableActorRef =>
-        if (ref.isStarted) {
-          ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher),
-            name, systemService = false)
-        } else {
-          implicit val timeout = ref.system.settings.CreationTimeout
-          val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher),
-            name)).mapTo[ActorRef]
-          Await.result(f, timeout.duration)
-        }
-      case unknown =>
-        throw new IllegalStateException(
-          s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
-    }
-  }
-
-  override lazy val executionContext: ExecutionContextExecutor =
-    dispatchers.lookup(settings.dispatcher match {
-      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
-      case other => other
-    })
-
-  def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any]
-
-  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {
-    val graph = ModuleGraph(runnableGraph)
-    val matValues = materialize(graph.graph, Map.empty[Module, Any])
-    graph.resolve(matValues)
-  }
-
-  override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
-    val dispatcher =
-      if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) {
-        effectiveSettings(context.effectiveAttributes).dispatcher
-      } else {
-        props.dispatcher
-      }
-    actorOf(props, context.stageName, dispatcher)
-  }
-}
-
-object LocalMaterializer {
-
-  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
-      namePrefix: Option[String] = None,
-      optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem)
-    : LocalMaterializerImpl = {
-
-    val settings = materializerSettings getOrElse ActorMaterializerSettings(system)
-    apply(settings, namePrefix.getOrElse("flow"), optimizations)(system)
-  }
-
-  def apply(materializerSettings: ActorMaterializerSettings,
-      namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem)
-    : LocalMaterializerImpl = {
-    val haveShutDown = new AtomicBoolean(false)
-
-    new LocalMaterializerImpl(
-      system,
-      materializerSettings,
-      system.dispatchers,
-      system.actorOf(StreamSupervisor.props(materializerSettings,
-        haveShutDown).withDispatcher(materializerSettings.dispatcher)),
-      haveShutDown,
-      FlowNameCounter(system).counter,
-      namePrefix,
-      optimizations)
-  }
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
deleted file mode 100644
index 1ec724e..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Dispatchers
-import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute}
-import akka.stream.actor.ActorSubscriber
-import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule
-import akka.stream.gearpump.module.ReduceModule
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule}
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor}
-import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape}
-import org.reactivestreams.{Processor, Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-class LocalMaterializerImpl (
-    system: ActorSystem,
-    settings: ActorMaterializerSettings,
-    dispatchers: Dispatchers,
-    supervisor: ActorRef,
-    haveShutDown: AtomicBoolean,
-    flowNameCounter: AtomicLong,
-    namePrefix: String,
-    optimizations: Optimizations)
-  extends LocalMaterializer(
-    system, settings, dispatchers, supervisor,
-    haveShutDown, flowNameCounter, namePrefix, optimizations) {
-
-  override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
-    val materializedGraph = graph.mapVertex { module =>
-      materializeAtomic(module)
-    }
-
-    materializedGraph.edges.foreach { nodeEdgeNode =>
-      val (node1, edge, node2) = nodeEdgeNode
-      val from = edge.from
-      val to = edge.to
-      val publisher = node1.outputs(from).asInstanceOf[Publisher[Any]]
-      val subscriber = node2.inputs(to).asInstanceOf[Subscriber[Any]]
-      publisher.subscribe(subscriber)
-    }
-
-    val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex =>
-      (vertex.module, vertex.matValue)
-    }.toMap
-
-    val matValueSources = materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]])
-    publishToMaterializedValueSource(matValueSources, matValues)
-
-    matValues
-  }
-
-  private def publishToMaterializedValueSource(modules: List[MaterializedModule], matValues: Map[Module, Any]) = {
-    modules.foreach { module =>
-      val source = module.module.asInstanceOf[MaterializedValueSource[_]]
-      val attr = source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null)
-
-      Option(attr).map { attr =>
-        val valueToPublish = MaterializedValueOps(attr.mat).resolve[Any](matValues)
-        module.outputs.foreach { portAndPublisher =>
-          val (port, publisher) = portAndPublisher
-          publisher match {
-            case valuePublisher: MaterializedValuePublisher =>
-              valuePublisher.setValue(valueToPublish)
-          }
-        }
-      }
-    }
-  }
-
-  private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
-
-  private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
-
-  val flowName = createFlowName()
-  var nextId = 0
-
-  def stageName(attr: Attributes): String = {
-    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
-    nextId += 1
-    name
-  }
-
-  private def materializeAtomic(atomic: Module): MaterializedModule = {
-    val effectiveAttributes = atomic.attributes
-
-    def newMaterializationContext() =
-      new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))
-
-    atomic match {
-      case matValue: MaterializedValueSource[_] =>
-        val pub = new MaterializedValuePublisher
-        val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub)
-        MaterializedModule(matValue, (), outputs = outputs)
-      case sink: SinkModule[_, _] =>
-        val (sub, mat) = sink.create(newMaterializationContext())
-        val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub)
-        MaterializedModule(sink, mat, inputs)
-      case source: SourceModule[_, _] =>
-        val (pub, mat) = source.create(newMaterializationContext())
-        val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub)
-        MaterializedModule(source, mat, outputs = outputs)
-
-      case reduce: ReduceModule[Any] =>
-        //TODO: remove this after the official akka-stream API support the Reduce Module
-        val stage = LocalMaterializerImpl.toFoldModule(reduce)
-        val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
-        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
-        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
-        MaterializedModule(stage, mat, inputs, outputs)
-
-      case stage: StageModule =>
-        val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
-        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
-        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
-        MaterializedModule(stage, mat, inputs, outputs)
-      case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
-        val es = effectiveSettings(effectiveAttributes)
-        val props =
-          SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
-        val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
-        def factory(id: Int) = new ActorPublisher[Any](impl) {
-          override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-        }
-        val publishers = Vector.tabulate(2)(factory)
-        impl ! FanOut.ExposedPublishers(publishers)
-
-        val inputs = Map[InPort, Subscriber[_]](
-          tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn),
-          tls.cipherIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn))
-
-        val outputs = Map[OutPort, Publisher[_]](
-          tls.plainOut -> publishers(SslTlsCipherActor.UserOut),
-          tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut))
-        MaterializedModule(tls, (), inputs, outputs)
-
-      case junction: JunctionModule =>
-        materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
-    }
-  }
-
-  private def processorFor(op: StageModule,
-    effectiveAttributes: Attributes,
-    effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
-    case DirectProcessor(processorFactory, _) => processorFactory()
-    case Identity(attr) => (new VirtualProcessor, ())
-    case _ =>
-      val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
-      ActorProcessorFactory[Any, Any](
-            actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
-  }
-
-  private def materializeJunction(
-    op: JunctionModule,
-    effectiveAttributes: Attributes,
-    effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
-    op match {
-      case fanin: FanInModule =>
-        val (props, inputs, output) = fanin match {
-
-          case MergeModule(shape, _) =>
-            (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out)
-
-          case f: FlexiMergeModule[_, Shape] =>
-            val flexi = f.flexi(f.shape)
-            val shape: Shape = f.shape
-            (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head)
-
-          case MergePreferredModule(shape, _) =>
-            (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out)
-
-          case ConcatModule(shape, _) =>
-            require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
-            (Concat.props(effectiveSettings), shape.inSeq, shape.out)
-
-          case zip: ZipWithModule =>
-            (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
-        }
-
-        val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
-        val publisher = new ActorPublisher[Any](impl)
-        // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
-        impl ! ExposedPublisher(publisher)
-
-        val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { pair =>
-          val (in, id) = pair
-          (in, FanIn.SubInput[Any](impl, id))
-        }.toMap
-
-        val outMapping = Map(output -> publisher)
-        MaterializedModule(fanin, (), inputMapping, outMapping)
-
-      case fanout: FanOutModule =>
-        val (props, in, outs) = fanout match {
-
-          case r: FlexiRouteModule[t, Shape] =>
-            val flexi = r.flexi(r.shape)
-            val shape: Shape = r.shape
-            (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets)
-
-          case BroadcastModule(shape, eagerCancel, _) =>
-            (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)
-
-          case BalanceModule(shape, waitForDownstreams, _) =>
-            (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
-
-          case unzip: UnzipWithModule =>
-            (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
-        }
-        val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
-        val size = outs.size
-        def factory(id: Int) =
-          new ActorPublisher[Any](impl) {
-            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-          }
-        val publishers =
-          if (outs.size < 8) Vector.tabulate(size)(factory)
-          else List.tabulate(size)(factory)
-
-        impl ! FanOut.ExposedPublishers(publishers)
-        val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
-          (out, pub)
-        }.toMap
-
-        val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl))
-        MaterializedModule(fanout, (), inputs, outputs)
-    }
-  }
-
-  override def withNamePrefix(name: String): Materializer = {
-    new LocalMaterializerImpl(system, settings, dispatchers, supervisor,
-      haveShutDown, flowNameCounter, namePrefix = name, optimizations)
-  }
-}
-
-object LocalMaterializerImpl {
-  case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
-
-  def toFoldModule(reduce: ReduceModule[Any]): Fold = {
-    val f = reduce.f
-    val aggregator = { (zero: Any, input: Any) =>
-      if (zero == null) {
-        input
-      } else {
-        f(zero, input)
-      }
-    }
-    new Fold(null, aggregator)
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
deleted file mode 100644
index 47ed1f2..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task}
-import akka.stream.impl.Stages
-import akka.stream.impl.Stages.StageModule
-import akka.stream.impl.StreamLayout.Module
-import org.slf4j.LoggerFactory
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
-import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-/**
- * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump
- * Streaming Application.
- *
- * @param graph
- * @param system
- */
-class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
-
-  import RemoteMaterializerImpl._
-
-  type Clue = String
-  private implicit val actorSystem = system
-
-  private def uuid: String = {
-    java.util.UUID.randomUUID.toString
-  }
-
-  /**
-   * @return a mapping from Module to Materialized Processor Id.
-   */
-  def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
-    val (opGraph, clues) = toOpGraph()
-    val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph)
-    val processorIds = resolveClues(app, clues)
-
-    val updatedApp = updateJunctionConfig(processorIds, app)
-    (cleanClues(updatedApp), processorIds)
-  }
-
-  private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], app: StreamApplication): StreamApplication = {
-    val config = junctionConfig(processorIds)
-
-    val dag = app.dag.mapVertex { vertex =>
-      val processorId = vertex.id
-      val newConf = vertex.taskConf.withConfig(config(processorId))
-      vertex.copy(taskConf = newConf)
-    }
-    new StreamApplication(app.name, app.inputUserConfig, dag)
-  }
-
-  /**
-   * Update junction config so that each GraphTask know its upstream and downstream.
-   * @param processorIds
-   * @return
-   */
-  private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = {
-    val updatedConfigs = graph.vertices.map { vertex =>
-      val processorId = processorIds(vertex)
-      vertex match {
-        case junction: JunctionModule =>
-          val inProcessors = junction.shape.inlets.map { inlet =>
-            val upstreamModule = graph.incomingEdgesOf(junction).find(_._2.to == inlet).map(_._1)
-            val upstreamProcessorId = processorIds(upstreamModule.get)
-            upstreamProcessorId
-          }.toList
-
-          val outProcessors = junction.shape.outlets.map { outlet =>
-            val downstreamModule = graph.outgoingEdgesOf(junction).find(_._2.from == outlet).map(_._3)
-            val downstreamProcessorId = downstreamModule.map(processorIds(_))
-            downstreamProcessorId.get
-          }.toList
-
-          (processorId, UserConfig.empty.withValue(GraphTask.OUT_PROCESSORS, outProcessors)
-            .withValue(GraphTask.IN_PROCESSORS, inProcessors))
-        case _ =>
-          (processorId, UserConfig.empty)
-      }
-    }.toMap
-    updatedConfigs
-  }
-
-  private def resolveClues(app: StreamApplication, clues: Map[Module, Clue]): Map[Module, ProcessorId] = {
-    clues.flatMap { kv =>
-      val (module, clue) = kv
-      val processorId = app.dag.vertices.find { processor =>
-        processor.taskConf.getString(clue).isDefined
-      }.map(_.id)
-      processorId.map((module, _))
-    }
-  }
-
-  private def cleanClues(app: StreamApplication): StreamApplication = {
-    val graph = app.dag.mapVertex { processor =>
-      val conf = cleanClue(processor.taskConf)
-      processor.copy(taskConf = conf)
-    }
-    new StreamApplication(app.name, app.inputUserConfig, graph)
-  }
-
-  private def cleanClue(conf: UserConfig): UserConfig = {
-    conf.filter { kv =>
-      kv._2 != RemoteMaterializerImpl.STAINS
-    }
-  }
-
-  private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = {
-    var matValues = Map.empty[Module, Clue]
-    val opGraph = graph.mapVertex{ module =>
-      val name = uuid
-      val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS)
-      matValues += module -> name
-      val parallelism = GearAttributes.count(module.attributes)
-      val op = module match {
-        case source: SourceTaskModule[t] =>
-          val updatedConf = conf.withConfig(source.conf)
-          new DataSourceOp[t](source.source, parallelism, updatedConf, "source")
-        case sink: SinkTaskModule[t] =>
-          val updatedConf = conf.withConfig(sink.conf)
-          new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink")
-        case sourceBridge: SourceBridgeModule[_, _] =>
-          new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
-        case processor: ProcessorModule[_, _, _] =>
-          val updatedConf = conf.withConfig(processor.conf)
-          new ProcessorOp(processor.processor, parallelism, updatedConf, "source")
-        case sinkBridge: SinkBridgeModule[_, _] =>
-          new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
-        case groupBy: GroupByModule[t, g] =>
-          new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf)
-        case reduce: ReduceModule[Any] =>
-          reduceOp(reduce.f, conf)
-        case stage: StageModule =>
-          translateStage(stage, conf)
-        case fanIn: FanInModule =>
-          translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, conf)
-        case fanOut: FanOutModule =>
-          translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf)
-      }
-
-      if (op == null) {
-        throw new UnsupportedOperationException(module.getClass.toString + " is not supported with RemoteMaterializer")
-      }
-      op
-    }.mapEdge[OpEdge]{(n1, edge, n2) =>
-      n2 match {
-        case master: MasterOp =>
-          Shuffle
-        case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
-          Shuffle
-        case slave: SlaveOp[_] =>
-          Direct
-      }
-    }
-    (opGraph, matValues)
-  }
-
-  private def translateStage(module: StageModule, conf: UserConfig): Op = {
-    module match {
-      case buffer: Stages.Buffer =>
-        //ignore the buffering operation
-        identity("buffer", conf)
-      case collect: Stages.Collect =>
-        collectOp(collect.pf, conf)
-      case concatAll: Stages.ConcatAll =>
-        //TODO:
-        null
-      case conflat: Stages.Conflate =>
-        conflatOp(conflat.seed, conflat.aggregate, conf)
-      case drop: Stages.Drop =>
-        dropOp(drop.n, conf)
-      case dropWhile: Stages.DropWhile =>
-        dropWhileOp(dropWhile.p, conf)
-      case expand: Stages.Expand =>
-        //TODO
-        null
-      case filter: Stages.Filter =>
-        filterOp(filter.p, conf)
-      case fold: Stages.Fold =>
-        foldOp(fold.zero, fold.f, conf)
-      case groupBy: Stages.GroupBy =>
-        //TODO
-        null
-      case grouped: Stages.Grouped =>
-        groupedOp(grouped.n, conf)
-      case _: Stages.Identity =>
-        identity("identity", conf)
-      case log: Stages.Log =>
-        logOp(log.name, log.extract, conf)
-      case map: Stages.Map =>
-        mapOp(map.f, conf)
-      case mapAsync: Stages.MapAsync =>
-        //TODO
-        null
-      case mapAsync: Stages.MapAsyncUnordered =>
-        //TODO
-        null
-      case flatMap: Stages.MapConcat =>
-        flatMapOp(flatMap.f, "mapConcat", conf)
-      case stage: MaterializingStageFactory =>
-        //TODO
-        null
-      case prefixAndTail: Stages.PrefixAndTail =>
-        //TODO
-        null
-      case recover: Stages.Recover =>
-        //TODO: we will just ignore this
-        identity("recover", conf)
-      case scan: Stages.Scan =>
-        scanOp(scan.zero, scan.f, conf)
-      case split: Stages.Split =>
-        //TODO
-        null
-      case stage: Stages.StageFactory =>
-        //TODO
-        null
-      case take: Stages.Take =>
-        takeOp(take.n, conf)
-      case takeWhile: Stages.TakeWhile =>
-        filterOp(takeWhile.p, conf)
-      case time: Stages.TimerTransform =>
-        //TODO
-        null
-    }
-  }
-
-  private def translateFanIn(
-    fanIn: FanInModule,
-    edges: List[(Module, Edge, Module)],
-    parallelism: Int,
-    conf: UserConfig): Op = {
-    fanIn match {
-      case merge: MergeModule[_] =>
-        MergeOp("merge", conf)
-      case mergePrefered: MergePreferredModule[_] =>
-        //TODO, support "prefer" merge
-        MergeOp("mergePrefered", conf)
-      case zip: ZipWithModule =>
-        //TODO: support zip module
-        null
-      case concat: ConcatModule[_] =>
-        //TODO: support concat module
-        null
-      case flexiMerge: FlexiMergeModule[_, _] =>
-        //TODO: Suport flexi merge module
-        null
-    }
-  }
-
-  private def translateFanOut(
-    fanOut: FanOutModule,
-    edges: List[(Module, Edge, Module)],
-    parallelism: Int,
-    conf: UserConfig): Op = {
-    fanOut match {
-      case unzip2: UnzipWith2Module[Any, Any, Any] =>
-        val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new UnZip2Task.UnZipFunction(unzip2.f))
-        new ProcessorOp(classOf[UnZip2Task], parallelism, updatedConf, "unzip")
-      case broadcast: BroadcastModule[_] =>
-        new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast")
-      case broadcast: BalanceModule[_] =>
-        new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
-      case flexi: FlexiRouteImpl[_, _] =>
-        //TODO
-        null
-    }
-  }
-}
-
-object RemoteMaterializerImpl {
-  final val NotApplied: Any => Any = _ => NotApplied
-
-  def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      collect.applyOrElse(data, NotApplied) match {
-        case NotApplied => None
-        case result: Any => Option(result)
-      }
-    }, "collect", conf)
-  }
-
-  def filterOp(filter: Any => Boolean, conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      if (filter(data)) Option(data) else None
-    }, "filter", conf)
-  }
-
-  def reduceOp(reduce: (Any, Any) => Any, conf: UserConfig): Op = {
-    var result: Any = null
-    val flatMap = { elem: Any =>
-      if (result == null) {
-        result = elem
-      } else {
-        result = reduce(result, elem)
-      }
-      List(result)
-    }
-    flatMapOp(flatMap, "reduce", conf)
-  }
-
-  def identity(description: String, conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      List(data)
-    }, description, conf)
-  }
-
-  def mapOp(map: Any => Any, conf: UserConfig): Op = {
-    flatMapOp({ data: Any =>
-      List(map(data))
-    }, "map", conf)
-  }
-
-  def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op = {
-    flatMapOp(flatMap, "flatmap", conf)
-  }
-
-  def flatMapOp(fun: Any => TraversableOnce[Any], description: String, conf: UserConfig): Op = {
-    FlatMapOp(fun, description, conf)
-  }
-
-  def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = {
-    var agg: Any = null
-    val flatMap = { elem: Any =>
-      agg = if (agg == null) {
-        seed(elem)
-      } else {
-        aggregate(agg, elem)
-      }
-      List(agg)
-    }
-
-    flatMapOp(flatMap, "map", conf)
-  }
-
-  def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
-    var aggregator: Any = zero
-    val map = { elem: Any =>
-      aggregator = fold(aggregator, elem)
-      List(aggregator)
-    }
-    flatMapOp(map, "fold", conf)
-  }
-
-  def groupedOp(count: Int, conf: UserConfig): Op = {
-    var left = count
-    val buf = {
-      val b = Vector.newBuilder[Any]
-      b.sizeHint(count)
-      b
-    }
-
-    val flatMap: Any => Iterable[Any] = { input: Any =>
-      buf += input
-      left -= 1
-      if (left == 0) {
-        val emit = buf.result()
-        buf.clear()
-        left = count
-        Some(emit)
-      } else {
-        None
-      }
-    }
-    flatMapOp(flatMap, conf: UserConfig)
-  }
-
-  def dropOp(number: Long, conf: UserConfig): Op = {
-    var left = number
-    val flatMap: Any => Iterable[Any] = { input: Any =>
-      if (left > 0) {
-        left -= 1
-        None
-      } else {
-        Some(input)
-      }
-    }
-    flatMapOp(flatMap, "drop", conf)
-  }
-
-  def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      if (drop(data)) None else Option(data)
-    }, "dropWhile", conf)
-  }
-
-  def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
-    val flatMap = { elem: Any =>
-      LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
-      List(elem)
-    }
-    flatMapOp(flatMap, "log", conf)
-  }
-
-  def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = {
-    var aggregator = zero
-    var pushedZero = false
-
-    val flatMap = { elem: Any =>
-      aggregator = f(aggregator, elem)
-
-      if (pushedZero) {
-        pushedZero = true
-        List(zero, aggregator)
-      } else {
-        List(aggregator)
-      }
-    }
-    flatMapOp(flatMap, "scan", conf)
-  }
-
-  def takeOp(count: Long, conf: UserConfig): Op = {
-    var left: Long = count
-
-    val filter: Any => Iterable[Any] = { elem: Any =>
-      left -= 1
-      if (left > 0) Some(elem)
-      else if (left == 0) Some(elem)
-      else None
-    }
-    flatMapOp(filter, "take", conf)
-  }
-
-  /**
-   * We use stains to track how module maps to Processor
-   *
-   */
-  val STAINS = "track how module is fused to processor"
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
deleted file mode 100644
index c5dfc9a..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- *
- *
- * [[IN]] -> [[BridgeModule]] -> [[OUT]]
- * /
- * /
- * out of band data input or output channel [[MAT]]
- *
- *
- * [[BridgeModule]] is used as a bridge between different materializers.
- * Different [[akka.stream.Materializer]]s can use out of band channel to
- * exchange messages.
- *
- * For example:
- *
- * Remote Materializer
- * -----------------------------
- * |                            |
- * | BridgeModule -> RemoteSink |
- * |  /                         |
- * --/----------------------------
- * Local Materializer     /  out of band channel.
- * ----------------------/----
- * | Local              /    |
- * | Source ->  BridgeModule |
- * |                         |
- * ---------------------------
- *
- *
- * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
- * module during materialization.
- *
- * However, user can still declare it explicitly. In this case, it means we have a
- * boundary Source or Sink module which accept out of band channel inputs or
- * outputs.
- *
- *
- * @tparam IN
- * @tparam OUT
- * @tparam MAT
- */
-abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] {
-  def attributes: Attributes
-  def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
-
-  protected def newInstance: BridgeModule[IN, OUT, MAT]
-  override def carbonCopy: Module = newInstance
-}
-
-/**
- *
- * Bridge module which accept out of band channel Input
- * [[org.reactivestreams.Publisher]][IN].
- *
- *
- * [[SourceBridgeModule]] -> [[OUT]]
- * /|
- * /
- * out of band data input [[org.reactivestreams.Publisher]][IN]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]]
- * @tparam OUT out put data type to next module.
- */
-class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] {
-  override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = new SourceBridgeModule[IN, OUT](attributes)
-
-  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = {
-    new SourceBridgeModule(attributes)
-  }
-}
-
-/**
- *
- * Bridge module which accept out of band channel Output
- * [[org.reactivestreams.Subscriber]][OUT].
- *
- *
- * [[IN]] -> [[BridgeModule]]
- * \
- * \
- * \|
- * out of band data output [[org.reactivestreams.Subscriber]][OUT]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from previous module
- * @tparam OUT out put data type to out of band subscriber
- */
-class SinkBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] {
-  override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = new SinkBridgeModule[IN, OUT](attributes)
-
-  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
-    new SinkBridgeModule[IN, OUT](attributes)
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
deleted file mode 100644
index e57a6f6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Group the T value groupBy function
- *
- * @param f
- * @param attributes
- * @tparam T
- * @tparam Group
- */
-case class GroupByModule[T, Group](val groupBy: T => Group,
-    val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit] {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: GroupByModule[T, Group] = {
-    new GroupByModule[T, Group](groupBy, attributes)
-  }
-
-  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = {
-    new GroupByModule[T, Group](groupBy, attributes)
-  }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
similarity index 93%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
index 50c4450..4384b39 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump
+package org.apache.gearpump.akkastream
 
 import akka.stream.Attributes
 import akka.stream.Attributes.Attribute
 
 object GearAttributes {
-
   /**
    * Define how many parallel instance we want to use to run this module
-   * @param count
+   * @param count Int
    * @return
    */
   def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
@@ -46,7 +45,7 @@
    * Get the effective location settings if child override the parent
    * setttings.
    *
-   * @param attrs
+   * @param attrs Attributes
    * @return
    */
   def location(attrs: Attributes): Location = {
@@ -60,7 +59,7 @@
 
   /**
    * get effective parallelism settings if child override parent.
-   * @param attrs
+   * @param attrs Attributes
    * @return
    */
   def count(attrs: Attributes): Int = {
@@ -84,7 +83,7 @@
   /**
    * How many parallel instance we want to use for this module.
    *
-   * @param parallelism
+   * @param parallelism Int
    */
   final case class ParallismAttribute(parallelism: Int) extends Attribute
 }
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
new file mode 100644
index 0000000..07c95f8
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import akka.NotUsed
+import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.Attributes.Attribute
+import akka.stream.impl.Stages.SymbolicGraphStage
+import akka.stream.impl.StreamLayout.{Atomic, Combine, CopiedModule, Ignore, MaterializedValueNode, Module, Transform}
+import akka.stream.{ActorAttributes, ActorMaterializerSettings, Attributes, ClosedShape, Fusing, Graph, InPort, OutPort, SinkShape}
+import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
+import akka.stream.impl.{ExtendedActorMaterializer, StreamSupervisor}
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
+import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
+import org.apache.gearpump.akkastream.graph._
+import org.apache.gearpump.util.{Graph => GGraph}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContextExecutor, Promise}
+import scala.concurrent.duration.FiniteDuration
+
+object GearpumpMaterializer {
+
+  final case class Edge(from: OutPort, to: InPort)
+
+  final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
+
+  implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)
+
+  def apply(strategy: Strategy)(implicit context: ActorRefFactory): ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    apply(ActorMaterializerSettings(
+      system).withAutoFusing(false), strategy, useLocalCluster = false, "flow")(context)
+  }
+
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+      useLocalCluster: Boolean = true,
+      namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    val settings = materializerSettings getOrElse
+      ActorMaterializerSettings(system).withAutoFusing(false)
+    apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
+  }
+
+  def apply(materializerSettings: ActorMaterializerSettings,
+      strategy: Strategy,
+      useLocalCluster: Boolean,
+      namePrefix: String)(implicit context: ActorRefFactory):
+    ExtendedActorMaterializer = {
+    val system = actorSystemOf(context)
+
+    new GearpumpMaterializer(
+      system,
+      materializerSettings,
+      context.actorOf(
+        StreamSupervisor.props(materializerSettings, false).withDispatcher(
+          materializerSettings.dispatcher), StreamSupervisor.nextName()))
+  }
+
+
+  private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
+    val system = context match {
+      case s: ExtendedActorSystem => s
+      case c: ActorContext => c.system
+      case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
+      case _ =>
+        throw new IllegalArgumentException(
+          s"""
+            |  context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
+          """.stripMargin
+        )
+    }
+    system
+  }
+
+}
+
+/**
+ *
+ * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
+ * streaming application. If some module cannot be rendered remotely in Gearpump
+ * Cluster, then it will use local Actor materializer as fallback to materialize
+ * the module locally.
+ *
+ * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
+ * to determine which module should be rendered
+ * remotely, and which module should be rendered locally.
+ *
+ * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
+ *     to find out how we cut the runnableGraph to two parts,
+ *      and materialize them separately.
+ * @param system          ActorSystem
+ * @param strategy        Strategy
+ * @param useLocalCluster whether to use built-in in-process local cluster
+ */
+class GearpumpMaterializer(override val system: ActorSystem,
+    override val settings: ActorMaterializerSettings,
+    override val supervisor: ActorRef,
+    strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+    useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
+  extends ExtendedActorMaterializer {
+
+  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
+    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
+    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
+  )
+
+  override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+  override def isShutdown: Boolean = system.whenTerminated.isCompleted
+
+  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+    import ActorAttributes._
+    import Attributes._
+    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+      attr match {
+        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+        case _ => s
+      }
+    }
+  }
+
+  override def withNamePrefix(name: String): ExtendedActorMaterializer =
+    throw new UnsupportedOperationException()
+
+  override implicit def executionContext: ExecutionContextExecutor =
+    throw new UnsupportedOperationException()
+
+  override def schedulePeriodically(initialDelay: FiniteDuration,
+      interval: FiniteDuration,
+      task: Runnable): Cancellable =
+    system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+    system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
+    val info = Fusing.aggressive(runnableGraph).module.info
+    val graph = GGraph.empty[Module, Edge]
+
+    info.subModules.foreach(module => {
+      if (module.isCopied) {
+        val original = module.asInstanceOf[CopiedModule].copyOf
+        graph.addVertex(original)
+        module.shape.outlets.zip(original.shape.outlets).foreach(out => {
+          val (cout, oout) = out
+          val cin = info.downstreams(cout)
+          val downStreamModule = info.inOwners(cin)
+          if(downStreamModule.isCopied) {
+            val downStreamOriginal = downStreamModule.asInstanceOf[CopiedModule].copyOf
+            downStreamModule.shape.inlets.zip(downStreamOriginal.shape.inlets).foreach(in => {
+              in._1 == cin match {
+                case true =>
+                  val oin = in._2
+                  graph.addEdge(original, Edge(oout, oin), downStreamOriginal)
+                case false =>
+              }
+            })
+          }
+        })
+      }
+    })
+
+    printGraph(graph)
+
+    val subGraphs = GraphPartitioner(strategy).partition(graph)
+    val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
+      val materializer = subMaterializers(subGraph.getClass)
+      map ++ materializer.materialize(subGraph, map)
+    }
+    val mat = matValues.flatMap(pair => {
+      val (module, any) = pair
+      any match {
+        case notUsed: NotUsed =>
+          None
+        case others =>
+          val rt = module.shape match {
+            case sink: SinkShape[_] =>
+              Some(any)
+            case _ =>
+              None
+          }
+          rt
+      }
+    }).toList
+    val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
+    resolveMaterialized(matModule.materializedValueComputation, matValues)
+    val rt = Some(mat).flatMap(any => {
+      any match {
+        case promise: Promise[_] =>
+          Some(promise.future)
+        case other =>
+          Some(other)
+      }
+    })
+    rt.orNull.asInstanceOf[Mat]
+  }
+
+  private def printGraph(graph: GGraph[Module, Edge]): Unit = {
+    val iterator = graph.topologicalOrderIterator
+    while (iterator.hasNext) {
+      val module = iterator.next()
+      // scalastyle:off println
+      module match {
+        case graphStageModule: GraphStageModule =>
+          graphStageModule.stage match {
+            case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+              val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+              println(
+                s"${module.getClass.getSimpleName}(${symbolicName})"
+              )
+            case graphStage: GraphStage[_] =>
+              val name = graphStage.getClass.getSimpleName
+              println(
+                s"${module.getClass.getSimpleName}(${name})"
+              )
+            case other =>
+              println(
+                s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+              )
+          }
+        case _ =>
+          println(module.getClass.getSimpleName)
+      }
+      // scalastyle:on println
+    }
+  }
+
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+    logger
+  }
+
+  def shutdown: Unit = {
+    subMaterializers.values.foreach(_.shutdown)
+  }
+
+  private def resolveMaterialized(mat: MaterializedValueNode,
+      materializedValues: mutable.Map[Module, Any]): Any = mat match {
+    case Atomic(m) =>
+      materializedValues.getOrElse(m, ())
+    case Combine(f, d1, d2) =>
+      f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+    case Transform(f, d) =>
+      f(resolveMaterialized(d, materializedValues))
+    case Ignore =>
+      ()
+  }
+
+
+
+}
+
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
new file mode 100644
index 0000000..8a869d2
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.{util => ju}
+
+import _root_.org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, MaterializedValueSourceAttribute}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+
+class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
+    initialAttributes: Attributes, namePrefix: Option[String] = None)
+  extends MaterializerSession(topLevel, initialAttributes) {
+
+  private[this] def createFlowName(): String =
+    FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()
+
+  private val flowName = createFlowName()
+  private var nextId = 0
+
+  private def stageName(attr: Attributes): String = {
+    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+    nextId += 1
+    name
+  }
+
+  val graph = GGraph.empty[Module, Edge]
+
+  def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
+    graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
+  }
+
+  def addVertex(module: Module): Unit = {
+    graph.addVertex(module)
+  }
+
+  override def materializeModule(module: Module, parentAttributes: Attributes): Any = {
+
+    val materializedValues: ju.Map[Module, Any] = new ju.HashMap
+    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+
+    val materializedValueSources = List.empty[MaterializedValueSource[_]]
+
+    for (submodule <- module.subModules) {
+      submodule match {
+        case atomic: AtomicModule =>
+          materializeAtomic(atomic, currentAttributes, materializedValues)
+        case copied: CopiedModule =>
+          enterScope(copied)
+          materializedValues.put(copied, materializeModule(copied, currentAttributes))
+          exitScope(copied)
+        case composite =>
+          materializedValues.put(composite, materializeComposite(composite, currentAttributes))
+        case EmptyModule =>
+      }
+    }
+
+    val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
+
+    materializedValueSources.foreach { module =>
+      val matAttribute =
+        new MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
+      val copied = copyAtomicModule(module.module, parentAttributes
+        and Attributes(matAttribute))
+      // TODO
+      // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
+      addVertex(copied)
+      materializedValues.put(copied, Atomic(copied))
+    }
+    mat
+
+  }
+
+  override protected def materializeComposite(composite: Module,
+      effectiveAttributes: Attributes): Any = {
+    materializeModule(composite, effectiveAttributes)
+  }
+
+  protected def materializeAtomic(atomic: AtomicModule,
+      parentAttributes: Attributes,
+    matVal: ju.Map[Module, Any]): Unit = {
+
+    val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
+    val copied = copyAtomicModule(atomic, parentAttributes)
+
+    for ((in, id) <- inputs.zipWithIndex) {
+      val inPort = inPortMapping(atomic, copied)(in)
+      // assignPort(in, (inPort, copied))
+    }
+
+    for ((out, id) <- outputs.zipWithIndex) {
+      val outPort = outPortMapping(atomic, copied)(out)
+      // TODO
+      // assignPort(out, (outPort, copied))
+    }
+
+    addVertex(copied)
+    matVal.put(atomic, Atomic(copied))
+  }
+
+  private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
+    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+    module.withAttributes(currentAttributes).asInstanceOf[T]
+  }
+
+  private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
+    from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
+  }
+
+  private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
+    from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
+  }
+
+  protected def resolveMaterialized(matNode: MaterializedValueNode,
+      materializedValues: ju.Map[Module, Any]):
+    Any =
+    matNode match {
+      case Atomic(m) => materializedValues.get(m)
+      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+        resolveMaterialized(d2, materializedValues))
+      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+      case Ignore => Ignore
+    }
+}
+
+object GearpumpMaterializerSession {
+  def apply(system: ActorSystem, topLevel: Module,
+      initialAttributes: Attributes, namePrefix: Option[String] = None):
+  GearpumpMaterializerSession = {
+    new GearpumpMaterializerSession(system, topLevel, initialAttributes, namePrefix)
+  }
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
new file mode 100644
index 0000000..52a45d9
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.{Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Source and Sink are materialized locally.
+ * Remaining GraphStages are materialized remotely:
+ *  statefulMap, filter, fold, flatMap
+ */
+object Test extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test", akkaConf)
+    implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy)
+
+    val echo = system.actorOf(Props(new Echo()))
+    val sink = Sink.actorRef(echo, "COMPLETE")
+
+    Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    ).filter(_.startsWith("red")).fold("Items:") {(a, b) =>
+      a + "|" + b
+    }.map("I want to order item: " + _).runWith(sink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
new file mode 100644
index 0000000..826cdcf
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Conflate, Throttle
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    implicit val system = ActorSystem("Test10", akkaConfig)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Conflate[A] - (2 inputs, 1 output) concatenates two streams
+    // (first consumes one, then the second one)
+    def stream(x: String) = Stream.continually(x)
+
+    val sourceA = Source(stream("A"))
+    val sourceB = Source(stream("B"))
+
+    val throttler: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+    val conflateFlow: Flow[String, String, NotUsed] =
+      Flow[String].conflate((x: String, y: String) => x: String)
+      ((acc: String, x: String) => s"$acc::$x")
+
+    val printFlow: Flow[(String, String), String, NotUsed] =
+      Flow[(String, String)].map {
+        x =>
+          println(s" lengths are : ${x._1.length} and ${x._2.length}  ;  ${x._1} zip ${x._2}")
+          x.toString
+      }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zipping = b.add(Zip[String, String]())
+
+      sourceA ~> throttler ~> zipping.in0
+      sourceB ~> conflateFlow ~> zipping.in1
+
+      zipping.out ~> printFlow ~> Sink.ignore
+
+      ClosedShape
+    })
+
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
new file mode 100644
index 0000000..087c57d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Broadcast and Merge
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    implicit val system = ActorSystem("Test11", akkaConfig)
+    implicit val materializer = GearpumpMaterializer()
+    // implicit val materializer =
+    //   ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+    implicit val ec = system.dispatcher
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() {
+      implicit builder: GraphDSL.Builder[NotUsed] =>
+
+      import GraphDSL.Implicits._
+      val in = Source(1 to 10)
+      val output: (Any) => Unit = any => {
+        val s = s"**** $any"
+        println(s)
+      }
+      val out = Sink.foreach(output)
+
+      val broadcast = builder.add(Broadcast[Int](2))
+      val merge = builder.add(Merge[Int](2))
+
+      val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
+
+      in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out
+      broadcast ~> f4 ~> merge
+
+      ClosedShape
+    })
+
+    g.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..b4f4bce
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+ 
+/**
+ * Partial source, sink example
+ */
+object Test12 extends AkkaApp with ArgumentsParser{
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    import scala.concurrent.duration._
+
+    implicit val system = ActorSystem("Test12", akkaConfig)
+    // implicit val materializer = ActorMaterializer(
+    //   ActorMaterializerSettings(system).withAutoFusing(false)
+    //   )
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    val pickMaxOfThree = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
+      val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
+
+      zip1.out ~> zip2.in0
+
+      UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
+    }
+
+    val resultSink = Sink.head[Int]
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
+      sink =>
+        import GraphDSL.Implicits._
+
+        // Importing the partial shape will return its shape (inlets & outlets)
+        val pm3 = b.add(pickMaxOfThree)
+
+        Source.single(1) ~> pm3.in(0)
+        Source.single(2) ~> pm3.in(1)
+        Source.single(3) ~> pm3.in(2)
+
+        pm3.out ~> sink.in
+
+        ClosedShape
+    })
+
+    val max: Future[Int] = g.run()
+    max.map(x => println(s"maximum of three numbers : $x"))
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..2e036cb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+  .tick(0.seconds, 1.second, "")
+  .map { _ =>
+    val now = System.currentTimeMillis()
+    val delay = random.nextInt(8)
+    MyEvent(now - delay * 1000L)
+  }
+  .statefulMapConcat { () =>
+    val generator = new CommandGenerator()
+    ev => generator.forEvent(ev)
+  }
+  .groupBy(64, command => command.w)
+  .takeWhile(!_.isInstanceOf[CloseWindow])
+  .fold(AggregateEventData((0L, 0L), 0))({
+    case (agg, OpenWindow(window)) => agg.copy(w = window)
+    // always filtered out by takeWhile
+    case (agg, CloseWindow(_)) => agg
+    case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+  })
+  .async
+  .mergeSubstreams
+  .runForeach { agg =>
+    println(agg.toString)
+  }
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    implicit val system = ActorSystem("Test13", akkaConfig)
+    implicit val materializer = GearpumpMaterializer()
+
+    val random = new Random()
+
+    val result = Source
+      .tick(0.seconds, 1.second, "tick data")
+      .map { _ =>
+        val now = System.currentTimeMillis()
+        val delay = random.nextInt(8)
+        MyEvent(now - delay * 1000L)
+      }
+      .statefulMapConcat { () =>
+        val generator = new CommandGenerator()
+        ev => generator.forEvent(ev)
+      }
+      .groupBy2(command => command.w)
+      .takeWhile(!_.isInstanceOf[CloseWindow])
+      .fold(AggregateEventData((0L, 0L), 0))({
+        case (agg, OpenWindow(window)) => agg.copy(w = window)
+        // always filtered out by takeWhile
+        case (agg, CloseWindow(_)) => agg
+        case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+      })
+      .runForeach(agg =>
+        println(agg.toString)
+      )
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  case class MyEvent(timestamp: Long)
+
+  type Window = (Long, Long)
+
+  object Window {
+    val WindowLength = 10.seconds.toMillis
+    val WindowStep = 1.second.toMillis
+    val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+    def windowsFor(ts: Long): Set[Window] = {
+      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+      (for (i <- 0 until WindowsPerEvent) yield
+        (firstWindowStart + i * WindowStep,
+          firstWindowStart + i * WindowStep + WindowLength)
+        ).toSet
+    }
+  }
+
+  sealed trait WindowCommand {
+    def w: Window
+  }
+
+  case class OpenWindow(w: Window) extends WindowCommand
+
+  case class CloseWindow(w: Window) extends WindowCommand
+
+  case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+  class CommandGenerator {
+    private val MaxDelay = 5.seconds.toMillis
+    private var watermark = 0L
+    private val openWindows = mutable.Set[Window]()
+
+    def forEvent(ev: MyEvent): List[WindowCommand] = {
+      watermark = math.max(watermark, ev.timestamp - MaxDelay)
+      if (ev.timestamp < watermark) {
+        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+        Nil
+      } else {
+        val eventWindows = Window.windowsFor(ev.timestamp)
+
+        val closeCommands = openWindows.flatMap { ow =>
+          if (!eventWindows.contains(ow) && ow._2 < watermark) {
+            openWindows.remove(ow)
+            Some(CloseWindow(ow))
+          } else None
+        }
+
+        val openCommands = eventWindows.flatMap { w =>
+          if (!openWindows.contains(w)) {
+            openWindows.add(w)
+            Some(OpenWindow(w))
+          } else None
+        }
+
+        val addCommands = eventWindows.map(w => AddToWindow(ev, w))
+
+        openCommands.toList ++ closeCommands.toList ++ addCommands.toList
+      }
+    }
+  }
+
+  case class AggregateEventData(w: Window, eventCount: Int) {
+    override def toString: String =
+      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+  }
+
+  def tsToString(ts: Long): String = OffsetDateTime
+    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+    .toLocalTime
+    .toString
+  // scalastyle:on println
+
+}
+
+
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..c436130
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+object Test14 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test14", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+      Flow[String]
+        .alsoTo(Sink.foreach(s => println(s"$filename: $s")))
+        .map(s => ByteString(s + "\n"))
+        .toMat(FileIO.toPath(new File(filename).toPath))(Keep.right)
+    }
+
+    val source: Source[Int, NotUsed] = Source(1 to 100)
+    val factorials: Source[BigInt, NotUsed] = source.scan(BigInt(1))((acc, next) => acc * next)
+    val sink1 = lineSink("factorial1.txt")
+    val sink2 = lineSink("factorial2.txt")
+    val slowSink2 = Flow[String].via(
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+    ).toMat(sink2)(Keep.right)
+    val bufferedSink2 = Flow[String].buffer(50, OverflowStrategy.backpressure).via(
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+    ).toMat(sink2)(Keep.right)
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      val bcast = b.add(Broadcast[String](2))
+      factorials.map(_.toString) ~> bcast.in
+      bcast.out(0) ~> sink1
+      bcast.out(1) ~> bufferedSink2
+      ClosedShape
+    })
+
+    g.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
new file mode 100644
index 0000000..f4e4dbd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+object Test15 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test15", akkaConf)
+    implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+      case true =>
+        GearpumpMaterializer()
+      case false =>
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+    }
+    import akka.stream.scaladsl.GraphDSL.Implicits._
+    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      val A = builder.add(Source.single(0)).out
+      val B = builder.add(Broadcast[Int](2))
+      val C = builder.add(Merge[Int](2).named("C"))
+      val D = builder.add(Flow[Int].map(_ + 1).named("D"))
+      val E = builder.add(Balance[Int](2).named("E"))
+      val F = builder.add(Merge[Int](2).named("F"))
+      val G = builder.add(Sink.foreach(println).named("G")).in
+
+      C <~ F
+      A ~> B ~> C ~> F
+      B ~> D ~> E ~> F
+      E ~> G
+
+      ClosedShape
+    }).run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
+
+
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
new file mode 100644
index 0000000..9691496
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test16", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val sink = GearSink.to(new LoggerSink[String])
+    val sourceData = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val source = GearSource.from[String](sourceData)
+    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..a6049cd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl._
+import akka.stream.{ActorMaterializer, ClosedShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ *
+ * This tests how different Materializers can be used together in an explicit way.
+ *
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test2", akkaConf)
+    val gearpumpMaterializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val source = GearSource.bridge[String, String]
+    val sink = GearSink.bridge[String, String]
+
+    val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _)
+    val (entry, exit) = flow.runWith(source, sink)(gearpumpMaterializer)
+
+    val actorMaterializer = ActorMaterializer()
+
+    val externalSource = Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    )
+    val externalSink = Sink.actorRef(echo, "COMPLETE")
+
+    RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        externalSource ~> Sink.fromSubscriber(entry)
+        Source.fromPublisher(exit) ~> externalSink
+        ClosedShape
+      }
+    ).run()(actorMaterializer)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..24faeb3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from remote and write to local
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test3", akkaConf)
+    implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+      case true =>
+        GearpumpMaterializer()
+      case false =>
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+    }
+    val echo = system.actorOf(Props(new Echo()))
+    val sink = Sink.actorRef(echo, "COMPLETE")
+    val sourceData = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val source = GearSource.from[String](sourceData)
+    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..6a44a35
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from local and write to remote
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test4", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    ).filter(_.startsWith("red")).
+      map("I want to order item: " + _).
+      runWith(GearSink.to(new LoggerSink[String]))
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..ad87a97
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ClosedShape
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * test fanout
+ */
+object Test5 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test5", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val echo = system.actorOf(Props(new Echo()))
+    val source = Source(List(("male", "24"), ("female", "23")))
+    val sink = Sink.actorRef(echo, "COMPLETE")
+
+    RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit b =>
+        import GraphDSL.Implicits._
+        val unzip = b.add(Unzip[String, String]())
+        val sink1 = Sink.actorRef(echo, "COMPLETE")
+        val sink2 = Sink.actorRef(echo, "COMPLETE")
+        source ~> unzip.in
+        unzip.out0 ~> sink1
+        unzip.out1 ~> sink1
+        ClosedShape
+      }
+    ).run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  class Echo extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
new file mode 100644
index 0000000..a525471
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ *  WordCount example
+ * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet)
+ */
+
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+
+object Test6 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test6", akkaConf)
+    implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+      case true =>
+        GearpumpMaterializer()
+      case false =>
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+    }
+    val echo = system.actorOf(Props(Echo()))
+    val sink = Sink.actorRef(echo, "COMPLETE")
+    val sourceData = new CollectionDataSource(
+      List(
+        "this is a good start",
+        "this is a good time",
+        "time to start",
+        "congratulations",
+        "green plant",
+        "blue sky")
+    )
+    val source = GearSource.from[String](sourceData)
+    source.mapConcat({line =>
+      line.split(" ").toList
+    }).groupBy2(x => x)
+      .map(word => (word, 1))
+      .reduce({(a, b) =>
+        (a._1, a._2 + b._2)
+      })
+      .log("word-count")
+      .runWith(sink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  case class Echo() extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
new file mode 100644
index 0000000..8c837af
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * This is a simplified API you can use to combine sources and sinks
+ * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A]
+ * without the need for using the Graph DSL
+ */
+
+object Test7 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test7", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+ 
+    val sourceA = Source(List(1))
+    val sourceB = Source(List(2))
+    val mergedSource = Source.combine(sourceA, sourceB)(Merge(_))
+
+    val sinkA = Sink.foreach[Int](x => println(s"In SinkA : $x"))
+    val sinkB = Sink.foreach[Int](x => println(s"In SinkB : $x"))
+    val sink = Sink.combine(sinkA, sinkB)(Broadcast[Int](_))
+    mergedSource.runWith(sink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
new file mode 100644
index 0000000..ad2ac61
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example to find sum of elements
+ */
+object Test8 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test8", akkaConf)
+    implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+      case true =>
+        GearpumpMaterializer()
+      case false =>
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+    }
+    implicit val ec = system.dispatcher
+
+    // Source gives 1 to 100 elements
+    val source: Source[Int, NotUsed] = Source(Stream.from(1).take(100))
+    val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
+
+    val result: Future[Int] = source.runWith(sink)
+    result.map(sum => {
+      println(s"Sum of stream elements => $sum")
+    })
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
new file mode 100644
index 0000000..66414e0
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+ 
+/**
+ * Stream example showing Broadcast
+ */
+object Test9 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test9", akkaConf)
+    implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+      case true =>
+        GearpumpMaterializer()
+      case false =>
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+    }
+    implicit val ec = system.dispatcher
+
+    val sinkActor = system.actorOf(Props(new SinkActor()))
+    val source = Source((1 to 5))
+    val sink = Sink.actorRef(sinkActor, "COMPLETE")
+    val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map {
+      x => println(s"processing broadcasted element : $x in flowA"); x
+    }
+    val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map {
+      x => println(s"processing broadcasted element : $x in flowB"); x
+    }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+      val broadcast = b.add(Broadcast[Int](2))
+      val merge = b.add(Merge[Int](2))
+      source ~> broadcast
+      broadcast ~> flowA ~> merge
+      broadcast ~> flowB ~> merge
+      merge ~> sink
+      ClosedShape
+    })
+
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  class SinkActor extends Actor {
+    def receive: Receive = {
+      case any: AnyRef =>
+        println("Confirm received: " + any)
+    }
+  }
+  // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
similarity index 60%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
index 56b89bc..2a1e7ff 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -16,30 +16,34 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.example
+package org.apache.gearpump.akkastream.example
 
 import java.io.{File, FileInputStream}
 import java.util.zip.GZIPInputStream
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
 
+import akka.NotUsed
 import akka.actor.ActorSystem
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
 import akka.stream.scaladsl._
+import akka.stream.{ClosedShape, IOResult}
 import akka.util.ByteString
-import org.json4s.JsonAST.JString
-
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
 
 /**
  * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
  * which showcases running Akka Streams DSL across JVMs on Gearpump
  *
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
- * -input wikidata-${DATE}-all.json.gz -languages en,de
+ * Usage: output/target/pack/bin/gear app
+ *  -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ *  -input wikidata-${DATE}-all.json.gz -languages en,de
  *
  * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
  *
@@ -58,54 +62,58 @@
     val input = new File(parsed.getString("input"))
     val langs = parsed.getString("languages").split(",")
 
-    implicit val system = ActorSystem("wikidata-poc", akkaConf)
-    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.TagAttributeStrategy, useLocalCluster = false)
+    implicit val system = ActorSystem("WikipediaApp", akkaConf)
+    implicit val materializer =
+      GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
     import system.dispatcher
 
     val elements = source(input).via(parseJson(langs))
 
-    val g = FlowGraph.closed(count) { implicit b =>
-      sinkCount => {
-
-        val broadcast = b.add(Broadcast[WikidataElement](2))
-        elements ~> broadcast ~> logEveryNSink(1000)
-        broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+    val g = RunnableGraph.fromGraph(
+      GraphDSL.create(count) { implicit b =>
+        sinkCount => {
+          import GraphDSL.Implicits._
+          val broadcast = b.add(Broadcast[WikidataElement](2))
+          elements ~> broadcast ~> logEveryNSink(1000)
+          broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+          ClosedShape
+        }
       }
-    }
+    )
 
-    g.run().onComplete { x =>
-      x match {
-        case Success((t, f)) => printResults(t, f)
-        case Failure(tr) => println("Something went wrong")
-      }
-      system.terminate()
+    g.run().onComplete {
+      case Success((t, f)) => printResults(t, f)
+      // scalastyle:off println
+      case Failure(tr) => println("Something went wrong")
+      // scalastyle:on println
     }
-    Await.result(system.whenTerminated, Duration.Inf)
+    Await.result(system.whenTerminated, 60.minutes)
   }
 
-  def source(file: File): Source[String, Future[Long]] = {
+  def source(file: File): Source[String, Future[IOResult]] = {
     val compressed = new GZIPInputStream(new FileInputStream(file), 65536)
-    InputStreamSource(() => compressed)
+    StreamConverters.fromInputStream(() => compressed)
       .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
       .map(x => x.decodeString("utf-8"))
   }
 
-  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] =
-    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect {
+  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+  Flow[String, WikidataElement, NotUsed] =
+    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({
       case Some(v) => v
-    }
+    })
 
   def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
-    import org.json4s.jackson.JsonMethods
     Try(JsonMethods.parse(line)).toOption.flatMap { json =>
       json \ "id" match {
         case JString(itemId) =>
-          val sites = for {
+
+          val sites: Seq[(String, String)] = for {
             lang <- langs
             JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
           } yield lang -> title
 
-          if (sites.isEmpty) None
+          if(sites.isEmpty) None
           else Some(WikidataElement(id = itemId, sites = sites.toMap))
 
         case _ => None
@@ -113,17 +121,21 @@
     }
   }
 
-  def logEveryNSink[T](n: Int) = Sink.fold(0) { (x, y: T) =>
-    if (x % n == 0)
+  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) =>
+    if (x % n == 0) {
+      // scalastyle:off println
       println(s"Processing element $x: $y")
+      // scalastyle:on println
+    }
     x + 1
   }
 
-  def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement]
+  def checkSameTitles(langs: Set[String]):
+    Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement]
     .filter(_.sites.keySet == langs)
     .map { x =>
       val titles = x.sites.values
-      titles.forall(_ == titles.head)
+      titles.forall( _ == titles.head)
     }.withAttributes(GearAttributes.remote)
 
   def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
@@ -131,13 +143,15 @@
     case ((t, f), false) => (t, f + 1)
   }
 
-  def printResults(t: Int, f: Int) = {
-    val message =
-      s"""
-         | Number of items with the same title: $t
-         | Number of items with the different title: $f
-         | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
-                  """.stripMargin
+  def printResults(t: Int, f: Int): Unit = {
+    val message = s"""
+      | Number of items with the same title: $t
+      | Number of items with the different title: $f
+      | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
+      """.stripMargin
+    // scalastyle:off println
     println(message)
+    // scalastyle:on println
   }
+
 }
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
similarity index 65%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
index 19083f6..f7919c0 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.graph
+package org.apache.gearpump.akkastream.graph
 
-import akka.stream.ModuleGraph
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.GearAttributes.{Local, Location, Remote}
-import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.module.{BridgeModule, DummyModule, GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.DirectProcessor
-import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module}
+import akka.stream.{Shape, SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.module._
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
 import akka.stream.impl.{SinkModule, SourceModule}
-
 import org.apache.gearpump.util.Graph
 
 /**
  *
- * GraphCutter is used to decide which part is rendered locally
+ * GraphPartitioner is used to decide which part will be rendered locally
  * and which part should be rendered remotely.
  *
  * We will cut the graph based on the [[Strategy]] provided.
@@ -54,23 +54,22 @@
  *       \|              /
  *         AtomicModule3
  *
- *
- * @see [[ModuleGraph]] for more information of how Graph is organized.
+ * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
  *
  */
-class GraphCutter(strategy: Strategy) {
-  def cut(moduleGraph: ModuleGraph[_]): List[SubGraph] = {
-    val graph = removeDummyModule(moduleGraph.graph)
+class GraphPartitioner(strategy: Strategy) {
+  def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
+    val graph = removeDummyModule(moduleGraph)
     val tags = tag(graph, strategy)
-    doCut(graph, tags, moduleGraph.mat)
+    doPartition(graph, tags)
   }
 
-  private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location],
-      mat: MaterializedValueNode): List[SubGraph] = {
+  private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
+  List[SubGraph] = {
     val local = Graph.empty[Module, Edge]
     val remote = Graph.empty[Module, Edge]
 
-    graph.vertices.foreach { module =>
+    graph.vertices.foreach{ module =>
       if (tags(module) == Local) {
         local.addVertex(module)
       } else {
@@ -78,7 +77,7 @@
       }
     }
 
-    graph.edges.foreach { nodeEdgeNode =>
+    graph.edges.foreach{ nodeEdgeNode =>
       val (node1, edge, node2) = nodeEdgeNode
       (tags(node1), tags(node2)) match {
         case (Local, Local) =>
@@ -90,7 +89,7 @@
             case bridge: BridgeModule[_, _, _] =>
               local.addEdge(node1, edge, node2)
             case _ =>
-              // Creates a bridge module in between
+              // create a bridge module in between
               val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
               val remoteEdge = Edge(bridge.outPort, edge.to)
               remote.addEdge(bridge, remoteEdge, node2)
@@ -102,7 +101,7 @@
             case bridge: BridgeModule[_, _, _] =>
               local.addEdge(node1, edge, node2)
             case _ =>
-              // Creates a bridge module in between
+              // create a bridge module in between
               val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
               val remoteEdge = Edge(edge.from, bridge.inPort)
               remote.addEdge(node1, remoteEdge, bridge)
@@ -116,14 +115,14 @@
   }
 
   private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
-    graph.vertices.map { vertex =>
+    graph.vertices.map{vertex =>
       vertex -> strategy.apply(vertex)
     }.toMap
   }
 
   private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
     val graph = inputGraph.copy
-    val dummies = graph.vertices.filter { module =>
+    val dummies = graph.vertices.filter {module =>
       module match {
         case dummy: DummyModule =>
           true
@@ -136,7 +135,7 @@
   }
 }
 
-object GraphCutter {
+object GraphPartitioner {
 
   type Strategy = PartialFunction[Module, Location]
 
@@ -146,22 +145,33 @@
     case task: GearpumpTaskModule =>
       Remote
     case groupBy: GroupByModule[_, _] =>
-      // TODO: groupBy is not supported by local materializer yet
+      // TODO: groupBy is not supported by local materializer
       Remote
     case source: SourceModule[_, _] =>
       Local
     case sink: SinkModule[_, _] =>
       Local
-    case matValueSource: MaterializedValueSource[_] =>
-      Local
-    case direct: DirectProcessor =>
-      Local
-    case time: TimerTransform =>
-      // Renders to local as it requires a timer.
-      Local
+    case remaining: Module =>
+      remaining.shape match {
+        case sourceShape: SourceShape[_] =>
+          Local
+        case sinkShape: SinkShape[_] =>
+          Local
+        case otherShapes: Shape =>
+          Remote
+      }
   }
 
   val AllRemoteStrategy: Strategy = BaseStrategy orElse {
+    case graphStageModule: GraphStageModule =>
+      graphStageModule.stage match {
+        case matValueSource: MaterializedValueSource[_] =>
+          Local
+        case singleSource: SingleSource[_] =>
+          Local
+        case _ =>
+          Remote
+      }
     case _ =>
       Remote
   }
@@ -177,7 +187,19 @@
   }
 
   val AllLocalStrategy: Strategy = BaseStrategy orElse {
+    case graphStageModule: GraphStageModule =>
+      // TODO kasravi review
+      graphStageModule.stage match {
+        case matValueSource: MaterializedValueSource[_] =>
+          Local
+        case _ =>
+          Local
+      }
     case _ =>
       Local
   }
-}
\ No newline at end of file
+
+  def apply(strategy: Strategy): GraphPartitioner = {
+    new GraphPartitioner(strategy)
+  }
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
new file mode 100644
index 0000000..fe86951
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.{PublisherSource, SubscriberSink}
+import akka.stream.{SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.util.Graph
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ *  contain module that can be materialized in local JVM.
+ *
+ * @param graph Graph[Module, Edge]
+ */
+class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object LocalGraph {
+
+  /**
+   * materialize LocalGraph in local JVM
+   * @param system ActorSystem
+   */
+  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
+
+    // create a local materializer
+    val materializer = LocalMaterializerImpl()(system)
+
+    /**
+     *
+     * @param matValues Materialized Values for each module before materialization
+     * @return Materialized Values for each Module after the materialization.
+     */
+    override def materialize(graph: SubGraph,
+        matValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
+      val newGraph: Graph[Module, Edge] = graph.graph.mapVertex {
+        case source: SourceBridgeModule[in, out] =>
+          val subscriber = matValues(source).asInstanceOf[Subscriber[in]]
+          val shape: SinkShape[in] = SinkShape(source.inPort)
+          new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
+        case sink: SinkBridgeModule[in, out] =>
+          val publisher = matValues(sink).asInstanceOf[Publisher[out]]
+          val shape: SourceShape[out] = SourceShape(sink.outPort)
+          new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
+        case other =>
+          other
+      }
+      materializer.materialize(newGraph, matValues)
+    }
+
+    override def shutdown: Unit = {
+      materializer.shutdown()
+    }
+  }
+}
+
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
similarity index 64%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 3cea78a..99ebe17 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.graph
+package org.apache.gearpump.akkastream.graph
 
 import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.materializer.RemoteMaterializerImpl
-import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule}
-import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient
-import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
+import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
 import akka.stream.impl.StreamLayout.Module
-
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.streaming.ProcessorId
@@ -34,9 +33,9 @@
 /**
  *
  * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- * contain modules that can be materialized in remote Gearpump cluster.
+ *  contain modules that can be materialized in remote Gearpump cluster.
  *
- * @param graph
+ * @param graph Graph
  */
 class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
 
@@ -44,10 +43,11 @@
 
   /**
    * * materialize LocalGraph in remote gearpump cluster
-   * @param useInProcessCluster
-   * @param system
+   * @param useInProcessCluster Boolean
+   * @param system ActorSystem
    */
-  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer {
+  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
+    extends SubGraphMaterializer {
     private val local = if (useInProcessCluster) {
       val cluster = EmbeddedCluster()
       cluster.start()
@@ -57,13 +57,15 @@
     }
 
     private val context: ClientContext = local match {
-      case Some(local) => local.newClientContext
+      case Some(l) => l.newClientContext
       case None => ClientContext(system)
     }
 
-    override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, Any]): Map[Module, Any] = {
+    override def materialize(subGraph: SubGraph,
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
       val graph = subGraph.graph
-
+      
       if (graph.isEmpty) {
         inputMatValues
       } else {
@@ -71,22 +73,27 @@
       }
     }
 
-    private def doMaterialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
+    private def doMaterialize(graph: Graph[Module, Edge],
+        inputMatValues: scala.collection.mutable.Map[Module, Any]):
+        scala.collection.mutable.Map[Module, Any] = {
       val materializer = new RemoteMaterializerImpl(graph, system)
       val (app, matValues) = materializer.materialize
 
-      val appId = context.submit(app)
-      println("sleep 5 second until the applicaiton is ready on cluster")
+      val appId = context.submit(app).appId
+      // scalastyle:off println
+      println("sleep 5 second until the application is ready on cluster")
+      // scalastyle:on println
       Thread.sleep(5000)
 
       def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
         matValues.toList.flatMap { kv =>
           val (module, processorId) = kv
           module match {
-            case source: SourceBridgeModule[AnyRef, AnyRef] =>
-              val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher, context, appId, processorId)
+            case source: SourceBridgeModule[_, _] =>
+              val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher,
+                context, appId, processorId)
               Some((module, bridge))
-            case sink: SinkBridgeModule[AnyRef, AnyRef] =>
+            case sink: SinkBridgeModule[_, _] =>
               val bridge = new SinkBridgeTaskClient(system, context, appId, processorId)
               Some((module, bridge))
             case other =>
@@ -98,9 +105,9 @@
       inputMatValues ++ resolve(matValues)
     }
 
-    override def shutdown(): Unit = {
+    override def shutdown: Unit = {
       context.close()
-      local.map(_.stop())
+      local.foreach(_.stop())
     }
   }
 }
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
similarity index 77%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
index 564b6c7..a74143e 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.graph
+package org.apache.gearpump.akkastream.graph
 
-import akka.stream.ModuleGraph.Edge
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
 import akka.stream.impl.StreamLayout.Module
-
 import org.apache.gearpump.util.Graph
 
 /**
- * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]]
+ * [[SubGraph]] is a partial DAG
  *
- * The idea is that by dividing [[akka.stream.ModuleGraph]] to several
+ * The idea is that by dividing [[Graph]] to several
  * [[SubGraph]], we can materialize each [[SubGraph]] with different
  * materializer.
  */
@@ -40,6 +40,7 @@
   def graph: Graph[Module, Edge]
 }
 
+
 /**
  * Materializer for Sub-Graph type
  */
@@ -50,7 +51,9 @@
    * @return Materialized Values for each Module after the materialization.
    */
 
-  def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any]
+  def materialize(graph: SubGraph,
+      matValues: scala.collection.mutable.Map[Module, Any]):
+    scala.collection.mutable.Map[Module, Any]
 
-  def shutdown(): Unit
-}
\ No newline at end of file
+  def shutdown: Unit
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
new file mode 100644
index 0000000..477f4d3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.{util => ju}
+
+import org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
+import akka.dispatch.Dispatchers
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
+import akka.stream.impl.fusing.{ActorGraphInterpreter, Fold, GraphInterpreterShell, GraphModule, GraphStageModule}
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+import akka.stream.scaladsl.ModuleExtractor
+import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.ReduceModule
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.reactivestreams.{Publisher, Subscriber}
+
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
+ *
+ * @param system System
+ * @param settings ActorMaterializerSettings
+ * @param dispatchers Dispatchers
+ * @param supervisor ActorRef
+ * @param haveShutDown AtomicBoolean
+ * @param flowNames SeqActorName
+ */
+case class LocalMaterializerImpl (
+    override val system: ActorSystem,
+    override val settings: ActorMaterializerSettings,
+    dispatchers: Dispatchers,
+    override val supervisor: ActorRef,
+    haveShutDown: AtomicBoolean,
+    flowNames: SeqActorName)
+  extends ExtendedActorMaterializer {
+
+  override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+  override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration,
+      task: Runnable): Cancellable =
+    system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+    system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+    import ActorAttributes._
+    import Attributes._
+    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+      attr match {
+        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+        case l: LogLevels => s
+        case Name(_) => s
+        case other => s
+      }
+    }
+  }
+
+  override def shutdown(): Unit =
+    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
+
+  override def isShutdown: Boolean = haveShutDown.get()
+
+  override lazy val executionContext: ExecutionContextExecutor =
+    dispatchers.lookup(settings.dispatcher match {
+      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
+      case other => other
+  })
+
+
+  case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
+      subflowFuser: GraphInterpreterShell => ActorRef = null)
+    extends MaterializerSession(module, iAttributes) {
+
+    override def materializeAtomic(atomic: AtomicModule,
+        effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
+
+      def newMaterializationContext() =
+        new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
+          stageName(effectiveAttributes))
+      atomic match {
+        case sink: SinkModule[_, _] =>
+          val (sub, mat) = sink.create(newMaterializationContext())
+          assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
+          matVal.put(atomic, mat)
+        case source: SourceModule[_, _] =>
+          val (pub, mat) = source.create(newMaterializationContext())
+          assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
+          matVal.put(atomic, mat)
+        case stage: ProcessorModule[_, _, _] =>
+          val (processor, mat) = stage.createProcessor()
+          assignPort(stage.inPort, processor)
+          assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
+          matVal.put(atomic, mat)
+        // FIXME
+        //        case tls: TlsModule =>
+        // TODO solve this so TlsModule doesn't need special treatment here
+        //          val es = effectiveSettings(effectiveAttributes)
+        //          val props =
+        //            TLSActor.props(es, tls.sslContext, tls.sslConfig,
+        //              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
+        //          val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
+        //          def factory(id: Int) = new ActorPublisher[Any](impl) {
+        //            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+        //          }
+        //          val publishers = Vector.tabulate(2)(factory)
+        //          impl ! FanOut.ExposedPublishers(publishers)
+        //
+        //          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
+        //          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
+        //
+        //          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
+        //          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
+        //
+        //          matVal.put(atomic, NotUsed)
+        case graph: GraphModule =>
+          matGraph(graph, effectiveAttributes, matVal)
+        case stage: GraphStageModule =>
+          val graph =
+            GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
+              stage.shape, stage.attributes, Array(stage))
+          matGraph(graph, effectiveAttributes, matVal)
+      }
+    }
+
+    private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
+        matVal: ju.Map[Module, Any]): Unit = {
+      val calculatedSettings = effectiveSettings(effectiveAttributes)
+      val (handlers, logics) =
+        graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
+
+      val shell = new GraphInterpreterShell(graph.assembly, handlers,
+        logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)
+
+      val impl =
+        if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
+          subflowFuser(shell)
+        } else {
+          val props = ActorGraphInterpreter.props(shell)
+          actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
+        }
+
+      for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
+        val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
+        assignPort(inlet, subscriber)
+      }
+      for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
+        val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
+        impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
+        assignPort(outlet, publisher)
+      }
+    }
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {
+
+    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+      null, null).materialize().asInstanceOf[Mat]
+
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+
+    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+      null, null).materialize().asInstanceOf[Mat]
+
+  }
+
+  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+    materialize(runnableGraph)
+  }
+
+  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+    logger
+  }
+
+  def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
+    var moduleInProgress: Module = EmptyModule
+    graph.vertices.foreach(module => {
+      moduleInProgress = moduleInProgress.compose(module)
+    })
+    graph.edges.foreach(value => {
+      val (node1, edge, node2) = value
+      moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
+    })
+
+    moduleInProgress
+  }
+
+  def materialize(graph: GGraph[Module, Edge],
+      inputMatValues: scala.collection.mutable.Map[Module, Any]):
+      scala.collection.mutable.Map[Module, Any] = {
+    val topLevelModule = buildToplevelModule(graph)
+    val session = LocalMaterializerSession(topLevelModule, null, null)
+    import scala.collection.JavaConverters._
+    val matV = inputMatValues.asJava
+    val materializedGraph = graph.mapVertex { module =>
+      session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
+      matV.get(module)
+    }
+    materializedGraph.edges.foreach { nodeEdgeNode =>
+      val (node1, edge, node2) = nodeEdgeNode
+      val from = edge.from
+      val to = edge.to
+      node1 match {
+        case module1: Module =>
+          node2 match {
+            case module2: Module =>
+              val publisher = module1.downstreams(from).asInstanceOf[Publisher[Any]]
+              val subscriber = module2.upstreams(to).asInstanceOf[Subscriber[Any]]
+              publisher.subscribe(subscriber)
+            case _ =>
+          }
+        case _ =>
+      }
+    }
+    val matValSources = graph.vertices.flatMap(module => {
+      val rt: Option[MaterializedValueSource[_]] = module match {
+        case graphStage: GraphStageModule =>
+          graphStage.stage match {
+            case materializedValueSource: MaterializedValueSource[_] =>
+              Some(materializedValueSource)
+            case _ =>
+              None
+          }
+        case _ =>
+          None
+      }
+      rt
+    })
+    publishToMaterializedValueSource(matValSources, inputMatValues)
+    inputMatValues
+  }
+
+  private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
+      matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
+    modules.foreach { source =>
+      Option(source.computation).map { attr =>
+        val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
+        source.setValue(valueToPublish)
+      }
+    }
+  }
+
+  private[this] def createFlowName(): String = flowNames.next()
+
+  val flowName = createFlowName()
+  var nextId = 0
+
+  def stageName(attr: Attributes): String = {
+    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+    nextId += 1
+    name
+  }
+
+  override def withNamePrefix(name: String): LocalMaterializerImpl =
+    this.copy(flowNames = flowNames.copy(name))
+
+}
+
+object LocalMaterializerImpl {
+  case class MaterializedModule(module: Module, matValue: Any,
+      inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
+      outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
+
+  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+      namePrefix: Option[String] = None)(implicit system: ActorSystem):
+  LocalMaterializerImpl = {
+
+    val settings = materializerSettings getOrElse ActorMaterializerSettings(system)
+    apply(settings, namePrefix.getOrElse("flow"))(system)
+  }
+
+  def apply(materializerSettings: ActorMaterializerSettings,
+      namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl = {
+    val haveShutDown = new AtomicBoolean(false)
+
+    new LocalMaterializerImpl(
+      system,
+      materializerSettings,
+      system.dispatchers,
+      system.actorOf(StreamSupervisor.props(materializerSettings,
+        haveShutDown).withDispatcher(materializerSettings.dispatcher)),
+      haveShutDown,
+      FlowNames(system).name.copy(namePrefix))
+  }
+
+  def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
+    val f = reduce.f
+    val aggregator = {(zero: Any, input: Any) =>
+      if (zero == null) {
+        input
+      } else {
+        f(zero, input)
+      }
+    }
+    Fold(null, aggregator)
+  }
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
new file mode 100644
index 0000000..e065c90
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import akka.actor.ActorSystem
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.Timers.{Completion, DelayInitial, Idle, IdleInject, IdleTimeoutBidi, Initial}
+import akka.stream.impl.fusing.{Batch, Collect, Delay, Drop, DropWhile, DropWithin, Filter, FlattenMerge, Fold, GraphStageModule, GroupBy, GroupedWithin, Intersperse, LimitWeighted, Log, MapAsync, MapAsyncUnordered, PrefixAndTail, Recover, Reduce, Scan, Split, StatefulMapConcat, SubSink, SubSource, Take, TakeWhile, TakeWithin, Map => FMap}
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource}
+import akka.stream.impl.io.IncomingConnectionStage
+import akka.stream.impl.{HeadOptionStage, Stages, Throttle, Unfold, UnfoldAsync}
+import akka.stream.scaladsl.{Balance, Broadcast, Concat, Interleave, Merge, MergePreferred, MergeSorted, ModuleExtractor, Unzip, Zip, ZipWith2}
+import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
+import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTask, ConcatTask, DelayInitialTask, DropWithinTask, FlattenMergeTask, FoldTask, GraphTask, GroupedWithinTask, InterleaveTask, MapAsyncTask, MergeTask, SingleSourceTask, SinkBridgeTask, SourceBridgeTask, StatefulMapConcatTask, TakeWithinTask, ThrottleTask, TickSourceTask, Zip2Task}
+import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
+import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump
+ * Streaming Application.
+ *
+ * @param graph Graph
+ * @param system ActorSystem
+ */
+class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
+
+  import RemoteMaterializerImpl._
+
+  type ID = String
+  private implicit val actorSystem = system
+
+  private def uuid: String = {
+    java.util.UUID.randomUUID.toString
+  }
+
+  def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
+    val (opGraph, ids) = toOpGraph
+    val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph)
+    val processorIds = resolveIds(app, ids)
+
+    val updatedApp = updateJunctionConfig(processorIds, app)
+    (removeIds(updatedApp), processorIds)
+  }
+
+  private def updateJunctionConfig(processorIds: Map[Module, ProcessorId],
+      app: StreamApplication): StreamApplication = {
+    val config = junctionConfig(processorIds)
+
+    val dag = app.dag.mapVertex { vertex =>
+      val processorId = vertex.id
+      val newConf = vertex.taskConf.withConfig(config(processorId))
+      vertex.copy(taskConf = newConf)
+    }
+    new StreamApplication(app.name, app.inputUserConfig, dag)
+  }
+
+  private def junctionConfig(processorIds: Map[Module, ProcessorId]):
+  Map[ProcessorId, UserConfig] = {
+    val updatedConfigs = graph.vertices.flatMap { vertex =>
+      buildShape(vertex, processorIds)
+    }.toMap
+    updatedConfigs
+  }
+
+  private def buildShape(vertex: Module, processorIds: Map[Module, ProcessorId]):
+  Option[(ProcessorId, UserConfig)] = {
+    def inProcessors(vertex: Module): List[ProcessorId] = {
+      vertex.shape.inlets.flatMap { inlet =>
+        graph.incomingEdgesOf(vertex).find(
+          _._2.to == inlet).map(_._1
+        ).flatMap(processorIds.get)
+      }.toList
+    }
+    def outProcessors(vertex: Module): List[ProcessorId] = {
+      vertex.shape.outlets.flatMap { outlet =>
+        graph.outgoingEdgesOf(vertex).find(
+          _._2.from == outlet).map(_._3
+        ).flatMap(processorIds.get)
+      }.toList
+    }
+    processorIds.get(vertex).map(processorId => {
+      (processorId, UserConfig.empty.
+        withValue(GraphTask.OUT_PROCESSORS, outProcessors(vertex)).
+        withValue(GraphTask.IN_PROCESSORS, inProcessors(vertex)))
+    })
+  }
+
+  private def resolveIds(app: StreamApplication, ids: Map[Module, ID]):
+  Map[Module, ProcessorId] = {
+    ids.flatMap { kv =>
+      val (module, id) = kv
+      val processorId = app.dag.vertices.find { processor =>
+        processor.taskConf.getString(id).isDefined
+      }.map(_.id)
+      processorId.map((module, _))
+    }
+  }
+
+  private def removeIds(app: StreamApplication): StreamApplication = {
+    val graph = app.dag.mapVertex { processor =>
+      val conf = removeId(processor.taskConf)
+      processor.copy(taskConf = conf)
+    }
+    new StreamApplication(app.name, app.inputUserConfig, graph)
+  }
+
+  private def removeId(conf: UserConfig): UserConfig = {
+    conf.filter { kv =>
+      kv._2 != RemoteMaterializerImpl.TRACKABLE
+    }
+  }
+
+  private def toOpGraph: (Graph[Op, OpEdge], Map[Module, ID]) = {
+    var matValues = collection.mutable.Map.empty[Module, ID]
+    val opGraph = graph.mapVertex[Op] { module =>
+      val name = uuid
+      val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.TRACKABLE)
+      matValues += module -> name
+      val parallelism = GearAttributes.count(module.attributes)
+      val op = module match {
+        case source: SourceTaskModule[_] =>
+          val updatedConf = conf.withConfig(source.conf)
+          DataSourceOp(source.source, parallelism, updatedConf, "source")
+        case sink: SinkTaskModule[_] =>
+          val updatedConf = conf.withConfig(sink.conf)
+          DataSinkOp(sink.sink, parallelism, updatedConf, "sink")
+        case sourceBridge: SourceBridgeModule[_, _] =>
+          ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
+        case processor: ProcessorModule[_, _, _] =>
+          val updatedConf = conf.withConfig(processor.conf)
+          ProcessorOp(processor.processor, parallelism, updatedConf, "source")
+        case sinkBridge: SinkBridgeModule[_, _] =>
+          ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
+        case groupBy: GroupByModule[_, _] =>
+          GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindow.apply(1).accumulating),
+            parallelism, "groupBy", conf)
+        case reduce: ReduceModule[_] =>
+          reduceOp(reduce.f, conf)
+        case graphStage: GraphStageModule =>
+          translateGraphStageWithMaterializedValue(graphStage, parallelism, conf)
+        case _ =>
+          null
+      }
+      if (op == null) {
+        throw new UnsupportedOperationException(
+          module.getClass.toString + " is not supported with RemoteMaterializer"
+        )
+      }
+      op
+    }.mapEdge[OpEdge] { (n1, edge, n2) =>
+      n2 match {
+        case chainableOp: ChainableOp[_, _]
+          if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] =>
+          Direct
+        case _ =>
+          Shuffle
+      }
+    }
+    (opGraph, matValues.toMap)
+  }
+
+  private def translateGraphStageWithMaterializedValue(module: GraphStageModule,
+      parallelism: Int, conf: UserConfig): Op = {
+    module.stage match {
+      case tickSource: TickSource[_] =>
+        val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef]
+        val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, tickSource.initialDelay).
+          withValue[FiniteDuration](INTERVAL, tickSource.interval).
+          withValue(TICK, tick)
+        ProcessorOp(classOf[TickSourceTask[_]], parallelism, tiConf, "tickSource")
+      case graphStage: GraphStage[_] =>
+        translateGraphStage(module, parallelism, conf)
+      case headOptionStage: HeadOptionStage[_] =>
+        headOptionOp(headOptionStage, conf)
+      case pushPullGraphStageWithMaterializedValue:
+        PushPullGraphStageWithMaterializedValue[_, _, _, _] =>
+        translateSymbolic(pushPullGraphStageWithMaterializedValue, conf)
+    }
+  }
+
+  private def translateGraphStage(module: GraphStageModule,
+      parallelism: Int, conf: UserConfig): Op = {
+    module.stage match {
+      case balance: Balance[_] =>
+        ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
+      case batch: Batch[_, _] =>
+        val batchConf = conf.withValue[_ => Long](BatchTask.COST, batch.costFn).
+          withLong(BatchTask.MAX, batch.max).
+          withValue[(_, _) => _](BatchTask.AGGREGATE, batch.aggregate).
+          withValue[_ => _](BatchTask.SEED, batch.seed)
+        ProcessorOp(classOf[BatchTask[_, _]],
+          parallelism, batchConf, "batch")
+      case broadcast: Broadcast[_] =>
+        val name = ModuleExtractor.unapply(broadcast).map(_.attributes.nameOrDefault()).get
+        ProcessorOp(classOf[BroadcastTask], parallelism, conf, name)
+      case collect: Collect[_, _] =>
+        collectOp(collect.pf, conf)
+      case concat: Concat[_] =>
+        ProcessorOp(classOf[ConcatTask], parallelism, conf, "concat")
+      case delayInitial: DelayInitial[_] =>
+        val dIConf = conf.withValue[FiniteDuration](
+          DelayInitialTask.DELAY_INITIAL, delayInitial.delay)
+        ProcessorOp(classOf[DelayInitialTask[_]], parallelism, dIConf, "delayInitial")
+      case dropWhile: DropWhile[_] =>
+        dropWhileOp(dropWhile.p, conf)
+      case flattenMerge: FlattenMerge[_, _] =>
+        ProcessorOp(classOf[FlattenMergeTask], parallelism, conf, "flattenMerge")
+      case fold: Fold[_, _] =>
+        val foldConf = conf.withValue(FoldTask.ZERO, fold.zero.asInstanceOf[AnyRef]).
+          withValue(FoldTask.AGGREGATOR, fold.f)
+        ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
+      case groupBy: GroupBy[_, _] =>
+        GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating),
+          groupBy.maxSubstreams, "groupBy", conf)
+      case groupedWithin: GroupedWithin[_] =>
+        val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
+          withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
+        ProcessorOp(classOf[GroupedWithinTask[_]], parallelism, diConf, "groupedWithin")
+      case idleInject: IdleInject[_, _] =>
+        // TODO
+        null
+      case idleTimeoutBidi: IdleTimeoutBidi[_, _] =>
+        // TODO
+        null
+      case incomingConnectionStage: IncomingConnectionStage =>
+        // TODO
+        null
+      case interleave: Interleave[_] =>
+        val ilConf = conf.withInt(InterleaveTask.INPUT_PORTS, interleave.inputPorts).
+          withInt(InterleaveTask.SEGMENT_SIZE, interleave.segmentSize)
+        ProcessorOp(classOf[InterleaveTask], parallelism, ilConf, "interleave")
+        null
+      case intersperse: Intersperse[_] =>
+        // TODO
+        null
+      case limitWeighted: LimitWeighted[_] =>
+        // TODO
+        null
+      case map: FMap[_, _] =>
+        mapOp(map.f, conf)
+      case mapAsync: MapAsync[_, _] =>
+        ProcessorOp(classOf[MapAsyncTask[_, _]],
+          mapAsync.parallelism, conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsync.f), "mapAsync")
+      case mapAsyncUnordered: MapAsyncUnordered[_, _] =>
+        ProcessorOp(classOf[MapAsyncTask[_, _]],
+          mapAsyncUnordered.parallelism,
+          conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsyncUnordered.f), "mapAsyncUnordered")
+      case materializedValueSource: MaterializedValueSource[_] =>
+        // TODO
+        null
+      case merge: Merge[_] =>
+        val mergeConf = conf.withBoolean(MergeTask.EAGER_COMPLETE, merge.eagerComplete).
+          withInt(MergeTask.INPUT_PORTS, merge.inputPorts)
+        ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge")
+      case mergePreferred: MergePreferred[_] =>
+        MergeOp("mergePreferred", conf)
+      case mergeSorted: MergeSorted[_] =>
+        MergeOp("mergeSorted", conf)
+      case prefixAndTail: PrefixAndTail[_] =>
+        // TODO
+        null
+      case recover: Recover[_] =>
+        // TODO
+        null
+      case scan: Scan[_, _] =>
+        scanOp(scan.zero, scan.f, conf)
+      case simpleLinearGraphStage: SimpleLinearGraphStage[_] =>
+        translateSimpleLinearGraph(simpleLinearGraphStage, parallelism, conf)
+      case singleSource: SingleSource[_] =>
+        val singleSourceConf = conf.withValue[AnyRef](SingleSourceTask.ELEMENT,
+          singleSource.elem.asInstanceOf[AnyRef])
+        ProcessorOp(classOf[SingleSourceTask[_]], parallelism, singleSourceConf, "singleSource")
+      case split: Split[_] =>
+        // TODO
+        null
+      case statefulMapConcat: StatefulMapConcat[_, _] =>
+        val func = statefulMapConcat.f
+        val statefulMapConf =
+          conf.withValue[() => _ => Iterable[_]](StatefulMapConcatTask.FUNC, func)
+        ProcessorOp(classOf[StatefulMapConcatTask[_, _]], parallelism,
+          statefulMapConf, "statefulMapConcat")
+      case subSink: SubSink[_] =>
+        // TODO
+        null
+      case subSource: SubSource[_] =>
+        // TODO
+        null
+      case unfold: Unfold[_, _] =>
+        // TODO
+        null
+      case unfoldAsync: UnfoldAsync[_, _] =>
+        // TODO
+        null
+      case unzip: Unzip[_, _] =>
+        // ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
+        //   conf.withValue(
+        //     Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
+        // TODO
+        null
+      case zip: Zip[_, _] =>
+        zipWithOp(zip.zipper, conf)
+      case zipWith2: ZipWith2[_, _, _] =>
+        ProcessorOp(classOf[Zip2Task[_, _, _]],
+          parallelism,
+          conf.withValue(
+            Zip2Task.ZIP2_FUNCTION, Zip2Task.ZipFunction(zipWith2.zipper)
+          ), "zipWith2")
+    }
+  }
+
+  private def translateSimpleLinearGraph(stage: SimpleLinearGraphStage[_],
+      parallelism: Int, conf: UserConfig): Op = {
+    stage match {
+      case completion: Completion[_] =>
+        // TODO
+        null
+      case delay: Delay[_] =>
+        // TODO
+        null
+      case drop: Drop[_] =>
+        dropOp(drop.count, conf)
+      case dropWithin: DropWithin[_] =>
+        val dropWithinConf =
+          conf.withValue[FiniteDuration](DropWithinTask.TIMEOUT, dropWithin.timeout)
+        ProcessorOp(classOf[DropWithinTask[_]],
+          parallelism, dropWithinConf, "dropWithin")
+      case filter: Filter[_] =>
+        filterOp(filter.p, conf)
+      case idle: Idle[_] =>
+        // TODO
+        null
+      case initial: Initial[_] =>
+        // TODO
+        null
+      case log: Log[_] =>
+        logOp(log.name, log.extract, conf)
+      case reduce: Reduce[_] =>
+        reduceOp(reduce.f, conf)
+      case take: Take[_] =>
+        takeOp(take.count, conf)
+      case takeWhile: TakeWhile[_] =>
+        filterOp(takeWhile.p, conf)
+      case takeWithin: TakeWithin[_] =>
+        val takeWithinConf =
+          conf.withValue[FiniteDuration](TakeWithinTask.TIMEOUT, takeWithin.timeout)
+        ProcessorOp(classOf[TakeWithinTask[_]],
+          parallelism, takeWithinConf, "takeWithin")
+      case throttle: Throttle[_] =>
+        val throttleConf = conf.withInt(ThrottleTask.COST, throttle.cost).
+          withInt(ThrottleTask.MAX_BURST, throttle.maximumBurst).
+          withValue[_ => Int](ThrottleTask.COST_CALC, throttle.costCalculation).
+          withValue[FiniteDuration](ThrottleTask.TIME_PERIOD, throttle.per)
+        ProcessorOp(classOf[ThrottleTask[_]],
+          parallelism, throttleConf, "throttle")
+    }
+  }
+
+  private def translateSymbolic(stage: PushPullGraphStageWithMaterializedValue[_, _, _, _],
+      conf: UserConfig): Op = {
+    stage match {
+      case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _]
+        if symbolicGraphStage.symbolicStage.attributes.equals(
+          Stages.DefaultAttributes.buffer) => {
+            // ignore the buffering operation
+            identity("buffer", conf)
+        }
+    }
+  }
+
+}
+
+object RemoteMaterializerImpl {
+  final val NotApplied: Any => Any = _ => NotApplied
+
+  def collectOp[In, Out](collect: PartialFunction[In, Out], conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      collect.applyOrElse(data, NotApplied) match {
+        case NotApplied => None
+        case result: Any => Option(result)
+      }
+    }, "collect", conf)
+  }
+
+  def filterOp[In](filter: In => Boolean, conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      if (filter(data)) Option(data) else None
+    }, "filter", conf)
+  }
+
+  def headOptionOp[T](headOptionStage: HeadOptionStage[T], conf: UserConfig): Op = {
+    val promise: Promise[Option[T]] = Promise()
+    flatMapOp({ data: T =>
+      data match {
+        case None =>
+          Some(promise.future.failed)
+        case Some(d) =>
+          promise.future.value
+      }
+    }, "headOption", conf)
+  }
+
+  def reduceOp[T](reduce: (T, T) => T, conf: UserConfig): Op = {
+    var result: Option[T] = None
+    val flatMap = { elem: T =>
+      result match {
+        case None =>
+          result = Some(elem)
+        case Some(r) =>
+          result = Some(reduce(r, elem))
+      }
+      List(result)
+    }
+    flatMapOp(flatMap, "reduce", conf)
+  }
+
+  def zipWithOp[In1, In2](zipWith: (In1, In2) => (In1, In2), conf: UserConfig): Op = {
+    val flatMap = { elem: (In1, In2) =>
+      val (e1, e2) = elem
+      val result: (In1, In2) = zipWith(e1, e2)
+      List(result)
+    }
+    flatMapOp(flatMap, "zipWith", conf)
+  }
+
+  def zipWithOp2[In1, In2, Out](zipWith: (In1, In2) => Out, conf: UserConfig): Op = {
+    val flatMap = { elem: (In1, In2) =>
+      val (e1, e2) = elem
+      val result: Out = zipWith(e1, e2)
+      List(result)
+    }
+    flatMapOp(flatMap, "zipWith", conf)
+  }
+
+  def identity(description: String, conf: UserConfig): Op = {
+    flatMapOp({ data: Any =>
+      List(data)
+    }, description, conf)
+  }
+
+  def mapOp[In, Out](map: In => Out, conf: UserConfig): Op = {
+    val flatMap = (data: In) => List(map(data))
+    flatMapOp (flatMap, conf)
+  }
+
+  def flatMapOp[In, Out](flatMap: In => Iterable[Out], conf: UserConfig): Op = {
+    flatMapOp(flatMap, "flatmap", conf)
+  }
+
+  def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
+      conf: UserConfig): Op = {
+    ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
+  }
+
+  def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
+    conf: UserConfig): Op = {
+    var agg = None: Option[Out]
+    val flatMap = {elem: In =>
+      agg = agg match {
+        case None =>
+          Some(seed(elem))
+        case Some(value) =>
+          Some(aggregate(value, elem))
+      }
+      List(agg.get)
+    }
+    flatMapOp (flatMap, "conflate", conf)
+  }
+
+  def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = {
+    var aggregator: Out = zero
+    val map = { elem: In =>
+      aggregator = fold(aggregator, elem)
+      List(aggregator)
+    }
+    flatMapOp(map, "fold", conf)
+  }
+
+  def groupedOp(count: Int, conf: UserConfig): Op = {
+    var left = count
+    val buf = {
+      val b = Vector.newBuilder[Any]
+      b.sizeHint(count)
+      b
+    }
+
+    val flatMap: Any => Iterable[Any] = {input: Any =>
+      buf += input
+      left -= 1
+      if (left == 0) {
+        val emit = buf.result()
+        buf.clear()
+        left = count
+        Some(emit)
+      } else {
+        None
+      }
+    }
+    flatMapOp(flatMap, conf: UserConfig)
+  }
+
+  def dropOp[T](number: Long, conf: UserConfig): Op = {
+    var left = number
+    val flatMap: T => Iterable[T] = {input: T =>
+      if (left > 0) {
+        left -= 1
+        None
+      } else {
+        Some(input)
+      }
+    }
+    flatMapOp(flatMap, "drop", conf)
+  }
+
+  def dropWhileOp[In](drop: In => Boolean, conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      if (drop(data))  None else Option(data)
+    }, "dropWhile", conf)
+  }
+
+  def logOp[T](name: String, extract: T => Any, conf: UserConfig): Op = {
+    val flatMap = {elem: T =>
+      LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
+      List(elem)
+    }
+    flatMapOp(flatMap, "log", conf)
+  }
+
+  def scanOp[In, Out](zero: Out, f: (Out, In) => Out, conf: UserConfig): Op = {
+    var aggregator = zero
+    var pushedZero = false
+
+    val flatMap = {elem: In =>
+      aggregator = f(aggregator, elem)
+
+      if (pushedZero) {
+        pushedZero = true
+        List(zero, aggregator)
+      } else {
+        List(aggregator)
+      }
+    }
+    flatMapOp(flatMap, "scan", conf)
+  }
+
+  def statefulMapOp[In, Out](f: In => Iterable[Out], conf: UserConfig): Op = {
+    flatMapOp ({ data: In =>
+      f(data)
+    }, conf)
+  }
+
+  def takeOp(count: Long, conf: UserConfig): Op = {
+    var left: Long = count
+
+    val filter: Any => Iterable[Any] = {elem: Any =>
+      left -= 1
+      if (left > 0) Some(elem)
+      else if (left == 0) Some(elem)
+      else None
+    }
+    flatMapOp(filter, "take", conf)
+  }
+
+  /**
+   * We use this attribute to track module to Processor
+   *
+   */
+  val TRACKABLE = "track how module is fused to processor"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
new file mode 100644
index 0000000..5b8c71b
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ *
+ *   [[IN]] -> [[BridgeModule]] -> [[OUT]]
+ *                   /
+ *                  /
+ *       out of band data input or output channel [[MAT]]
+ *
+ *
+ * [[BridgeModule]] is used as a bridge between different materializers.
+ * Different [[akka.stream.Materializer]]s can use out of band channel to
+ * exchange messages.
+ *
+ * For example:
+ *
+ *                              Remote Materializer
+ *                         -----------------------------
+ *                         |                            |
+ *                         | BridgeModule -> RemoteSink |
+ *                         |  /                         |
+ *                         --/----------------------------
+ *   Local Materializer     /  out of band channel.
+ *   ----------------------/----
+ *   | Local              /    |
+ *   | Source ->  BridgeModule |
+ *   |                         |
+ *   ---------------------------
+ *
+ *
+ * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
+ * module during materialization.
+ *
+ * However, user can still declare it explicitly. In this case, it means we have a
+ * boundary Source or Sink module which accept out of band channel inputs or
+ * outputs.
+ *
+ * @tparam IN input
+ * @tparam OUT output
+ * @tparam MAT materializer
+ */
+abstract class BridgeModule[IN, OUT, MAT] extends AtomicModule {
+  val inPort = Inlet[IN]("BridgeModule.in")
+  val outPort = Outlet[OUT]("BridgeModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+  } else {
+    this
+  }
+
+  def attributes: Attributes
+  def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
+
+  protected def newInstance: BridgeModule[IN, OUT, MAT]
+  override def carbonCopy: Module = newInstance
+}
+
+
+/**
+ *
+ * Bridge module which accept out of band channel Input
+ * [[org.reactivestreams.Publisher]][IN].
+ *
+ *
+ *         [[SourceBridgeModule]] -> [[OUT]]
+ *                   /|
+ *                  /
+ *       out of band data input [[org.reactivestreams.Publisher]][IN]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]]
+ * @tparam OUT out put data type to next module.
+ */
+class SourceBridgeModule[IN, OUT](val attributes: Attributes =
+    Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] {
+  override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] =
+    new SourceBridgeModule[IN, OUT](attributes)
+
+  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = {
+    new SourceBridgeModule( attributes)
+  }
+}
+
+/**
+ *
+ * Bridge module which accept out of band channel Output
+ * [[org.reactivestreams.Subscriber]][OUT].
+ *
+ *
+ *   [[IN]] -> [[BridgeModule]]
+ *                    \
+ *                     \
+ *                      \|
+ *       out of band data output [[org.reactivestreams.Subscriber]][OUT]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from previous module
+ * @tparam OUT out put data type to out of band subscriber
+ */
+class SinkBridgeModule[IN, OUT](val attributes: Attributes =
+    Attributes.name("sinkBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] {
+  override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] =
+    new SinkBridgeModule[IN, OUT](attributes)
+
+  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
+    new SinkBridgeModule[IN, OUT](attributes)
+  }
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
similarity index 91%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
index bc744f9..ea76bb0 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.module
+package org.apache.gearpump.akkastream.module
 
-import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
 import akka.stream.impl.{SinkModule, SourceModule}
 import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
 import org.reactivestreams.{Publisher, Subscriber}
@@ -47,7 +47,8 @@
  *
  *
  */
-trait DummyModule extends Module
+trait DummyModule extends AtomicModule
+
 
 /**
  *
@@ -56,9 +57,9 @@
  *                       /
  *       out of band input message Source
  *
- * @param attributes
- * @param shape
- * @tparam Out
+ * @param attributes Attributes
+ * @param shape SourceShape[Out]
+ * @tparam Out Output
  */
 class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
   extends SourceModule[Out, Unit](shape) with DummyModule {
@@ -76,6 +77,7 @@
   }
 }
 
+
 /**
  *
  *    Source-> [[BridgeModule]] -> [[DummySink]]
@@ -84,8 +86,8 @@
  *                      \|
  *                   out of band output message [[Subscriber]]
  *
- * @param attributes
- * @param shape
+ * @param attributes Attributes
+ * @param shape SinkShape[IN]
  */
 class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
   extends SinkModule[IN, Unit](shape) with DummyModule {
@@ -100,4 +102,4 @@
   override def withAttributes(attr: Attributes): Module = {
     new DummySink[IN](attr, amendShape(attr))
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
similarity index 67%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
index c4c78cc..dfbbee9 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.module
+package org.apache.gearpump.akkastream.module
 
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
-
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import akka.stream._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -32,17 +30,17 @@
  *
  * This is specially designed for Gearpump runtime. It is not supposed to be used
  * for local materializer.
- *
+ * 
  */
-trait GearpumpTaskModule extends Module
+trait GearpumpTaskModule extends AtomicModule
 
 /**
  * This is used to represent the Gearpump Data Source
- * @param source
- * @param conf
- * @param shape
- * @param attributes
- * @tparam T
+ * @param source DataSource
+ * @param conf UserConfig
+ * @param shape SourceShape[T}
+ * @param attributes Attributes
+ * @tparam T type
  */
 final case class SourceTaskModule[T](
     source: DataSource,
@@ -51,13 +49,10 @@
     attributes: Attributes = Attributes.name("SourceTaskModule"))
   extends GearpumpTaskModule {
 
-  override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = {
+  override def withAttributes(attr: Attributes): Module =
     this.copy(shape = amendShape(attr), attributes = attr)
-  }
-  override def carbonCopy: Module = {
-    this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out")))
-  }
+  override def carbonCopy: Module =
+    this.copy(shape = SourceShape( Outlet[T]("SourceTaskModule.out")))
 
   override def replaceShape(s: Shape): Module =
     if (s == shape) this
@@ -68,17 +63,17 @@
     val thatN = attr.nameOrDefault(null)
 
     if ((thatN eq null) || thisN == thatN) shape
-    else shape.copy(outlet = Outlet(thatN + ".out"))
+    else shape.copy(out = Outlet(thatN + ".out"))
   }
 }
 
 /**
  * This is used to represent the Gearpump Data Sink
- * @param sink
- * @param conf
- * @param shape
- * @param attributes
- * @tparam IN
+ * @param sink DataSink
+ * @param conf UserConfig
+ * @param shape SinkShape[IN]
+ * @param attributes Attributes
+ * @tparam IN type
  */
 final case class SinkTaskModule[IN](
     sink: DataSink,
@@ -87,11 +82,10 @@
     attributes: Attributes = Attributes.name("SinkTaskModule"))
   extends GearpumpTaskModule {
 
-  override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = {
+  override def withAttributes(attr: Attributes): Module =
     this.copy(shape = amendShape(attr), attributes = attr)
-  }
-  override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
+  override def carbonCopy: Module =
+    this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
 
   override def replaceShape(s: Shape): Module =
     if (s == shape) this
@@ -102,32 +96,40 @@
     val thatN = attr.nameOrDefault(null)
 
     if ((thatN eq null) || thisN == thatN) shape
-    else shape.copy(inlet = Inlet(thatN + ".out"))
+    else shape.copy(in = Inlet(thatN + ".out"))
   }
 }
 
 /**
  * This is to represent the Gearpump Processor which has exact one input and one output
- * @param processor
- * @param conf
- * @param attributes
- * @tparam IN
- * @tparam OUT
- * @tparam Unit
+ * @param processor Class[_ <: Task]
+ * @param conf UserConfig
+ * @param attributes Attributes
+ * @tparam IN type
+ * @tparam OUT type
+ * @tparam Unit void
  */
 case class ProcessorModule[IN, OUT, Unit](
     processor: Class[_ <: Task],
     conf: UserConfig,
-    val attributes: Attributes = Attributes.name("processorModule"))
-  extends FlowModule[IN, OUT, Unit] with GearpumpTaskModule {
+    attributes: Attributes = Attributes.name("processorModule"))
+  extends AtomicModule with GearpumpTaskModule {
+  val inPort = Inlet[IN]("ProcessorModule.in")
+  val outPort = Outlet[IN]("ProcessorModule.out")
+  override val shape = new FlowShape(inPort, outPort)
 
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+  } else {
+    this
+  }
+  
   override def carbonCopy: Module = newInstance
 
-  protected def newInstance: ProcessorModule[IN, OUT, Unit] = {
+  protected def newInstance: ProcessorModule[IN, OUT, Unit] =
     new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
-  }
 
   override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = {
     new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
new file mode 100644
index 0000000..b06dd0e
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
+
+/**
+ *
+ * Group the T value groupBy function
+ *
+ * @param groupBy T => Group
+ * @param attributes Attributes
+ * @tparam T type
+ * @tparam Group type
+ */
+case class GroupByModule[T, Group](groupBy: T => Group,
+    attributes: Attributes = Attributes.name("groupByModule"))
+  extends AtomicModule {
+  val inPort = Inlet[T]("GroupByModule.in")
+  val outPort = Outlet[T]("GroupByModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+  } else {
+    this
+  }
+
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: GroupByModule[T, Group] =
+    new GroupByModule[T, Group](groupBy, attributes)
+
+  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = {
+    new GroupByModule[T, Group](groupBy, attributes)
+  }
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
similarity index 62%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
index 926feb6..462d967 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
@@ -16,23 +16,31 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.module
+package org.apache.gearpump.akkastream.module
 
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
 
 /**
  *
  * Reduce Module
  *
- * @param f
- * @param attributes
- * @tparam T
+ * @param f (T,T) => T
+ * @param attributes Attributes
+ * @tparam T type
  */
-case class ReduceModule[T](
-    val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule"))
-  extends FlowModule[T, T, Unit] {
+case class ReduceModule[T](f: (T, T) => T, attributes: Attributes =
+Attributes.name("reduceModule")) extends AtomicModule {
+  val inPort = Inlet[T]("GroupByModule.in")
+  val outPort = Outlet[T]("GroupByModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+  } else {
+    this
+  }
 
   override def carbonCopy: Module = newInstance
 
@@ -41,4 +49,4 @@
   override def withAttributes(attributes: Attributes): ReduceModule[T] = {
     new ReduceModule[T](f, attributes)
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
similarity index 90%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
index 9cc46c9..8e43c16 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -16,19 +16,19 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.scaladsl
+package org.apache.gearpump.akkastream.scaladsl
 
 import akka.stream.Attributes
-import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
+import org.apache.gearpump.akkastream.module._
 import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
-import org.reactivestreams.{Publisher, Subscriber}
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.Task
+import org.reactivestreams.{Publisher, Subscriber}
 
-object GearSource {
+
+object GearSource{
 
   /**
    * Construct a Source which accepts out of band input messages.
@@ -52,18 +52,18 @@
   /**
    * Construct a Source from Gearpump [[DataSource]].
    *
-   * [[SourceTaskModule]] -> downstream Sink
+   *    [[SourceTaskModule]] -> downstream Sink
    *
    */
   def from[OUT](source: DataSource): Source[OUT, Unit] = {
-    val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty))
+    val taskSource = new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))
     taskSource
   }
 
   /**
    * Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]].
    *
-   * [[ProcessorModule]] -> downstream Sink
+   *    [[ProcessorModule]] -> downstream Sink
    *
    */
   def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
@@ -97,7 +97,7 @@
   /**
    * Construct a Sink from Gearpump [[DataSink]].
    *
-   * Upstream Source -> [[SinkTaskModule]]
+   *    Upstream Source -> [[SinkTaskModule]]
    *
    */
   def to[IN](sink: DataSink): Sink[IN, Unit] = {
@@ -108,7 +108,7 @@
   /**
    * Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]].
    *
-   * Upstream Source -> [[ProcessorModule]]
+   *    Upstream Source -> [[ProcessorModule]]
    *
    */
   def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
@@ -131,7 +131,7 @@
  *
  * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
  *   Count(kv.key, 1)
- * }.fold(Count(null, 0)){(base, add) =>
+ * }.fold(Count(null, 0)) {(base, add) =>
  *   Count(add.key, base.count + add.count)
  * }.log("count of current key")
  * .flatten()
@@ -146,7 +146,7 @@
  * sink will only operate on the main stream.
  *
  */
-object GroupBy {
+object GroupBy{
   def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
     new Flow[T, T, Unit](new GroupByModule(groupBy))
   }
@@ -159,18 +159,19 @@
  *
  *
  */
-object Reduce {
+object Reduce{
   def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
     new Flow[T, T, Unit](new ReduceModule(reduce))
   }
 }
 
+
 /**
  * Create a Flow by providing a Gearpump Processor class and configuration
  *
  *
  */
-object Processor {
+object Processor{
   def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
     new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf))
   }
@@ -183,7 +184,7 @@
    */
   implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
 
-    //TODO It is named as groupBy2 to avoid conflict with built-in
+    // TODO It is named as groupBy2 to avoid conflict with built-in
     // groupBy. Eventually, we think the built-in groupBy should
     // be replace with this implementation.
     def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
@@ -191,6 +192,7 @@
       source.via[T, Unit](stage)
     }
 
+
     def reduce(reduce: (T, T) => T): Source[T, Mat] = {
       val stage = Reduce.apply(reduce)
       source.via[T, Unit](stage)
@@ -237,10 +239,13 @@
     }
 
     /**
-     * Does sum on values
+     * do sum on values
      *
      * Before doing this, you need to do groupByKey to group same key together
      * , otherwise, it will do the sum no matter what current key is.
+     *
+     * @param numeric Numeric[V]
+     * @return
      */
     def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
       val stage = Reduce.apply(sumByKey[K, V](numeric))
@@ -249,13 +254,13 @@
   }
 
   /**
-   * Helper util to support groupByKey and sum
+   * Help util to support groupByKey and sum
    */
   implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
 
     /**
-     * If it is a KV Pair, we can group the KV pair by the key.
-     *
+     * if it is a KV Pair, we can group the KV pair by the key.
+     * @return
      */
     def groupByKey: Flow[(K, V), (K, V), Mat] = {
       val stage = GroupBy.apply(getTupleKey[K, V])
@@ -268,6 +273,8 @@
      * Before doing this, you need to do groupByKey to group same key together
      * , otherwise, it will do the sum no matter what current key is.
      *
+     * @param numeric Numeric[V]
+     * @return
      */
     def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
       val stage = Reduce.apply(sumByKey[K, V](numeric))
@@ -279,4 +286,4 @@
 
   private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] =
     (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
similarity index 84%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
index 2eb0612..43f07c4 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
@@ -16,22 +16,23 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class BalanceTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
 
   val sizeOfOutputs = sizeOfOutPorts
   var index = 0
 
-  override def onNext(msg: Message): Unit = {
+  override def onNext(msg : Message) : Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfOutputs) {
       index = 0
     }
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
new file mode 100644
index 0000000..5c2485b
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val max = userConf.getLong(BatchTask.MAX)
+  val costFunc = userConf.getValue[In => Long](BatchTask.COST)
+  val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
+  val seed = userConf.getValue[In => Out](BatchTask.SEED)
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    context.output(msg)
+  }
+}
+
+object BatchTask {
+  val AGGREGATE = "AGGREGATE"
+  val COST = "COST"
+  val MAX = "MAX"
+  val SEED = "SEED"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
similarity index 82%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
index 925bf21..292468d 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-  override def onNext(msg: Message): Unit = {
+class BroadcastTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+  override def onNext(msg : Message) : Unit = {
     context.output(msg)
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
similarity index 84%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
index 2eb0612..b77b9bd 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -16,22 +16,23 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class ConcatTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
 
   val sizeOfOutputs = sizeOfOutPorts
   var index = 0
 
-  override def onNext(msg: Message): Unit = {
+  override def onNext(msg : Message) : Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfOutputs) {
       index = 0
     }
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
new file mode 100644
index 0000000..7c335dc
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+case object DelayInitialTime
+
+class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val delayInitial = userConf.getValue[FiniteDuration](DelayInitialTask.DELAY_INITIAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var delayInitialActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(delayInitial)(
+      self ! Message(DelayInitialTime, System.currentTimeMillis())
+    )
+  }
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DelayInitialTime =>
+        delayInitialActive = false
+      case _ =>
+        delayInitialActive match {
+          case true =>
+          case false =>
+            context.output(msg)
+        }
+    }
+  }
+}
+
+object DelayInitialTask {
+  val DELAY_INITIAL = "DELAY_INITIAL"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
new file mode 100644
index 0000000..0c54829
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object DropWithinTimeout
+
+class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val timeout = userConf.getValue[FiniteDuration](DropWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var timeoutActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        timeoutActive = false
+      case _ =>
+
+    }
+    timeoutActive match {
+      case true =>
+      case false =>
+        context.output(msg)
+    }
+  }
+}
+
+object DropWithinTask {
+  val TIMEOUT = "TIMEOUT"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
similarity index 84%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
index 2eb0612..14ff537 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -16,22 +16,23 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
 
   val sizeOfOutputs = sizeOfOutPorts
   var index = 0
 
-  override def onNext(msg: Message): Unit = {
+  override def onNext(msg : Message) : Unit = {
     output(index, msg)
     index += 1
     if (index == sizeOfOutputs) {
       index = 0
     }
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
new file mode 100644
index 0000000..d982ebd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val zero = userConf.getValue[Out](FoldTask.ZERO)
+  val aggregator = userConf.getValue[(Out, In) => Out](FoldTask.AGGREGATOR)
+  var aggregated: Out = _
+  implicit val ec = context.system.dispatcher
+
+  override def onStart(instant: Instant): Unit = {
+    zero.foreach(value => {
+      aggregated = value
+    })
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    aggregator.foreach(func => {
+      aggregated = func(aggregated, data)
+      LOG.info(s"aggregated = $aggregated")
+      val msg = new Message(aggregated, time)
+      context.output(msg)
+    })
+  }
+}
+
+object FoldTask {
+  val ZERO = "ZERO"
+  val AGGREGATOR = "AGGREGATOR"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
similarity index 65%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
index 9a4e24e..3310ab9 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
@@ -16,35 +16,36 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
-import akka.stream.gearpump.task.GraphTask.{Index, PortId}
+import java.time.Instant
 
 import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.GraphTask.{Index, PortId}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskWrapper}
 
-class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
+class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig)
   extends Task(inputTaskContext, userConf) {
 
   private val context = inputTaskContext.asInstanceOf[TaskWrapper]
-  private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](
-    GraphTask.OUT_PROCESSORS).get)
-  private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](
-    GraphTask.IN_PROCESSORS).get)
+  protected val outMapping =
+    portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get)
+  protected val inMapping =
+    portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get)
 
   val sizeOfOutPorts = outMapping.keys.size
   val sizeOfInPorts = inMapping.keys.size
-
+  
   private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = {
-    val portToProcessor = processors.zipWithIndex.map { kv =>
+    val portToProcessor = processors.zipWithIndex.map{kv =>
       (kv._2, kv._1)
     }.toMap
 
     val processorToIndex = processors.sorted.zipWithIndex.toMap
 
-    val portToIndex = portToProcessor.map { kv =>
+    val portToIndex = portToProcessor.map{kv =>
       val (outlet, processorId) = kv
       val index = processorToIndex(processorId)
       (outlet, index)
@@ -56,15 +57,15 @@
     context.output(outMapping(outletId), msg)
   }
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime : Instant) : Unit = {}
 
-  override def onStop(): Unit = {}
+  override def onStop() : Unit = {}
 }
 
 object GraphTask {
-  val OUT_PROCESSORS = "akka.stream.gearpump.task.outprocessors"
-  val IN_PROCESSORS = "akka.stream.gearpump.task.inprocessors"
+  val OUT_PROCESSORS = "org.apache.gearpump.akkastream.task.outprocessors"
+  val IN_PROCESSORS = "org.apache.gearpump.akkastream.task.inprocessors"
 
   type PortId = Int
   type Index = Int
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
new file mode 100644
index 0000000..eaf2b3f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.collection.immutable.VectorBuilder
+import scala.concurrent.duration.FiniteDuration
+
+class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  case object GroupedWithinTrigger
+  val buf: VectorBuilder[T] = new VectorBuilder
+  val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
+  val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)
+
+  override def onNext(msg : Message) : Unit = {
+
+  }
+}
+
+object GroupedWithinTask {
+  val BATCH_SIZE = "BATCH_SIZE"
+  val TIME_WINDOW = "TIME_WINDOW"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
similarity index 71%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
index 2eb0612..741ec43 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -16,22 +16,29 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class InterleaveTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
 
-  val sizeOfOutputs = sizeOfOutPorts
+  val sizeOfInputs = sizeOfInPorts
   var index = 0
 
-  override def onNext(msg: Message): Unit = {
+  // TODO access upstream and pull
+  override def onNext(msg : Message) : Unit = {
     output(index, msg)
     index += 1
-    if (index == sizeOfOutputs) {
+    if (index == sizeOfInputs) {
       index = 0
     }
   }
-}
\ No newline at end of file
+}
+
+object InterleaveTask {
+  val INPUT_PORTS = "INPUT_PORTS"
+  val SEGMENT_SIZE = "SEGMENT_SIZE"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
new file mode 100644
index 0000000..daa1afc
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+
+class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
+  implicit val ec = context.system.dispatcher
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    f match {
+      case Some(func) =>
+        val fout = func(data)
+        fout.onComplete(value => {
+          value.foreach(out => {
+            val msg = new Message(out, time)
+            context.output(msg)
+          })
+        })
+      case None =>
+    }
+  }
+}
+
+object MapAsyncTask {
+  val MAPASYNC_FUNC = "MAPASYNC_FUNC"
+
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
similarity index 69%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
index 925bf21..ad18f72 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -16,14 +16,24 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-  override def onNext(msg: Message): Unit = {
+class MergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
+  val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)
+
+  override def onNext(msg : Message) : Unit = {
     context.output(msg)
   }
-}
\ No newline at end of file
+}
+
+object MergeTask {
+  val EAGER_COMPLETE = "EAGER_COMPLETE"
+  val INPUT_PORTS = "INPUT_PORTS"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
similarity index 65%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
index 2eb0612..458bb4e 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -16,22 +16,28 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+import scala.concurrent.duration.FiniteDuration
 
-  val sizeOfOutputs = sizeOfOutPorts
-  var index = 0
+class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
 
-  override def onNext(msg: Message): Unit = {
-    output(index, msg)
-    index += 1
-    if (index == sizeOfOutputs) {
-      index = 0
-    }
+  val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(Message(elem, msg.timestamp))
   }
-}
\ No newline at end of file
+}
+
+object SingleSourceTask {
+  val ELEMENT = "ELEMENT"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
similarity index 78%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
index b681852..1b9c4e3 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -16,24 +16,24 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
+import java.time.Instant
 import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor.Actor.Receive
 import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
 import akka.util.Timeout
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
 import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.RequestMessage
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
 import org.apache.gearpump.util.LogUtil
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
 
 /**
  * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module
@@ -46,23 +46,27 @@
  *                            \|
  *                       Akka Stream [[Subscriber]]
  *
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
  */
-class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+  extends Task(taskContext, userConf) {
   import taskContext.taskId
 
   val queue = new util.LinkedList[Message]()
-  var subscriber: ActorRef = null
+  var subscriber: ActorRef = _
 
   var request: Int = 0
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime : Instant) : Unit = {}
 
-  override def onNext(msg: Message): Unit = {
+  override def onNext(msg : Message) : Unit = {
     queue.add(msg)
     trySendingData()
   }
 
-  override def onStop(): Unit = {}
+  override def onStop() : Unit = {}
 
   private def trySendingData(): Unit = {
     if (subscriber != null) {
@@ -88,15 +92,16 @@
 
   case class RequestMessage(number: Int)
 
-  class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
+  class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int,
+      processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
     private val taskId = TaskId(processorId, index = 0)
-
     private val LOG = LogUtil.getLogger(getClass)
 
-    private var actor: ActorRef = null
+    private var actor: ActorRef = _
     import system.dispatcher
 
-    private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
+    private val task =
+      context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map{container =>
       // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
       container.task
     }
@@ -111,7 +116,7 @@
     private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
 
     override def request(l: Long): Unit = {
-      task.foreach { task =>
+      task.foreach{ task =>
         task.tell(RequestMessage(l.toInt), actor)
       }
     }
@@ -119,7 +124,8 @@
 
   class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
     def receive: Receive = {
-      case result: AnyRef => subscriber.onNext(result)
+      case result: AnyRef =>
+        subscriber.onNext(result)
     }
   }
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
similarity index 71%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
index ccbd350..054b483 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -16,20 +16,21 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
-import scala.concurrent.ExecutionContext
+import java.time.Instant
 
 import akka.actor.Actor.Receive
-import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error}
-import org.reactivestreams.{Subscriber, Subscription}
-
 import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
+import org.reactivestreams.{Subscriber, Subscription}
+
+import scala.concurrent.ExecutionContext
 
 /**
  * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task
@@ -42,44 +43,51 @@
  *               /                    Local JVM
  *    Akka Stream [[org.reactivestreams.Publisher]]
  *
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
  */
-class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+  extends Task(taskContext, userConf) {
   import taskContext.taskId
 
-  override def onStart(startTime: StartTime): Unit = {}
+  override def onStart(startTime : Instant) : Unit = {}
 
-  override def onNext(msg: Message): Unit = {
+  override def onNext(msg : Message) : Unit = {
     LOG.info("AkkaStreamSource receiving message " + msg)
   }
 
-  override def onStop(): Unit = {}
+  override def onStop() : Unit = {}
 
   override def receiveUnManagedMessage: Receive = {
     case Error(ex) =>
       LOG.error("the stream has error", ex)
     case AkkaStreamMessage(msg) =>
-      LOG.error("we have received message from akka stream source: " + msg)
+      LOG.info("we have received message from akka stream source: " + msg)
       taskContext.output(Message(msg, System.currentTimeMillis()))
     case Complete(description) =>
-      LOG.error("the stream is completed: " + description)
+      LOG.info("the stream is completed: " + description)
     case msg =>
       LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
   }
 }
 
+
 object SourceBridgeTask {
   case class Error(ex: java.lang.Throwable)
 
   case class Complete(description: String)
 
-  case class AkkaStreamMessage(msg: AnyRef)
+  case class AkkaStreamMessage[T >: AnyRef](msg: T)
 
-  class SourceBridgeTaskClient[T <: AnyRef](ec: ExecutionContext, context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
+  class SourceBridgeTaskClient[T >: AnyRef](ec: ExecutionContext,
+      context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
     val taskId = TaskId(processorId, 0)
-    var subscription: Subscription = null
+    var subscription: Subscription = _
     implicit val dispatcher = ec
 
-    val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
+    val task = context.askAppMaster[TaskActorRef](appId,
+      LookupTaskActorRef(taskId)).map{container =>
       // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
       container.task
     }
@@ -89,6 +97,7 @@
     }
 
     override def onSubscribe(subscription: Subscription): Unit = {
+      // when taskActorRef is resolved, request message from upstream
       this.subscription = subscription
       task.map(task => subscription.request(1))
     }
@@ -98,7 +107,7 @@
     }
 
     override def onNext(t: T): Unit = {
-      task.map { task =>
+      task.map {task =>
         task ! AkkaStreamMessage(t)
       }
       subscription.request(1)
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
new file mode 100644
index 0000000..a0674bc
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val func = userConf.getValue[() => IN => Iterable[OUT]](StatefulMapConcatTask.FUNC).get
+  var f: IN => Iterable[OUT] = _
+
+  override def onStart(startTime: Instant) : Unit = {
+    f = func()
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val in: IN = msg.msg.asInstanceOf[IN]
+    val out: Iterable[OUT] = f(in)
+    val iterator = out.iterator
+    while(iterator.hasNext) {
+      val nextValue = iterator.next
+      context.output(Message(nextValue, System.currentTimeMillis()))
+    }
+  }
+}
+
+object StatefulMapConcatTask {
+  val FUNC = "FUNC"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
new file mode 100644
index 0000000..9559d8f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object TakeWithinTimeout
+
+class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val timeout = userConf.getValue[FiniteDuration](TakeWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var timeoutActive = false
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        timeoutActive = true
+      case _ =>
+
+    }
+    timeoutActive match {
+      case true =>
+      case false =>
+        context.output(msg)
+    }
+  }
+}
+
+object TakeWithinTask {
+  val TIMEOUT = "TIMEOUT"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
new file mode 100644
index 0000000..3c7ad87
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val cost = userConf.getInt(ThrottleTask.COST).getOrElse(0)
+  val costCalc = userConf.getValue[T => Int](ThrottleTask.COST_CALC)
+  val maxBurst = userConf.getInt(ThrottleTask.MAX_BURST)
+  val timePeriod = userConf.getValue[FiniteDuration](ThrottleTask.TIME_PERIOD).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  val interval = timePeriod.toNanos / cost
+
+  // TODO control rate from TaskActor
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[T]
+    val time = msg.timestamp
+    context.output(msg)
+  }
+}
+
+object ThrottleTask {
+  val COST = "COST"
+  val COST_CALC = "COST_CAL"
+  val MAX_BURST = "MAX_BURST"
+  val TIME_PERIOD = "TIME_PERIOD"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
new file mode 100644
index 0000000..d99d2db
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  (TickSourceTask.INITIAL_DELAY)
+  val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  val tick = userConf.getValue[T](TickSourceTask.TICK).get
+
+  override def onStart(startTime: Instant): Unit = {
+    context.schedule(initialDelay, interval)(
+      self ! Message(tick, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}
+
+object TickSourceTask {
+  val INITIAL_DELAY = "INITIAL_DELAY"
+  val INTERVAL = "INTERVAL"
+  val TICK = "TICK"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
similarity index 64%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 78fabbe..005d018 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -16,30 +16,31 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
 
-import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
-
+import org.apache.gearpump.akkastream.task.Unzip2Task.UnZipFunction
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
 
-  val unzip = userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
+  val unzip = userConf.
+    getValue[UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
 
-  override def onNext(msg: Message): Unit = {
+  override def onNext(msg : Message) : Unit = {
     val message = msg.msg
     val time = msg.timestamp
-    val pair = unzip(message)
+    val pair = unzip(message.asInstanceOf[In])
     val (a, b) = pair
     output(0, Message(a.asInstanceOf[AnyRef], time))
     output(1, Message(b.asInstanceOf[AnyRef], time))
   }
 }
 
-object UnZip2Task {
-  class UnZipFunction(val unzip: Any => (Any, Any)) extends Serializable
+object Unzip2Task {
+  case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable
 
-  val UNZIP2_FUNCTION = "akka.stream.gearpump.task.unzip2.function"
-}
\ No newline at end of file
+  val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
new file mode 100644
index 0000000..7e0c082
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.Zip2Task.ZipFunction
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val zip = userConf.
+    getValue[ZipFunction[A1, A2, OUT]](Zip2Task.ZIP2_FUNCTION)(context.system).get.zip
+  var a1: Option[A1] = None
+  var a2: Option[A2] = None
+
+  override def onNext(msg : Message) : Unit = {
+    val message = msg.msg
+    val time = msg.timestamp
+    a1 match {
+      case Some(x) =>
+        a2 = Some(message.asInstanceOf[A2])
+        a1.foreach(v1 => {
+          a2.foreach(v2 => {
+            val out = zip(v1, v2)
+            context.output(Message(out.asInstanceOf[OUT], time))
+
+          })
+        })
+      case None =>
+        a1 = Some(message.asInstanceOf[A1])
+    }
+  }
+}
+
+object Zip2Task {
+  case class ZipFunction[A1, A2, OUT](val zip: (A1, A2) => OUT) extends Serializable
+
+  val ZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.zip2.function"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
similarity index 78%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
index c774fc7..6ad90df 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
@@ -16,15 +16,17 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump.util
+package org.apache.gearpump.akkastream.util
 
 import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, MaterializedValueNode, Module, Transform}
 
 class MaterializedValueOps(mat: MaterializedValueNode) {
-  def resolve[Mat](materializedValues: Map[Module, Any]): Mat = {
-    def resolveMaterialized(mat: MaterializedValueNode, materializedValues: Map[Module, Any]): Any = mat match {
+  def resolve[Mat](materializedValues: scala.collection.mutable.Map[Module, Any]): Mat = {
+    def resolveMaterialized(mat: MaterializedValueNode,
+        materializedValues: scala.collection.mutable.Map[Module, Any]): Any = mat match {
       case Atomic(m) => materializedValues.getOrElse(m, ())
-      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+        resolveMaterialized(d2, materializedValues))
       case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
       case Ignore => ()
     }
@@ -32,7 +34,7 @@
   }
 }
 
-object MaterializedValueOps {
+object MaterializedValueOps{
   def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat)
 }
 
diff --git a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
similarity index 96%
rename from experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
rename to experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
index 4ead839..e1846ea 100644
--- a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
+++ b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package akka.stream.gearpump
+package org.apache.gearpump.akkastream
 
 import akka.stream.Attributes
 import org.scalatest.{FlatSpec, Matchers}
@@ -30,4 +30,5 @@
 
     assert("aa-bb" == c.nameOrDefault())
   }
+
 }
diff --git a/project/BuildDashboard.scala b/project/BuildDashboard.scala
index c14b9d6..cfa6aae 100644
--- a/project/BuildDashboard.scala
+++ b/project/BuildDashboard.scala
@@ -46,11 +46,11 @@
 
   private lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
     libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test",
+      "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % "test",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "com.lihaoyi" %% "upickle" % upickleVersion,
-      "com.softwaremill.akka-http-session" %% "core" % "0.2.5",
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+      "com.softwaremill.akka-http-session" %% "core" % "0.3.0",
+      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
       "com.github.scribejava" % "scribejava-apis" % "2.4.0",
       "com.ning" % "async-http-client" % "1.9.33",
       "org.webjars" % "angularjs" % "1.4.9",
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index e07b688..eb5f9e1 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -25,7 +25,7 @@
 object BuildExperiments extends sbt.Build {
 
   lazy val experiments: Seq[ProjectReference] = Seq(
-    // akkastream,
+    akkastream,
     cgroup,
     redis,
     storm,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6949497..4e30d3f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -23,7 +23,8 @@
 
   val crossScalaVersionNumbers = Seq("2.11.8")
   val scalaVersionNumber = crossScalaVersionNumbers.last
-  val akkaVersion = "2.4.3"
+  val akkaVersion = "2.4.16"
+  val akkaHttpVersion = "10.0.1"
   val hadoopVersion = "2.6.0"
   val hbaseVersion = "1.0.0"
   val commonsHttpVersion = "3.1"
@@ -82,10 +83,9 @@
       "com.typesafe.akka" %% "akka-agent" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
       "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
+      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
       "org.scala-lang" % "scala-reflect" % scalaVersionNumber,
-      "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
       "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion,
       "com.google.guava" % "guava" % guavaVersion,
       "com.codahale.metrics" % "metrics-graphite" % codahaleVersion
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
index 3088a39..53ee692 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -149,7 +149,7 @@
             }
           }
       } ~
-      path("metrics" / RestPath) { path =>
+      path("metrics" / RemainingPath) { path =>
         parameterMap { optionMap =>
           parameter("aggregator" ? "") { aggregator =>
             parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index ed15121..be96577 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -102,7 +102,7 @@
           failWith(ex)
       }
     } ~
-    path("metrics" / RestPath) { path =>
+    path("metrics" / RemainingPath) { path =>
       parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)
         onComplete(askActor[HistoryMetrics](master, query)) {
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
index 804b34f..4989364 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
@@ -60,8 +60,8 @@
 class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
 
   // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
-  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump",
-    params = Map.empty)
+  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = Some("gearpump"),
+    params = Map.empty[String, String])
 
   val LOG = LogUtil.getLogger(getClass, "AUDIT")
 
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
index 284d3f2..7b33987 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -19,10 +19,12 @@
 package org.apache.gearpump.services
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.marshalling.ToResponseMarshallable
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.marshalling.ToResponseMarshallable._
+import akka.http.scaladsl.server.{RejectionHandler, StandardRoute}
 import akka.stream.Materializer
-
 import org.apache.gearpump.util.Util
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -56,14 +58,14 @@
       getFromResource("index.html")
     } ~
     path("favicon.ico") {
-      complete(StatusCodes.NotFound)
+      complete(ToResponseMarshallable(StatusCodes.NotFound))
     } ~
     pathPrefix("webjars") {
       get {
         getFromResourceDirectory("META-INF/resources/webjars")
       }
     } ~
-    path(Rest) { path =>
+    path(Remaining) { path =>
       getFromResource("%s" format path)
     }
   }
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
index 8268d61..954fe97 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
@@ -63,7 +63,7 @@
           failWith(ex)
       }
     } ~
-    path("metrics" / RestPath ) { path =>
+    path("metrics" / RemainingPath ) { path =>
       val workerId = WorkerId.parse(workerIdString)
       parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index ca8d89e..d4b3719 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@
  */
 class StreamApplication(
     override val name: String, val inputUserConfig: UserConfig,
-    dag: Graph[ProcessorDescription, PartitionerDescription])
+    val dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 82ea7c7..5aaf2fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -124,12 +124,11 @@
  * to another Op to be used
  */
 case class ChainableOp[IN, OUT](
-    fn: SingleInputFunction[IN, OUT]) extends Op {
+    fn: SingleInputFunction[IN, OUT],
+    userConfig: UserConfig = UserConfig.empty) extends Op {
 
   override def description: String = fn.description
 
-  override def userConfig: UserConfig = UserConfig.empty
-
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
       case op: ChainableOp[OUT, _] =>