[GERAPUMP-22] Merge akka-streams branch into master
Author: manuzhang <owenzhang1990@gmail.com>
Author: Kam Kasravi <kamkasravi@yahoo.com>
Closes #137 from manuzhang/akka-streams.
diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md
index 7c9a316..fe04554 100644
--- a/experiments/akkastream/README.md
+++ b/experiments/akkastream/README.md
@@ -1,4 +1,2 @@
Akka Stream
-=========
-
-TODO: This directory is obsolte. Working on updating it to Akka 2.4.3.
+=========
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf
index 626a1dc..8584511 100644
--- a/experiments/akkastream/src/main/resources/geardefault.conf
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -2,4 +2,4 @@
"akka.stream.gearpump.example.WikipediaApp$WikidataElement" = ""
"scala.collection.immutable.Map$Map1" = ""
"scala.collection.immutable.Map$Map2" = ""
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
deleted file mode 100644
index d2b328d..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream
-
-import scala.concurrent.ExecutionContextExecutor
-
-/**
- * [[BaseMaterializer]] is a extension to [[akka.stream.Materializer]].
- *
- * Compared with [[akka.stream.Materializer]], the difference is that
- * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph.
- *
- * @see [[ModuleGraph]] for the difference between RunnableGraph and
- * [[ModuleGraph]]
- *
- */
-abstract class BaseMaterializer extends akka.stream.Materializer {
-
- override def withNamePrefix(name: String): Materializer = throw new UnsupportedOperationException()
-
- override implicit def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException()
-
- def materialize[Mat](graph: ModuleGraph[Mat]): Mat
-
- override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
- val graph = ModuleGraph(runnableGraph)
- materialize(graph)
- }
-
- def shutdown(): Unit
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
deleted file mode 100644
index 48d06f7..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This code is similar to MaterializerSession and will be deprecated in the next release.
- */
-
-package akka.stream
-
-import scala.collection.mutable
-
-import akka.stream.Attributes.Attribute
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.StreamLayout._
-import akka.stream.impl._
-import akka.stream.{Graph => AkkaGraph}
-
-import _root_.org.apache.gearpump.util
-import _root_.org.apache.gearpump.util.Graph
-
-/**
- *
- * ModuleGraph is a transformation on [[akka.stream.scaladsl.RunnableGraph]].
- * It carries all the information of [[akka.stream.scaladsl.RunnableGraph]], but
- * represents it in a different way.
- *
- * Here is the difference:
- *
- * RunnableGraph
- * ==============================
- * [[akka.stream.scaladsl.RunnableGraph]] is represented as a [[Module]] tree:
- * TopLevelModule
- * |
- * -------------------
- * | |
- * SubModule1 SubModule2
- * | |
- * ---------------- ----------
- * | | |
- * AtomicModule1 AtomicModule2 AtomicModule3
- *
- * ModuleGraph
- * ==============================
- * [[ModuleGraph]] is represented as a [[util.Graph]] of Atomic [[Module]]:
- *
- * AtomicModule2 -> AtomicModule4
- * /| \
- * / \
- * / \|
- * AtomicModule1 AtomicModule5
- * \ /|
- * \ /
- * \| /
- * AtomicModule3
- *
- * Each vertex in the Graph is a [[Module]], each [[Edge]] in the Graph is a tuple
- * ([[OutPort]], [[InPort]]). [[OutPort]] is one of upstream Atomic Module
- * output ports. [[InPort]] is one of downstream Atomic Module input ports.
- *
- *
- * Why use [[ModuleGraph]] instead of [[akka.stream.scaladsl.RunnableGraph]]?
- * =========================
- * There are several good reasons:):
- * 1. [[ModuleGraph]] outlines explicitly the upstream/downstream relation.
- * Each [[Edge]] of [[ModuleGraph]] represent a upstream/downstream pair.
- * It is easier for user to understand the overall data flow.
- *
- * 2. It is easier for performance optimization.
- * For the above Graph, if we want to fuse AtomicModule2 and AtomicModule3
- * together, it can be done within [[ModuleGraph]]. We only need
- * to substitute Pair(AtomicModule2, AtomicModule4) with a unified Module.
- *
- * 3. It avoids module duplication.
- * In [[akka.stream.scaladsl.RunnableGraph]], composite Module can be re-used.
- * It is possible that there can be duplicate Modules.
- * The duplication problem causes big headache when doing materialization.
- *
- * [[ModuleGraph]] doesn't have thjis problem. [[ModuleGraph]] does a transformation on the Module
- * Tree to make sure each Atomic Module [[ModuleGraph]] is unique.
- *
- *
- * @param graph a Graph of Atomic modules.
- * @param mat is a function of:
- * input => materialized value of each Atomic module
- * output => final materialized value.
- * @tparam Mat
- */
-class ModuleGraph[Mat](val graph: util.Graph[Module, Edge], val mat: MaterializedValueNode) {
-
- def resolve(materializedValues: Map[Module, Any]): Mat = {
- MaterializedValueOps(mat).resolve[Mat](materializedValues)
- }
-}
-
-object ModuleGraph {
-
- def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] = {
- val topLevel = runnableGraph.module
- val factory = new ModuleGraphFactory(topLevel)
- val (graph, mat) = factory.create()
- new ModuleGraph(graph, mat)
- }
-
- /**
- *
- * @param from outport of upstream module
- * @param to inport of downstream module
- */
- case class Edge(from: OutPort, to: InPort)
-
- private class ModuleGraphFactory(val topLevel: StreamLayout.Module) {
-
- private var subscribersStack: List[mutable.Map[InPort, (InPort, Module)]] =
- mutable.Map.empty[InPort, (InPort, Module)].withDefaultValue(null) :: Nil
- private var publishersStack: List[mutable.Map[OutPort, (OutPort, Module)]] =
- mutable.Map.empty[OutPort, (OutPort, Module)].withDefaultValue(null) :: Nil
-
- /*
- * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
- * itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but
- * not elsewhere. For this reason they are just simply passed as parameters to those methods.
- *
- * The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class
- * from passing the current scope around or even knowing about it.
- */
- private var moduleStack: List[Module] = topLevel :: Nil
-
- private def subscribers: mutable.Map[InPort, (InPort, Module)] = subscribersStack.head
- private def publishers: mutable.Map[OutPort, (OutPort, Module)] = publishersStack.head
- private def currentLayout: Module = moduleStack.head
-
- private val graph = Graph.empty[Module, Edge]
-
- private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
- val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
- module.withAttributes(currentAttributes).asInstanceOf[T]
- }
-
- private def materializeAtomic(atomic: Module, parentAttributes: Attributes): MaterializedValueNode = {
- val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
- val copied = copyAtomicModule(atomic, parentAttributes)
-
- for ((in, id) <- inputs.zipWithIndex) {
- val inPort = inPortMapping(atomic, copied)(in)
- assignPort(in, (inPort, copied))
- }
-
- for ((out, id) <- outputs.zipWithIndex) {
- val outPort = outPortMapping(atomic, copied)(out)
- assignPort(out, (outPort, copied))
- }
-
- graph.addVertex(copied)
- Atomic(copied)
- }
-
- def create(): (util.Graph[Module, Edge], MaterializedValueNode) = {
- val mat = materializeModule(topLevel, Attributes.none)
- (graph, mat)
- }
-
- private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
- from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
- }
-
- private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
- from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
- }
-
- private def materializeModule(module: Module, parentAttributes: Attributes): MaterializedValueNode = {
-
- val materializedValues = collection.mutable.HashMap.empty[Module, MaterializedValueNode]
- val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
-
- var materializedValueSources = List.empty[MaterializedValueSource[_]]
-
- for (submodule <- module.subModules) {
- submodule match {
- case mv: MaterializedValueSource[_] =>
- materializedValueSources :+= mv
- case atomic if atomic.isAtomic =>
- materializedValues.put(atomic, materializeAtomic(atomic, currentAttributes))
- case copied: CopiedModule =>
- enterScope(copied)
- materializedValues.put(copied, materializeModule(copied, currentAttributes))
- exitScope(copied)
- case composite =>
- materializedValues.put(composite, materializeComposite(composite, currentAttributes))
- }
- }
-
- val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
-
- materializedValueSources.foreach { module =>
- val matAttribute = new MaterializedValueSourceAttribute(mat)
- val copied = copyAtomicModule(module, parentAttributes and Attributes(matAttribute))
- assignPort(module.shape.outlet, (copied.shape.outlet, copied))
- graph.addVertex(copied)
- materializedValues.put(copied, Atomic(copied))
- }
- mat
- }
-
- private def materializeComposite(composite: Module, effectiveAttributes: Attributes): MaterializedValueNode = {
- materializeModule(composite, effectiveAttributes)
- }
-
- private def mergeAttributes(parent: Attributes, current: Attributes): Attributes = {
- parent and current
- }
-
- private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, MaterializedValueNode]): MaterializedValueNode = matNode match {
- case Atomic(m) => materializedValues(m)
- case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
- case Transform(f, d) => Transform(f, resolveMaterialized(d, materializedValues))
- case Ignore => Ignore
- }
-
- final protected def assignPort(in: InPort, subscriber: (InPort, Module)): Unit = {
- addVertex(subscriber._2)
- subscribers(in) = subscriber
- // Interface (unconnected) ports of the current scope will be wired when exiting the scope
- if (!currentLayout.inPorts(in)) {
- val out = currentLayout.upstreams(in)
- val publisher = publishers(out)
- if (publisher ne null) addEdge(publisher, subscriber)
- }
- }
-
- final protected def assignPort(out: OutPort, publisher: (OutPort, Module)): Unit = {
- addVertex(publisher._2)
- publishers(out) = publisher
- // Interface (unconnected) ports of the current scope will be wired when exiting the scope
- if (!currentLayout.outPorts(out)) {
- val in = currentLayout.downstreams(out)
- val subscriber = subscribers(in)
- if (subscriber ne null) addEdge(publisher, subscriber)
- }
- }
-
- // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
- // of the same module.
- // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
- private def enterScope(enclosing: CopiedModule): Unit = {
- subscribersStack ::= mutable.Map.empty.withDefaultValue(null)
- publishersStack ::= mutable.Map.empty.withDefaultValue(null)
- moduleStack ::= enclosing.copyOf
- }
-
- // Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning
- // them to the copied ports instead of the original ones (since there might be multiple copies of the same module
- // leading to port identity collisions)
- // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
- private def exitScope(enclosing: CopiedModule): Unit = {
- val scopeSubscribers = subscribers
- val scopePublishers = publishers
- subscribersStack = subscribersStack.tail
- publishersStack = publishersStack.tail
- moduleStack = moduleStack.tail
-
- // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of
- // the original module and assign them to the copy ports in the outer scope that we will return to
- enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
- case (original, exposed) => assignPort(exposed, scopeSubscribers(original))
- }
-
- enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach {
- case (original, exposed) => assignPort(exposed, scopePublishers(original))
- }
- }
-
- private def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
- graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
- }
-
- private def addVertex(module: Module): Unit = {
- graph.addVertex(module)
- }
- }
-
- final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
deleted file mode 100644
index a11d7cb..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump
-
-import akka.actor.ActorSystem
-import akka.stream._
-import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer
-import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer
-import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, SubGraphMaterializer}
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
- * streaming application. If some module cannot be rendered remotely in Gearpump
- * Cluster, then it uses local Actor materializer as fallback to materialize
- * the module locally.
- *
- * User can custom a [[Strategy]] to determinie which module should be rendered
- * remotely, and which module should be rendered locally.
- *
- * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two parts,
- * and materialize them seperately.
- *
- * @param system
- * @param strategy
- * @param useLocalCluster whether to use built-in in-process local cluster
- */
-class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter.AllRemoteStrategy,
- useLocalCluster: Boolean = true)
- extends BaseMaterializer {
-
- private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
- classOf[LocalGraph] -> new LocalGraphMaterializer(system),
- classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
- )
-
- override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = {
- val subGraphs = new GraphCutter(strategy).cut(graph)
- val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, subGraph) =>
- val materializer = subMaterializers(subGraph.getClass)
- map ++ materializer.materialize(subGraph, map)
- }
- graph.resolve(matValues)
- }
-
- override def shutdown(): Unit = {
- subMaterializers.values.foreach(_.shutdown())
- }
-}
-
-object GearpumpMaterializer {
- def apply(system: ActorSystem): GearpumpMaterializer = new GearpumpMaterializer(system)
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
deleted file mode 100644
index 7808b52..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Sink, Source}
-
-/**
- * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph
- * to different runtime.
- *
- * In this test, source module and sink module are materialized locally,
- * Other transformation module are materialized remotely in Gearpump
- * streaming Application.
- *
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- *
- *
- */
-object Test {
-
- def main(args: Array[String]): Unit = {
-
- println("running Test...")
-
- implicit val system = ActorSystem("akka-test")
- implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy)
-
- val echo = system.actorOf(Props(new Echo()))
- val sink = Sink.actorRef(echo, "COMPLETE")
- val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant",
- "blue sky"))
- source.filter(_.startsWith("red")).fold("Items:") { (a, b) =>
- a + "|" + b
- }.map("I want to order item: " + _).runWith(sink)
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- class Echo extends Actor {
- def receive: Receive = {
- case any: AnyRef =>
- // scalastyle:off println
- println("Confirm received: " + any)
- // scalastyle:on println
- }
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
deleted file mode 100644
index 2426f5f..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.ActorMaterializer
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.{GearSink, GearSource}
-import akka.stream.scaladsl.{Flow, Sink, Source}
-
-/**
- *
- * This tests how different Materializers can be used together in an explicit way.
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- *
- */
-object Test2 {
-
- def main(args: Array[String]): Unit = {
-
- println("running Test2...")
- implicit val system = ActorSystem("akka-test")
- val materializer = new GearpumpMaterializer(system)
-
- val echo = system.actorOf(Props(new Echo()))
- val source = GearSource.bridge[String, String]
- val sink = GearSink.bridge[String, String]
-
- val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _)
- val (entry, exit) = flow.runWith(source, sink)(materializer)
-
- val actorMaterializer = ActorMaterializer()
-
- val externalSource = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
- val externalSink = Sink.actorRef(echo, "COMPLETE")
-
- val graph = FlowGraph.closed() { implicit b =>
- externalSource ~> Sink(entry)
- Source(exit) ~> externalSink
- }
- graph.run()(actorMaterializer)
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- class Echo extends Actor {
- def receive: Receive = {
- case any: AnyRef =>
- println("Confirm received: " + any)
- }
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
deleted file mode 100644
index 976b1e6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSource
-import akka.stream.scaladsl.Sink
-
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-
-/**
- * read from remote and write to local
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- */
-object Test3 {
-
- def main(args: Array[String]): Unit = {
-
- println("running Test...")
-
- implicit val system = ActorSystem("akka-test")
- implicit val materializer = new GearpumpMaterializer(system)
-
- val echo = system.actorOf(Props(new Echo()))
- val sink = Sink.actorRef(echo, "COMPLETE")
- val sourceData = new CollectionDataSource(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
- val source = GearSource.from[String](sourceData)
- source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- class Echo extends Actor {
- def receive: Receive = {
- case any: AnyRef =>
- println("Confirm received: " + any)
- }
- }
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
deleted file mode 100644
index 7b80b7b..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSink
-import akka.stream.scaladsl.Source
-
-import org.apache.gearpump.streaming.dsl.LoggerSink
-
-/**
- * read from local and write to remote
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- */
-object Test4 {
-
- def main(args: Array[String]): Unit = {
-
- println("running Test...")
-
- implicit val system = ActorSystem("akka-test")
- implicit val materializer = new GearpumpMaterializer(system)
-
- val sink = GearSink.to(new LoggerSink[String])
- val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
- source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
deleted file mode 100644
index 052c018..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Sink, Source, Unzip}
-
-/**
-test fanout
- */
-object Test5 {
-
- def main(args: Array[String]): Unit = {
-
- println("running Test...")
-
- implicit val system = ActorSystem("akka-test")
- implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy)
-
- val echo = system.actorOf(Props(new Echo()))
- val sink = Sink.actorRef(echo, "COMPLETE")
-
- val source = Source(List(("male", "24"), ("female", "23")))
-
- val graph = FlowGraph.closed() { implicit b =>
- val unzip = b.add(Unzip[String, String]())
-
- val sink1 = Sink.actorRef(echo, "COMPLETE")
- val sink2 = Sink.actorRef(echo, "COMPLETE")
-
- source ~> unzip.in
- unzip.out0 ~> sink1
- unzip.out1 ~> sink1
- }
-
- graph.run()
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- class Echo extends Actor {
- def receive: Receive = {
- case any: AnyRef =>
- println("Confirm received: " + any)
- }
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
deleted file mode 100644
index 0fccd30..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSource
-import akka.stream.scaladsl.Sink
-
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-
-/**
- * WordCount example
- * Test GroupBy
- */
-
-import akka.stream.gearpump.scaladsl.Implicits._
-
-object Test6 {
-
- def main(args: Array[String]): Unit = {
-
- println("running Test...")
-
- implicit val system = ActorSystem("akka-test")
- implicit val materializer = new GearpumpMaterializer(system)
-
- val echo = system.actorOf(Props(new Echo()))
- val sink = Sink.actorRef(echo, "COMPLETE")
- val sourceData = new CollectionDataSource(List("this is a good start", "this is a good time", "time to start", "congratulations", "green plant", "blue sky"))
- val source = GearSource.from[String](sourceData)
- source.mapConcat { line =>
- line.split(" ").toList
- }.groupBy2(x => x).map(word => (word, 1))
- .reduce({ (a, b) =>
- (a._1, a._2 + b._2)
- }).log("word-count").runWith(sink)
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- class Echo extends Actor {
- def receive: Receive = {
- case any: AnyRef =>
- println("Confirm received: " + any)
- }
- }
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
deleted file mode 100644
index 6ef8598..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.graph
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.materializer.LocalMaterializer
-import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.DefaultAttributes
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{PublisherSource, SubscriberSink}
-import akka.stream.{Outlet, SinkShape, SourceShape}
-import org.reactivestreams.{Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- *
- * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- * contain module that can be materialized in local JVM.
- *
- * @param graph
- */
-class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
-
-object LocalGraph {
-
- /**
- * materialize LocalGraph in local JVM
- * @param system
- */
- class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
-
- // Creates a local materializer
- val materializer = LocalMaterializer()(system)
-
- /**
- *
- * @param matValues Materialized Values for each module before materialization
- * @return Materialized Values for each Module after the materialization.
- */
- override def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] = {
- val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module =>
- module match {
- case source: SourceBridgeModule[AnyRef, AnyRef] =>
- val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]]
- val shape = SinkShape(source.inPort)
- new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
- case sink: SinkBridgeModule[AnyRef, AnyRef] =>
- val publisher = matValues(sink).asInstanceOf[Publisher[AnyRef]]
- val shape = SourceShape(sink.outPort.asInstanceOf[Outlet[AnyRef]])
- new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
- case other =>
- other
- }
- }
- materializer.materialize(newGraph, matValues)
- }
-
- override def shutdown(): Unit = {
- materializer.shutdown()
- }
- }
-}
-
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
deleted file mode 100644
index a5c6e48..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import scala.concurrent.{Await, ExecutionContextExecutor}
-
-import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, PoisonPill, Props, RepointableActorRef}
-import akka.dispatch.Dispatchers
-import akka.pattern.ask
-import akka.stream.ModuleGraph.Edge
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.StreamSupervisor
-import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * [[LocalMaterializer]] will use local actor to materialize the graph
- * Use LocalMaterializer.apply to construct the LocalMaterializer.
- *
- * It is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-abstract class LocalMaterializer(
- val system: ActorSystem,
- override val settings: ActorMaterializerSettings,
- dispatchers: Dispatchers,
- val supervisor: ActorRef,
- val haveShutDown: AtomicBoolean,
- flowNameCounter: AtomicLong,
- namePrefix: String,
- optimizations: Optimizations) extends ActorMaterializer {
-
- override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
- import ActorAttributes._
- import Attributes._
- opAttr.attributeList.foldLeft(settings) { (s, attr) =>
- attr match {
- case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
- case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
- case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
- case l: LogLevels => s
- case Name(_) => s
- case other => s
- }
- }
- }
-
- override def shutdown(): Unit =
- if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
-
- override def isShutdown: Boolean = haveShutDown.get()
-
- private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = {
- supervisor match {
- case ref: LocalActorRef =>
- ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false)
- case ref: RepointableActorRef =>
- if (ref.isStarted) {
- ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher),
- name, systemService = false)
- } else {
- implicit val timeout = ref.system.settings.CreationTimeout
- val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher),
- name)).mapTo[ActorRef]
- Await.result(f, timeout.duration)
- }
- case unknown =>
- throw new IllegalStateException(
- s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
- }
- }
-
- override lazy val executionContext: ExecutionContextExecutor =
- dispatchers.lookup(settings.dispatcher match {
- case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
- case other => other
- })
-
- def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any]
-
- override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {
- val graph = ModuleGraph(runnableGraph)
- val matValues = materialize(graph.graph, Map.empty[Module, Any])
- graph.resolve(matValues)
- }
-
- override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
- val dispatcher =
- if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) {
- effectiveSettings(context.effectiveAttributes).dispatcher
- } else {
- props.dispatcher
- }
- actorOf(props, context.stageName, dispatcher)
- }
-}
-
-object LocalMaterializer {
-
- def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
- namePrefix: Option[String] = None,
- optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem)
- : LocalMaterializerImpl = {
-
- val settings = materializerSettings getOrElse ActorMaterializerSettings(system)
- apply(settings, namePrefix.getOrElse("flow"), optimizations)(system)
- }
-
- def apply(materializerSettings: ActorMaterializerSettings,
- namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem)
- : LocalMaterializerImpl = {
- val haveShutDown = new AtomicBoolean(false)
-
- new LocalMaterializerImpl(
- system,
- materializerSettings,
- system.dispatchers,
- system.actorOf(StreamSupervisor.props(materializerSettings,
- haveShutDown).withDispatcher(materializerSettings.dispatcher)),
- haveShutDown,
- FlowNameCounter(system).counter,
- namePrefix,
- optimizations)
- }
-}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
deleted file mode 100644
index 1ec724e..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Dispatchers
-import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute}
-import akka.stream.actor.ActorSubscriber
-import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule
-import akka.stream.gearpump.module.ReduceModule
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule}
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor}
-import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape}
-import org.reactivestreams.{Processor, Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-class LocalMaterializerImpl (
- system: ActorSystem,
- settings: ActorMaterializerSettings,
- dispatchers: Dispatchers,
- supervisor: ActorRef,
- haveShutDown: AtomicBoolean,
- flowNameCounter: AtomicLong,
- namePrefix: String,
- optimizations: Optimizations)
- extends LocalMaterializer(
- system, settings, dispatchers, supervisor,
- haveShutDown, flowNameCounter, namePrefix, optimizations) {
-
- override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
- val materializedGraph = graph.mapVertex { module =>
- materializeAtomic(module)
- }
-
- materializedGraph.edges.foreach { nodeEdgeNode =>
- val (node1, edge, node2) = nodeEdgeNode
- val from = edge.from
- val to = edge.to
- val publisher = node1.outputs(from).asInstanceOf[Publisher[Any]]
- val subscriber = node2.inputs(to).asInstanceOf[Subscriber[Any]]
- publisher.subscribe(subscriber)
- }
-
- val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex =>
- (vertex.module, vertex.matValue)
- }.toMap
-
- val matValueSources = materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]])
- publishToMaterializedValueSource(matValueSources, matValues)
-
- matValues
- }
-
- private def publishToMaterializedValueSource(modules: List[MaterializedModule], matValues: Map[Module, Any]) = {
- modules.foreach { module =>
- val source = module.module.asInstanceOf[MaterializedValueSource[_]]
- val attr = source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null)
-
- Option(attr).map { attr =>
- val valueToPublish = MaterializedValueOps(attr.mat).resolve[Any](matValues)
- module.outputs.foreach { portAndPublisher =>
- val (port, publisher) = portAndPublisher
- publisher match {
- case valuePublisher: MaterializedValuePublisher =>
- valuePublisher.setValue(valueToPublish)
- }
- }
- }
- }
- }
-
- private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
-
- private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
-
- val flowName = createFlowName()
- var nextId = 0
-
- def stageName(attr: Attributes): String = {
- val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
- nextId += 1
- name
- }
-
- private def materializeAtomic(atomic: Module): MaterializedModule = {
- val effectiveAttributes = atomic.attributes
-
- def newMaterializationContext() =
- new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))
-
- atomic match {
- case matValue: MaterializedValueSource[_] =>
- val pub = new MaterializedValuePublisher
- val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub)
- MaterializedModule(matValue, (), outputs = outputs)
- case sink: SinkModule[_, _] =>
- val (sub, mat) = sink.create(newMaterializationContext())
- val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub)
- MaterializedModule(sink, mat, inputs)
- case source: SourceModule[_, _] =>
- val (pub, mat) = source.create(newMaterializationContext())
- val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub)
- MaterializedModule(source, mat, outputs = outputs)
-
- case reduce: ReduceModule[Any] =>
- //TODO: remove this after the official akka-stream API support the Reduce Module
- val stage = LocalMaterializerImpl.toFoldModule(reduce)
- val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
- val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
- val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
- MaterializedModule(stage, mat, inputs, outputs)
-
- case stage: StageModule =>
- val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
- val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
- val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
- MaterializedModule(stage, mat, inputs, outputs)
- case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
- val es = effectiveSettings(effectiveAttributes)
- val props =
- SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
- val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
- def factory(id: Int) = new ActorPublisher[Any](impl) {
- override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
- }
- val publishers = Vector.tabulate(2)(factory)
- impl ! FanOut.ExposedPublishers(publishers)
-
- val inputs = Map[InPort, Subscriber[_]](
- tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn),
- tls.cipherIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn))
-
- val outputs = Map[OutPort, Publisher[_]](
- tls.plainOut -> publishers(SslTlsCipherActor.UserOut),
- tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut))
- MaterializedModule(tls, (), inputs, outputs)
-
- case junction: JunctionModule =>
- materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
- }
- }
-
- private def processorFor(op: StageModule,
- effectiveAttributes: Attributes,
- effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
- case DirectProcessor(processorFactory, _) => processorFactory()
- case Identity(attr) => (new VirtualProcessor, ())
- case _ =>
- val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
- ActorProcessorFactory[Any, Any](
- actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
- }
-
- private def materializeJunction(
- op: JunctionModule,
- effectiveAttributes: Attributes,
- effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
- op match {
- case fanin: FanInModule =>
- val (props, inputs, output) = fanin match {
-
- case MergeModule(shape, _) =>
- (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out)
-
- case f: FlexiMergeModule[_, Shape] =>
- val flexi = f.flexi(f.shape)
- val shape: Shape = f.shape
- (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head)
-
- case MergePreferredModule(shape, _) =>
- (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out)
-
- case ConcatModule(shape, _) =>
- require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
- (Concat.props(effectiveSettings), shape.inSeq, shape.out)
-
- case zip: ZipWithModule =>
- (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
- }
-
- val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
- val publisher = new ActorPublisher[Any](impl)
- // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
- impl ! ExposedPublisher(publisher)
-
- val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { pair =>
- val (in, id) = pair
- (in, FanIn.SubInput[Any](impl, id))
- }.toMap
-
- val outMapping = Map(output -> publisher)
- MaterializedModule(fanin, (), inputMapping, outMapping)
-
- case fanout: FanOutModule =>
- val (props, in, outs) = fanout match {
-
- case r: FlexiRouteModule[t, Shape] =>
- val flexi = r.flexi(r.shape)
- val shape: Shape = r.shape
- (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets)
-
- case BroadcastModule(shape, eagerCancel, _) =>
- (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)
-
- case BalanceModule(shape, waitForDownstreams, _) =>
- (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
-
- case unzip: UnzipWithModule =>
- (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
- }
- val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
- val size = outs.size
- def factory(id: Int) =
- new ActorPublisher[Any](impl) {
- override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
- }
- val publishers =
- if (outs.size < 8) Vector.tabulate(size)(factory)
- else List.tabulate(size)(factory)
-
- impl ! FanOut.ExposedPublishers(publishers)
- val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
- (out, pub)
- }.toMap
-
- val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl))
- MaterializedModule(fanout, (), inputs, outputs)
- }
- }
-
- override def withNamePrefix(name: String): Materializer = {
- new LocalMaterializerImpl(system, settings, dispatchers, supervisor,
- haveShutDown, flowNameCounter, namePrefix = name, optimizations)
- }
-}
-
-object LocalMaterializerImpl {
- case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
-
- def toFoldModule(reduce: ReduceModule[Any]): Fold = {
- val f = reduce.f
- val aggregator = { (zero: Any, input: Any) =>
- if (zero == null) {
- input
- } else {
- f(zero, input)
- }
- }
- new Fold(null, aggregator)
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
deleted file mode 100644
index 47ed1f2..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task}
-import akka.stream.impl.Stages
-import akka.stream.impl.Stages.StageModule
-import akka.stream.impl.StreamLayout.Module
-import org.slf4j.LoggerFactory
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
-import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-/**
- * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump
- * Streaming Application.
- *
- * @param graph
- * @param system
- */
-class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
-
- import RemoteMaterializerImpl._
-
- type Clue = String
- private implicit val actorSystem = system
-
- private def uuid: String = {
- java.util.UUID.randomUUID.toString
- }
-
- /**
- * @return a mapping from Module to Materialized Processor Id.
- */
- def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
- val (opGraph, clues) = toOpGraph()
- val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph)
- val processorIds = resolveClues(app, clues)
-
- val updatedApp = updateJunctionConfig(processorIds, app)
- (cleanClues(updatedApp), processorIds)
- }
-
- private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], app: StreamApplication): StreamApplication = {
- val config = junctionConfig(processorIds)
-
- val dag = app.dag.mapVertex { vertex =>
- val processorId = vertex.id
- val newConf = vertex.taskConf.withConfig(config(processorId))
- vertex.copy(taskConf = newConf)
- }
- new StreamApplication(app.name, app.inputUserConfig, dag)
- }
-
- /**
- * Update junction config so that each GraphTask know its upstream and downstream.
- * @param processorIds
- * @return
- */
- private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = {
- val updatedConfigs = graph.vertices.map { vertex =>
- val processorId = processorIds(vertex)
- vertex match {
- case junction: JunctionModule =>
- val inProcessors = junction.shape.inlets.map { inlet =>
- val upstreamModule = graph.incomingEdgesOf(junction).find(_._2.to == inlet).map(_._1)
- val upstreamProcessorId = processorIds(upstreamModule.get)
- upstreamProcessorId
- }.toList
-
- val outProcessors = junction.shape.outlets.map { outlet =>
- val downstreamModule = graph.outgoingEdgesOf(junction).find(_._2.from == outlet).map(_._3)
- val downstreamProcessorId = downstreamModule.map(processorIds(_))
- downstreamProcessorId.get
- }.toList
-
- (processorId, UserConfig.empty.withValue(GraphTask.OUT_PROCESSORS, outProcessors)
- .withValue(GraphTask.IN_PROCESSORS, inProcessors))
- case _ =>
- (processorId, UserConfig.empty)
- }
- }.toMap
- updatedConfigs
- }
-
- private def resolveClues(app: StreamApplication, clues: Map[Module, Clue]): Map[Module, ProcessorId] = {
- clues.flatMap { kv =>
- val (module, clue) = kv
- val processorId = app.dag.vertices.find { processor =>
- processor.taskConf.getString(clue).isDefined
- }.map(_.id)
- processorId.map((module, _))
- }
- }
-
- private def cleanClues(app: StreamApplication): StreamApplication = {
- val graph = app.dag.mapVertex { processor =>
- val conf = cleanClue(processor.taskConf)
- processor.copy(taskConf = conf)
- }
- new StreamApplication(app.name, app.inputUserConfig, graph)
- }
-
- private def cleanClue(conf: UserConfig): UserConfig = {
- conf.filter { kv =>
- kv._2 != RemoteMaterializerImpl.STAINS
- }
- }
-
- private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = {
- var matValues = Map.empty[Module, Clue]
- val opGraph = graph.mapVertex{ module =>
- val name = uuid
- val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS)
- matValues += module -> name
- val parallelism = GearAttributes.count(module.attributes)
- val op = module match {
- case source: SourceTaskModule[t] =>
- val updatedConf = conf.withConfig(source.conf)
- new DataSourceOp[t](source.source, parallelism, updatedConf, "source")
- case sink: SinkTaskModule[t] =>
- val updatedConf = conf.withConfig(sink.conf)
- new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink")
- case sourceBridge: SourceBridgeModule[_, _] =>
- new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
- case processor: ProcessorModule[_, _, _] =>
- val updatedConf = conf.withConfig(processor.conf)
- new ProcessorOp(processor.processor, parallelism, updatedConf, "source")
- case sinkBridge: SinkBridgeModule[_, _] =>
- new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
- case groupBy: GroupByModule[t, g] =>
- new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf)
- case reduce: ReduceModule[Any] =>
- reduceOp(reduce.f, conf)
- case stage: StageModule =>
- translateStage(stage, conf)
- case fanIn: FanInModule =>
- translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, conf)
- case fanOut: FanOutModule =>
- translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf)
- }
-
- if (op == null) {
- throw new UnsupportedOperationException(module.getClass.toString + " is not supported with RemoteMaterializer")
- }
- op
- }.mapEdge[OpEdge]{(n1, edge, n2) =>
- n2 match {
- case master: MasterOp =>
- Shuffle
- case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
- Shuffle
- case slave: SlaveOp[_] =>
- Direct
- }
- }
- (opGraph, matValues)
- }
-
- private def translateStage(module: StageModule, conf: UserConfig): Op = {
- module match {
- case buffer: Stages.Buffer =>
- //ignore the buffering operation
- identity("buffer", conf)
- case collect: Stages.Collect =>
- collectOp(collect.pf, conf)
- case concatAll: Stages.ConcatAll =>
- //TODO:
- null
- case conflat: Stages.Conflate =>
- conflatOp(conflat.seed, conflat.aggregate, conf)
- case drop: Stages.Drop =>
- dropOp(drop.n, conf)
- case dropWhile: Stages.DropWhile =>
- dropWhileOp(dropWhile.p, conf)
- case expand: Stages.Expand =>
- //TODO
- null
- case filter: Stages.Filter =>
- filterOp(filter.p, conf)
- case fold: Stages.Fold =>
- foldOp(fold.zero, fold.f, conf)
- case groupBy: Stages.GroupBy =>
- //TODO
- null
- case grouped: Stages.Grouped =>
- groupedOp(grouped.n, conf)
- case _: Stages.Identity =>
- identity("identity", conf)
- case log: Stages.Log =>
- logOp(log.name, log.extract, conf)
- case map: Stages.Map =>
- mapOp(map.f, conf)
- case mapAsync: Stages.MapAsync =>
- //TODO
- null
- case mapAsync: Stages.MapAsyncUnordered =>
- //TODO
- null
- case flatMap: Stages.MapConcat =>
- flatMapOp(flatMap.f, "mapConcat", conf)
- case stage: MaterializingStageFactory =>
- //TODO
- null
- case prefixAndTail: Stages.PrefixAndTail =>
- //TODO
- null
- case recover: Stages.Recover =>
- //TODO: we will just ignore this
- identity("recover", conf)
- case scan: Stages.Scan =>
- scanOp(scan.zero, scan.f, conf)
- case split: Stages.Split =>
- //TODO
- null
- case stage: Stages.StageFactory =>
- //TODO
- null
- case take: Stages.Take =>
- takeOp(take.n, conf)
- case takeWhile: Stages.TakeWhile =>
- filterOp(takeWhile.p, conf)
- case time: Stages.TimerTransform =>
- //TODO
- null
- }
- }
-
- private def translateFanIn(
- fanIn: FanInModule,
- edges: List[(Module, Edge, Module)],
- parallelism: Int,
- conf: UserConfig): Op = {
- fanIn match {
- case merge: MergeModule[_] =>
- MergeOp("merge", conf)
- case mergePrefered: MergePreferredModule[_] =>
- //TODO, support "prefer" merge
- MergeOp("mergePrefered", conf)
- case zip: ZipWithModule =>
- //TODO: support zip module
- null
- case concat: ConcatModule[_] =>
- //TODO: support concat module
- null
- case flexiMerge: FlexiMergeModule[_, _] =>
- //TODO: Suport flexi merge module
- null
- }
- }
-
- private def translateFanOut(
- fanOut: FanOutModule,
- edges: List[(Module, Edge, Module)],
- parallelism: Int,
- conf: UserConfig): Op = {
- fanOut match {
- case unzip2: UnzipWith2Module[Any, Any, Any] =>
- val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new UnZip2Task.UnZipFunction(unzip2.f))
- new ProcessorOp(classOf[UnZip2Task], parallelism, updatedConf, "unzip")
- case broadcast: BroadcastModule[_] =>
- new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast")
- case broadcast: BalanceModule[_] =>
- new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
- case flexi: FlexiRouteImpl[_, _] =>
- //TODO
- null
- }
- }
-}
-
-object RemoteMaterializerImpl {
- final val NotApplied: Any => Any = _ => NotApplied
-
- def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = {
- flatMapOp({ data =>
- collect.applyOrElse(data, NotApplied) match {
- case NotApplied => None
- case result: Any => Option(result)
- }
- }, "collect", conf)
- }
-
- def filterOp(filter: Any => Boolean, conf: UserConfig): Op = {
- flatMapOp({ data =>
- if (filter(data)) Option(data) else None
- }, "filter", conf)
- }
-
- def reduceOp(reduce: (Any, Any) => Any, conf: UserConfig): Op = {
- var result: Any = null
- val flatMap = { elem: Any =>
- if (result == null) {
- result = elem
- } else {
- result = reduce(result, elem)
- }
- List(result)
- }
- flatMapOp(flatMap, "reduce", conf)
- }
-
- def identity(description: String, conf: UserConfig): Op = {
- flatMapOp({ data =>
- List(data)
- }, description, conf)
- }
-
- def mapOp(map: Any => Any, conf: UserConfig): Op = {
- flatMapOp({ data: Any =>
- List(map(data))
- }, "map", conf)
- }
-
- def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op = {
- flatMapOp(flatMap, "flatmap", conf)
- }
-
- def flatMapOp(fun: Any => TraversableOnce[Any], description: String, conf: UserConfig): Op = {
- FlatMapOp(fun, description, conf)
- }
-
- def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = {
- var agg: Any = null
- val flatMap = { elem: Any =>
- agg = if (agg == null) {
- seed(elem)
- } else {
- aggregate(agg, elem)
- }
- List(agg)
- }
-
- flatMapOp(flatMap, "map", conf)
- }
-
- def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
- var aggregator: Any = zero
- val map = { elem: Any =>
- aggregator = fold(aggregator, elem)
- List(aggregator)
- }
- flatMapOp(map, "fold", conf)
- }
-
- def groupedOp(count: Int, conf: UserConfig): Op = {
- var left = count
- val buf = {
- val b = Vector.newBuilder[Any]
- b.sizeHint(count)
- b
- }
-
- val flatMap: Any => Iterable[Any] = { input: Any =>
- buf += input
- left -= 1
- if (left == 0) {
- val emit = buf.result()
- buf.clear()
- left = count
- Some(emit)
- } else {
- None
- }
- }
- flatMapOp(flatMap, conf: UserConfig)
- }
-
- def dropOp(number: Long, conf: UserConfig): Op = {
- var left = number
- val flatMap: Any => Iterable[Any] = { input: Any =>
- if (left > 0) {
- left -= 1
- None
- } else {
- Some(input)
- }
- }
- flatMapOp(flatMap, "drop", conf)
- }
-
- def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
- flatMapOp({ data =>
- if (drop(data)) None else Option(data)
- }, "dropWhile", conf)
- }
-
- def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
- val flatMap = { elem: Any =>
- LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
- List(elem)
- }
- flatMapOp(flatMap, "log", conf)
- }
-
- def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = {
- var aggregator = zero
- var pushedZero = false
-
- val flatMap = { elem: Any =>
- aggregator = f(aggregator, elem)
-
- if (pushedZero) {
- pushedZero = true
- List(zero, aggregator)
- } else {
- List(aggregator)
- }
- }
- flatMapOp(flatMap, "scan", conf)
- }
-
- def takeOp(count: Long, conf: UserConfig): Op = {
- var left: Long = count
-
- val filter: Any => Iterable[Any] = { elem: Any =>
- left -= 1
- if (left > 0) Some(elem)
- else if (left == 0) Some(elem)
- else None
- }
- flatMapOp(filter, "take", conf)
- }
-
- /**
- * We use stains to track how module maps to Processor
- *
- */
- val STAINS = "track how module is fused to processor"
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
deleted file mode 100644
index c5dfc9a..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- *
- *
- * [[IN]] -> [[BridgeModule]] -> [[OUT]]
- * /
- * /
- * out of band data input or output channel [[MAT]]
- *
- *
- * [[BridgeModule]] is used as a bridge between different materializers.
- * Different [[akka.stream.Materializer]]s can use out of band channel to
- * exchange messages.
- *
- * For example:
- *
- * Remote Materializer
- * -----------------------------
- * | |
- * | BridgeModule -> RemoteSink |
- * | / |
- * --/----------------------------
- * Local Materializer / out of band channel.
- * ----------------------/----
- * | Local / |
- * | Source -> BridgeModule |
- * | |
- * ---------------------------
- *
- *
- * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
- * module during materialization.
- *
- * However, user can still declare it explicitly. In this case, it means we have a
- * boundary Source or Sink module which accept out of band channel inputs or
- * outputs.
- *
- *
- * @tparam IN
- * @tparam OUT
- * @tparam MAT
- */
-abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] {
- def attributes: Attributes
- def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
-
- protected def newInstance: BridgeModule[IN, OUT, MAT]
- override def carbonCopy: Module = newInstance
-}
-
-/**
- *
- * Bridge module which accept out of band channel Input
- * [[org.reactivestreams.Publisher]][IN].
- *
- *
- * [[SourceBridgeModule]] -> [[OUT]]
- * /|
- * /
- * out of band data input [[org.reactivestreams.Publisher]][IN]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]]
- * @tparam OUT out put data type to next module.
- */
-class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] {
- override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = new SourceBridgeModule[IN, OUT](attributes)
-
- override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = {
- new SourceBridgeModule(attributes)
- }
-}
-
-/**
- *
- * Bridge module which accept out of band channel Output
- * [[org.reactivestreams.Subscriber]][OUT].
- *
- *
- * [[IN]] -> [[BridgeModule]]
- * \
- * \
- * \|
- * out of band data output [[org.reactivestreams.Subscriber]][OUT]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from previous module
- * @tparam OUT out put data type to out of band subscriber
- */
-class SinkBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] {
- override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = new SinkBridgeModule[IN, OUT](attributes)
-
- override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
- new SinkBridgeModule[IN, OUT](attributes)
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
deleted file mode 100644
index e57a6f6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Group the T value groupBy function
- *
- * @param f
- * @param attributes
- * @tparam T
- * @tparam Group
- */
-case class GroupByModule[T, Group](val groupBy: T => Group,
- val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit] {
-
- override def carbonCopy: Module = newInstance
-
- protected def newInstance: GroupByModule[T, Group] = {
- new GroupByModule[T, Group](groupBy, attributes)
- }
-
- override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = {
- new GroupByModule[T, Group](groupBy, attributes)
- }
-}
\ No newline at end of file
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
similarity index 93%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
index 50c4450..4384b39 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-package akka.stream.gearpump
+package org.apache.gearpump.akkastream
import akka.stream.Attributes
import akka.stream.Attributes.Attribute
object GearAttributes {
-
/**
* Define how many parallel instance we want to use to run this module
- * @param count
+ * @param count Int
* @return
*/
def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
@@ -46,7 +45,7 @@
* Get the effective location settings if child override the parent
* setttings.
*
- * @param attrs
+ * @param attrs Attributes
* @return
*/
def location(attrs: Attributes): Location = {
@@ -60,7 +59,7 @@
/**
* get effective parallelism settings if child override parent.
- * @param attrs
+ * @param attrs Attributes
* @return
*/
def count(attrs: Attributes): Int = {
@@ -84,7 +83,7 @@
/**
* How many parallel instance we want to use for this module.
*
- * @param parallelism
+ * @param parallelism Int
*/
final case class ParallismAttribute(parallelism: Int) extends Attribute
}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
new file mode 100644
index 0000000..07c95f8
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import akka.NotUsed
+import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.Attributes.Attribute
+import akka.stream.impl.Stages.SymbolicGraphStage
+import akka.stream.impl.StreamLayout.{Atomic, Combine, CopiedModule, Ignore, MaterializedValueNode, Module, Transform}
+import akka.stream.{ActorAttributes, ActorMaterializerSettings, Attributes, ClosedShape, Fusing, Graph, InPort, OutPort, SinkShape}
+import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
+import akka.stream.impl.{ExtendedActorMaterializer, StreamSupervisor}
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
+import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
+import org.apache.gearpump.akkastream.graph._
+import org.apache.gearpump.util.{Graph => GGraph}
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContextExecutor, Promise}
+import scala.concurrent.duration.FiniteDuration
+
+object GearpumpMaterializer {
+
+ final case class Edge(from: OutPort, to: InPort)
+
+ final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
+
+ implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)
+
+ def apply(strategy: Strategy)(implicit context: ActorRefFactory): ExtendedActorMaterializer = {
+ val system = actorSystemOf(context)
+
+ apply(ActorMaterializerSettings(
+ system).withAutoFusing(false), strategy, useLocalCluster = false, "flow")(context)
+ }
+
+ def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+ strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+ useLocalCluster: Boolean = true,
+ namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
+ ExtendedActorMaterializer = {
+ val system = actorSystemOf(context)
+
+ val settings = materializerSettings getOrElse
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
+ }
+
+ def apply(materializerSettings: ActorMaterializerSettings,
+ strategy: Strategy,
+ useLocalCluster: Boolean,
+ namePrefix: String)(implicit context: ActorRefFactory):
+ ExtendedActorMaterializer = {
+ val system = actorSystemOf(context)
+
+ new GearpumpMaterializer(
+ system,
+ materializerSettings,
+ context.actorOf(
+ StreamSupervisor.props(materializerSettings, false).withDispatcher(
+ materializerSettings.dispatcher), StreamSupervisor.nextName()))
+ }
+
+
+ private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
+ val system = context match {
+ case s: ExtendedActorSystem => s
+ case c: ActorContext => c.system
+ case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
+ case _ =>
+ throw new IllegalArgumentException(
+ s"""
+ | context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
+ """.stripMargin
+ )
+ }
+ system
+ }
+
+}
+
+/**
+ *
+ * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
+ * streaming application. If some module cannot be rendered remotely in Gearpump
+ * Cluster, then it will use local Actor materializer as fallback to materialize
+ * the module locally.
+ *
+ * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
+ * to determine which module should be rendered
+ * remotely, and which module should be rendered locally.
+ *
+ * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
+ * to find out how we cut the runnableGraph to two parts,
+ * and materialize them separately.
+ * @param system ActorSystem
+ * @param strategy Strategy
+ * @param useLocalCluster whether to use built-in in-process local cluster
+ */
+class GearpumpMaterializer(override val system: ActorSystem,
+ override val settings: ActorMaterializerSettings,
+ override val supervisor: ActorRef,
+ strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
+ useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
+ extends ExtendedActorMaterializer {
+
+ private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
+ classOf[LocalGraph] -> new LocalGraphMaterializer(system),
+ classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
+ )
+
+ override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+ override def isShutdown: Boolean = system.whenTerminated.isCompleted
+
+ override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+ import ActorAttributes._
+ import Attributes._
+ opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+ attr match {
+ case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+ case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+ case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+ case _ => s
+ }
+ }
+ }
+
+ override def withNamePrefix(name: String): ExtendedActorMaterializer =
+ throw new UnsupportedOperationException()
+
+ override implicit def executionContext: ExecutionContextExecutor =
+ throw new UnsupportedOperationException()
+
+ override def schedulePeriodically(initialDelay: FiniteDuration,
+ interval: FiniteDuration,
+ task: Runnable): Cancellable =
+ system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+ override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+ system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+ override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
+ val info = Fusing.aggressive(runnableGraph).module.info
+ val graph = GGraph.empty[Module, Edge]
+
+ info.subModules.foreach(module => {
+ if (module.isCopied) {
+ val original = module.asInstanceOf[CopiedModule].copyOf
+ graph.addVertex(original)
+ module.shape.outlets.zip(original.shape.outlets).foreach(out => {
+ val (cout, oout) = out
+ val cin = info.downstreams(cout)
+ val downStreamModule = info.inOwners(cin)
+ if(downStreamModule.isCopied) {
+ val downStreamOriginal = downStreamModule.asInstanceOf[CopiedModule].copyOf
+ downStreamModule.shape.inlets.zip(downStreamOriginal.shape.inlets).foreach(in => {
+ in._1 == cin match {
+ case true =>
+ val oin = in._2
+ graph.addEdge(original, Edge(oout, oin), downStreamOriginal)
+ case false =>
+ }
+ })
+ }
+ })
+ }
+ })
+
+ printGraph(graph)
+
+ val subGraphs = GraphPartitioner(strategy).partition(graph)
+ val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
+ val materializer = subMaterializers(subGraph.getClass)
+ map ++ materializer.materialize(subGraph, map)
+ }
+ val mat = matValues.flatMap(pair => {
+ val (module, any) = pair
+ any match {
+ case notUsed: NotUsed =>
+ None
+ case others =>
+ val rt = module.shape match {
+ case sink: SinkShape[_] =>
+ Some(any)
+ case _ =>
+ None
+ }
+ rt
+ }
+ }).toList
+ val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
+ resolveMaterialized(matModule.materializedValueComputation, matValues)
+ val rt = Some(mat).flatMap(any => {
+ any match {
+ case promise: Promise[_] =>
+ Some(promise.future)
+ case other =>
+ Some(other)
+ }
+ })
+ rt.orNull.asInstanceOf[Mat]
+ }
+
+ private def printGraph(graph: GGraph[Module, Edge]): Unit = {
+ val iterator = graph.topologicalOrderIterator
+ while (iterator.hasNext) {
+ val module = iterator.next()
+ // scalastyle:off println
+ module match {
+ case graphStageModule: GraphStageModule =>
+ graphStageModule.stage match {
+ case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
+ val symbolicName = symbolicGraphStage.symbolicStage.getClass.getSimpleName
+ println(
+ s"${module.getClass.getSimpleName}(${symbolicName})"
+ )
+ case graphStage: GraphStage[_] =>
+ val name = graphStage.getClass.getSimpleName
+ println(
+ s"${module.getClass.getSimpleName}(${name})"
+ )
+ case other =>
+ println(
+ s"${module.getClass.getSimpleName}(${other.getClass.getSimpleName})"
+ )
+ }
+ case _ =>
+ println(module.getClass.getSimpleName)
+ }
+ // scalastyle:on println
+ }
+ }
+
+ override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+ initialAttributes: Attributes): Mat = {
+ materialize(runnableGraph)
+ }
+
+ override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+ subflowFuser: (GraphInterpreterShell) => ActorRef): Mat = {
+ materialize(runnableGraph)
+ }
+
+ override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
+ subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+ materialize(runnableGraph)
+ }
+
+ override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+ logger
+ }
+
+ def shutdown: Unit = {
+ subMaterializers.values.foreach(_.shutdown)
+ }
+
+ private def resolveMaterialized(mat: MaterializedValueNode,
+ materializedValues: mutable.Map[Module, Any]): Any = mat match {
+ case Atomic(m) =>
+ materializedValues.getOrElse(m, ())
+ case Combine(f, d1, d2) =>
+ f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+ case Transform(f, d) =>
+ f(resolveMaterialized(d, materializedValues))
+ case Ignore =>
+ ()
+ }
+
+
+
+}
+
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
new file mode 100644
index 0000000..8a869d2
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import java.{util => ju}
+
+import _root_.org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, MaterializedValueSourceAttribute}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+
+class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
+ initialAttributes: Attributes, namePrefix: Option[String] = None)
+ extends MaterializerSession(topLevel, initialAttributes) {
+
+ private[this] def createFlowName(): String =
+ FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()
+
+ private val flowName = createFlowName()
+ private var nextId = 0
+
+ private def stageName(attr: Attributes): String = {
+ val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+ nextId += 1
+ name
+ }
+
+ val graph = GGraph.empty[Module, Edge]
+
+ def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
+ graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
+ }
+
+ def addVertex(module: Module): Unit = {
+ graph.addVertex(module)
+ }
+
+ override def materializeModule(module: Module, parentAttributes: Attributes): Any = {
+
+ val materializedValues: ju.Map[Module, Any] = new ju.HashMap
+ val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+
+ val materializedValueSources = List.empty[MaterializedValueSource[_]]
+
+ for (submodule <- module.subModules) {
+ submodule match {
+ case atomic: AtomicModule =>
+ materializeAtomic(atomic, currentAttributes, materializedValues)
+ case copied: CopiedModule =>
+ enterScope(copied)
+ materializedValues.put(copied, materializeModule(copied, currentAttributes))
+ exitScope(copied)
+ case composite =>
+ materializedValues.put(composite, materializeComposite(composite, currentAttributes))
+ case EmptyModule =>
+ }
+ }
+
+ val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
+
+ materializedValueSources.foreach { module =>
+ val matAttribute =
+ new MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
+ val copied = copyAtomicModule(module.module, parentAttributes
+ and Attributes(matAttribute))
+ // TODO
+ // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
+ addVertex(copied)
+ materializedValues.put(copied, Atomic(copied))
+ }
+ mat
+
+ }
+
+ override protected def materializeComposite(composite: Module,
+ effectiveAttributes: Attributes): Any = {
+ materializeModule(composite, effectiveAttributes)
+ }
+
+ protected def materializeAtomic(atomic: AtomicModule,
+ parentAttributes: Attributes,
+ matVal: ju.Map[Module, Any]): Unit = {
+
+ val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
+ val copied = copyAtomicModule(atomic, parentAttributes)
+
+ for ((in, id) <- inputs.zipWithIndex) {
+ val inPort = inPortMapping(atomic, copied)(in)
+ // assignPort(in, (inPort, copied))
+ }
+
+ for ((out, id) <- outputs.zipWithIndex) {
+ val outPort = outPortMapping(atomic, copied)(out)
+ // TODO
+ // assignPort(out, (outPort, copied))
+ }
+
+ addVertex(copied)
+ matVal.put(atomic, Atomic(copied))
+ }
+
+ private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
+ val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
+ module.withAttributes(currentAttributes).asInstanceOf[T]
+ }
+
+ private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
+ from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
+ }
+
+ private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
+ from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
+ }
+
+ protected def resolveMaterialized(matNode: MaterializedValueNode,
+ materializedValues: ju.Map[Module, Any]):
+ Any =
+ matNode match {
+ case Atomic(m) => materializedValues.get(m)
+ case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+ resolveMaterialized(d2, materializedValues))
+ case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+ case Ignore => Ignore
+ }
+}
+
+object GearpumpMaterializerSession {
+ def apply(system: ActorSystem, topLevel: Module,
+ initialAttributes: Attributes, namePrefix: Option[String] = None):
+ GearpumpMaterializerSession = {
+ new GearpumpMaterializerSession(system, topLevel, initialAttributes, namePrefix)
+ }
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
new file mode 100644
index 0000000..52a45d9
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.{Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Source and Sink are materialized locally.
+ * Remaining GraphStages are materialized remotely:
+ * statefulMap, filter, fold, flatMap
+ */
+object Test extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test", akkaConf)
+ implicit val materializer = GearpumpMaterializer(GraphPartitioner.AllRemoteStrategy)
+
+ val echo = system.actorOf(Props(new Echo()))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+
+ Source(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+ ).filter(_.startsWith("red")).fold("Items:") {(a, b) =>
+ a + "|" + b
+ }.map("I want to order item: " + _).runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
new file mode 100644
index 0000000..826cdcf
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Conflate, Throttle
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ import akka.actor.ActorSystem
+ import akka.stream.scaladsl._
+
+ implicit val system = ActorSystem("Test10", akkaConfig)
+ implicit val materializer = GearpumpMaterializer()
+ implicit val ec = system.dispatcher
+
+ // Conflate[A] - (2 inputs, 1 output) concatenates two streams
+ // (first consumes one, then the second one)
+ def stream(x: String) = Stream.continually(x)
+
+ val sourceA = Source(stream("A"))
+ val sourceB = Source(stream("B"))
+
+ val throttler: Flow[String, String, NotUsed] =
+ Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+ val conflateFlow: Flow[String, String, NotUsed] =
+ Flow[String].conflate((x: String, y: String) => x: String)
+ ((acc: String, x: String) => s"$acc::$x")
+
+ val printFlow: Flow[(String, String), String, NotUsed] =
+ Flow[(String, String)].map {
+ x =>
+ println(s" lengths are : ${x._1.length} and ${x._2.length} ; ${x._1} zip ${x._2}")
+ x.toString
+ }
+
+ val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+
+ val zipping = b.add(Zip[String, String]())
+
+ sourceA ~> throttler ~> zipping.in0
+ sourceB ~> conflateFlow ~> zipping.in1
+
+ zipping.out ~> printFlow ~> Sink.ignore
+
+ ClosedShape
+ })
+
+ graph.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
new file mode 100644
index 0000000..087c57d
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast and Merge
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ import akka.actor.ActorSystem
+ import akka.stream.scaladsl._
+
+ implicit val system = ActorSystem("Test11", akkaConfig)
+ implicit val materializer = GearpumpMaterializer()
+ // implicit val materializer =
+ // ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+ implicit val ec = system.dispatcher
+
+ val g = RunnableGraph.fromGraph(GraphDSL.create() {
+ implicit builder: GraphDSL.Builder[NotUsed] =>
+
+ import GraphDSL.Implicits._
+ val in = Source(1 to 10)
+ val output: (Any) => Unit = any => {
+ val s = s"**** $any"
+ println(s)
+ }
+ val out = Sink.foreach(output)
+
+ val broadcast = builder.add(Broadcast[Int](2))
+ val merge = builder.add(Merge[Int](2))
+
+ val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
+
+ in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out
+ broadcast ~> f4 ~> merge
+
+ ClosedShape
+ })
+
+ g.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..b4f4bce
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+
+/**
+ * Partial source, sink example
+ */
+object Test12 extends AkkaApp with ArgumentsParser{
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ import akka.actor.ActorSystem
+ import akka.stream.scaladsl._
+
+ import scala.concurrent.duration._
+
+ implicit val system = ActorSystem("Test12", akkaConfig)
+ // implicit val materializer = ActorMaterializer(
+ // ActorMaterializerSettings(system).withAutoFusing(false)
+ // )
+ implicit val materializer = GearpumpMaterializer()
+ implicit val ec = system.dispatcher
+
+ val pickMaxOfThree = GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+
+ val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
+ val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
+
+ zip1.out ~> zip2.in0
+
+ UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
+ }
+
+ val resultSink = Sink.head[Int]
+
+ val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
+ sink =>
+ import GraphDSL.Implicits._
+
+ // Importing the partial shape will return its shape (inlets & outlets)
+ val pm3 = b.add(pickMaxOfThree)
+
+ Source.single(1) ~> pm3.in(0)
+ Source.single(2) ~> pm3.in(1)
+ Source.single(3) ~> pm3.in(2)
+
+ pm3.out ~> sink.in
+
+ ClosedShape
+ })
+
+ val max: Future[Int] = g.run()
+ max.map(x => println(s"maximum of three numbers : $x"))
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..2e036cb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+ .tick(0.seconds, 1.second, "")
+ .map { _ =>
+ val now = System.currentTimeMillis()
+ val delay = random.nextInt(8)
+ MyEvent(now - delay * 1000L)
+ }
+ .statefulMapConcat { () =>
+ val generator = new CommandGenerator()
+ ev => generator.forEvent(ev)
+ }
+ .groupBy(64, command => command.w)
+ .takeWhile(!_.isInstanceOf[CloseWindow])
+ .fold(AggregateEventData((0L, 0L), 0))({
+ case (agg, OpenWindow(window)) => agg.copy(w = window)
+ // always filtered out by takeWhile
+ case (agg, CloseWindow(_)) => agg
+ case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+ })
+ .async
+ .mergeSubstreams
+ .runForeach { agg =>
+ println(agg.toString)
+ }
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ implicit val system = ActorSystem("Test13", akkaConfig)
+ implicit val materializer = GearpumpMaterializer()
+
+ val random = new Random()
+
+ val result = Source
+ .tick(0.seconds, 1.second, "tick data")
+ .map { _ =>
+ val now = System.currentTimeMillis()
+ val delay = random.nextInt(8)
+ MyEvent(now - delay * 1000L)
+ }
+ .statefulMapConcat { () =>
+ val generator = new CommandGenerator()
+ ev => generator.forEvent(ev)
+ }
+ .groupBy2(command => command.w)
+ .takeWhile(!_.isInstanceOf[CloseWindow])
+ .fold(AggregateEventData((0L, 0L), 0))({
+ case (agg, OpenWindow(window)) => agg.copy(w = window)
+ // always filtered out by takeWhile
+ case (agg, CloseWindow(_)) => agg
+ case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+ })
+ .runForeach(agg =>
+ println(agg.toString)
+ )
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ case class MyEvent(timestamp: Long)
+
+ type Window = (Long, Long)
+
+ object Window {
+ val WindowLength = 10.seconds.toMillis
+ val WindowStep = 1.second.toMillis
+ val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+ def windowsFor(ts: Long): Set[Window] = {
+ val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+ (for (i <- 0 until WindowsPerEvent) yield
+ (firstWindowStart + i * WindowStep,
+ firstWindowStart + i * WindowStep + WindowLength)
+ ).toSet
+ }
+ }
+
+ sealed trait WindowCommand {
+ def w: Window
+ }
+
+ case class OpenWindow(w: Window) extends WindowCommand
+
+ case class CloseWindow(w: Window) extends WindowCommand
+
+ case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+ class CommandGenerator {
+ private val MaxDelay = 5.seconds.toMillis
+ private var watermark = 0L
+ private val openWindows = mutable.Set[Window]()
+
+ def forEvent(ev: MyEvent): List[WindowCommand] = {
+ watermark = math.max(watermark, ev.timestamp - MaxDelay)
+ if (ev.timestamp < watermark) {
+ println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+ Nil
+ } else {
+ val eventWindows = Window.windowsFor(ev.timestamp)
+
+ val closeCommands = openWindows.flatMap { ow =>
+ if (!eventWindows.contains(ow) && ow._2 < watermark) {
+ openWindows.remove(ow)
+ Some(CloseWindow(ow))
+ } else None
+ }
+
+ val openCommands = eventWindows.flatMap { w =>
+ if (!openWindows.contains(w)) {
+ openWindows.add(w)
+ Some(OpenWindow(w))
+ } else None
+ }
+
+ val addCommands = eventWindows.map(w => AddToWindow(ev, w))
+
+ openCommands.toList ++ closeCommands.toList ++ addCommands.toList
+ }
+ }
+ }
+
+ case class AggregateEventData(w: Window, eventCount: Int) {
+ override def toString: String =
+ s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+ }
+
+ def tsToString(ts: Long): String = OffsetDateTime
+ .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+ .toLocalTime
+ .toString
+ // scalastyle:on println
+
+}
+
+
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..c436130
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+object Test14 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test14", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+ Flow[String]
+ .alsoTo(Sink.foreach(s => println(s"$filename: $s")))
+ .map(s => ByteString(s + "\n"))
+ .toMat(FileIO.toPath(new File(filename).toPath))(Keep.right)
+ }
+
+ val source: Source[Int, NotUsed] = Source(1 to 100)
+ val factorials: Source[BigInt, NotUsed] = source.scan(BigInt(1))((acc, next) => acc * next)
+ val sink1 = lineSink("factorial1.txt")
+ val sink2 = lineSink("factorial2.txt")
+ val slowSink2 = Flow[String].via(
+ Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+ ).toMat(sink2)(Keep.right)
+ val bufferedSink2 = Flow[String].buffer(50, OverflowStrategy.backpressure).via(
+ Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+ ).toMat(sink2)(Keep.right)
+
+ val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val bcast = b.add(Broadcast[String](2))
+ factorials.map(_.toString) ~> bcast.in
+ bcast.out(0) ~> sink1
+ bcast.out(1) ~> bufferedSink2
+ ClosedShape
+ })
+
+ g.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
new file mode 100644
index 0000000..f4e4dbd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+object Test15 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test15", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ import akka.stream.scaladsl.GraphDSL.Implicits._
+ RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+ val A = builder.add(Source.single(0)).out
+ val B = builder.add(Broadcast[Int](2))
+ val C = builder.add(Merge[Int](2).named("C"))
+ val D = builder.add(Flow[Int].map(_ + 1).named("D"))
+ val E = builder.add(Balance[Int](2).named("E"))
+ val F = builder.add(Merge[Int](2).named("F"))
+ val G = builder.add(Sink.foreach(println).named("G")).in
+
+ C <~ F
+ A ~> B ~> C ~> F
+ B ~> D ~> E ~> F
+ E ~> G
+
+ ClosedShape
+ }).run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
+
+
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
new file mode 100644
index 0000000..9691496
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test16", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ val sink = GearSink.to(new LoggerSink[String])
+ val sourceData = new CollectionDataSource(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+ val source = GearSource.from[String](sourceData)
+ source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..a6049cd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl._
+import akka.stream.{ActorMaterializer, ClosedShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ *
+ * This tests how different Materializers can be used together in an explicit way.
+ *
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test2", akkaConf)
+ val gearpumpMaterializer = GearpumpMaterializer()
+
+ val echo = system.actorOf(Props(new Echo()))
+ val source = GearSource.bridge[String, String]
+ val sink = GearSink.bridge[String, String]
+
+ val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _)
+ val (entry, exit) = flow.runWith(source, sink)(gearpumpMaterializer)
+
+ val actorMaterializer = ActorMaterializer()
+
+ val externalSource = Source(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+ )
+ val externalSink = Sink.actorRef(echo, "COMPLETE")
+
+ RunnableGraph.fromGraph(
+ GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ externalSource ~> Sink.fromSubscriber(entry)
+ Source.fromPublisher(exit) ~> externalSink
+ ClosedShape
+ }
+ ).run()(actorMaterializer)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..24faeb3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from remote and write to local
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test3", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ val echo = system.actorOf(Props(new Echo()))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+ val sourceData = new CollectionDataSource(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+ val source = GearSource.from[String](sourceData)
+ source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..6a44a35
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * read from local and write to remote
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test4", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ Source(
+ List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+ ).filter(_.startsWith("red")).
+ map("I want to order item: " + _).
+ runWith(GearSink.to(new LoggerSink[String]))
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..ad87a97
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.ClosedShape
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * test fanout
+ */
+object Test5 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test5", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+
+ val echo = system.actorOf(Props(new Echo()))
+ val source = Source(List(("male", "24"), ("female", "23")))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+
+ RunnableGraph.fromGraph(
+ GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val unzip = b.add(Unzip[String, String]())
+ val sink1 = Sink.actorRef(echo, "COMPLETE")
+ val sink2 = Sink.actorRef(echo, "COMPLETE")
+ source ~> unzip.in
+ unzip.out0 ~> sink1
+ unzip.out1 ~> sink1
+ ClosedShape
+ }
+ ).run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class Echo extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
new file mode 100644
index 0000000..a525471
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.scaladsl.Sink
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * WordCount example
+ * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet)
+ */
+
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+
+object Test6 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test6", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ val echo = system.actorOf(Props(Echo()))
+ val sink = Sink.actorRef(echo, "COMPLETE")
+ val sourceData = new CollectionDataSource(
+ List(
+ "this is a good start",
+ "this is a good time",
+ "time to start",
+ "congratulations",
+ "green plant",
+ "blue sky")
+ )
+ val source = GearSource.from[String](sourceData)
+ source.mapConcat({line =>
+ line.split(" ").toList
+ }).groupBy2(x => x)
+ .map(word => (word, 1))
+ .reduce({(a, b) =>
+ (a._1, a._2 + b._2)
+ })
+ .log("word-count")
+ .runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ case class Echo() extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
new file mode 100644
index 0000000..8c837af
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * This is a simplified API you can use to combine sources and sinks
+ * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A]
+ * without the need for using the Graph DSL
+ */
+
+object Test7 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val system = ActorSystem("Test7", akkaConf)
+ implicit val materializer = GearpumpMaterializer()
+ implicit val ec = system.dispatcher
+
+ val sourceA = Source(List(1))
+ val sourceB = Source(List(2))
+ val mergedSource = Source.combine(sourceA, sourceB)(Merge(_))
+
+ val sinkA = Sink.foreach[Int](x => println(s"In SinkA : $x"))
+ val sinkB = Sink.foreach[Int](x => println(s"In SinkB : $x"))
+ val sink = Sink.combine(sinkA, sinkB)(Broadcast[Int](_))
+ mergedSource.runWith(sink)
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
new file mode 100644
index 0000000..ad2ac61
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * Stream example to find sum of elements
+ */
+object Test8 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test8", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ implicit val ec = system.dispatcher
+
+ // Source gives 1 to 100 elements
+ val source: Source[Int, NotUsed] = Source(Stream.from(1).take(100))
+ val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
+
+ val result: Future[Int] = source.runWith(sink)
+ result.map(sum => {
+ println(s"Sum of stream elements => $sum")
+ })
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
new file mode 100644
index 0000000..66414e0
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
+import akka.stream.scaladsl._
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast
+ */
+object Test9 extends AkkaApp with ArgumentsParser {
+ // scalastyle:off println
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+ )
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+ implicit val system = ActorSystem("Test9", akkaConf)
+ implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match {
+ case true =>
+ GearpumpMaterializer()
+ case false =>
+ ActorMaterializer(
+ ActorMaterializerSettings(system).withAutoFusing(false)
+ )
+ }
+ implicit val ec = system.dispatcher
+
+ val sinkActor = system.actorOf(Props(new SinkActor()))
+ val source = Source((1 to 5))
+ val sink = Sink.actorRef(sinkActor, "COMPLETE")
+ val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map {
+ x => println(s"processing broadcasted element : $x in flowA"); x
+ }
+ val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map {
+ x => println(s"processing broadcasted element : $x in flowB"); x
+ }
+
+ val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[Int](2))
+ val merge = b.add(Merge[Int](2))
+ source ~> broadcast
+ broadcast ~> flowA ~> merge
+ broadcast ~> flowB ~> merge
+ merge ~> sink
+ ClosedShape
+ })
+
+ graph.run()
+
+ Await.result(system.whenTerminated, 60.minutes)
+ }
+
+ class SinkActor extends Actor {
+ def receive: Receive = {
+ case any: AnyRef =>
+ println("Confirm received: " + any)
+ }
+ }
+ // scalastyle:on println
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
similarity index 60%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
index 56b89bc..2a1e7ff 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala
@@ -16,30 +16,34 @@
* limitations under the License.
*/
-package akka.stream.gearpump.example
+package org.apache.gearpump.akkastream.example
import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
+import akka.NotUsed
import akka.actor.ActorSystem
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
import akka.stream.scaladsl._
+import akka.stream.{ClosedShape, IOResult}
import akka.util.ByteString
-import org.json4s.JsonAST.JString
-
+import org.apache.gearpump.akkastream.graph.GraphPartitioner
+import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+import org.json4s.jackson.JsonMethods
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
/**
* this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
* which showcases running Akka Streams DSL across JVMs on Gearpump
*
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
- * -input wikidata-${DATE}-all.json.gz -languages en,de
+ * Usage: output/target/pack/bin/gear app
+ * -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ * -input wikidata-${DATE}-all.json.gz -languages en,de
*
* (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
*
@@ -58,54 +62,58 @@
val input = new File(parsed.getString("input"))
val langs = parsed.getString("languages").split(",")
- implicit val system = ActorSystem("wikidata-poc", akkaConf)
- implicit val materializer = new GearpumpMaterializer(system, GraphCutter.TagAttributeStrategy, useLocalCluster = false)
+ implicit val system = ActorSystem("WikipediaApp", akkaConf)
+ implicit val materializer =
+ GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
import system.dispatcher
val elements = source(input).via(parseJson(langs))
- val g = FlowGraph.closed(count) { implicit b =>
- sinkCount => {
-
- val broadcast = b.add(Broadcast[WikidataElement](2))
- elements ~> broadcast ~> logEveryNSink(1000)
- broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+ val g = RunnableGraph.fromGraph(
+ GraphDSL.create(count) { implicit b =>
+ sinkCount => {
+ import GraphDSL.Implicits._
+ val broadcast = b.add(Broadcast[WikidataElement](2))
+ elements ~> broadcast ~> logEveryNSink(1000)
+ broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+ ClosedShape
+ }
}
- }
+ )
- g.run().onComplete { x =>
- x match {
- case Success((t, f)) => printResults(t, f)
- case Failure(tr) => println("Something went wrong")
- }
- system.terminate()
+ g.run().onComplete {
+ case Success((t, f)) => printResults(t, f)
+ // scalastyle:off println
+ case Failure(tr) => println("Something went wrong")
+ // scalastyle:on println
}
- Await.result(system.whenTerminated, Duration.Inf)
+ Await.result(system.whenTerminated, 60.minutes)
}
- def source(file: File): Source[String, Future[Long]] = {
+ def source(file: File): Source[String, Future[IOResult]] = {
val compressed = new GZIPInputStream(new FileInputStream(file), 65536)
- InputStreamSource(() => compressed)
+ StreamConverters.fromInputStream(() => compressed)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
.map(x => x.decodeString("utf-8"))
}
- def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] =
- Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect {
+ def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+ Flow[String, WikidataElement, NotUsed] =
+ Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({
case Some(v) => v
- }
+ })
def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
- import org.json4s.jackson.JsonMethods
Try(JsonMethods.parse(line)).toOption.flatMap { json =>
json \ "id" match {
case JString(itemId) =>
- val sites = for {
+
+ val sites: Seq[(String, String)] = for {
lang <- langs
JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
} yield lang -> title
- if (sites.isEmpty) None
+ if(sites.isEmpty) None
else Some(WikidataElement(id = itemId, sites = sites.toMap))
case _ => None
@@ -113,17 +121,21 @@
}
}
- def logEveryNSink[T](n: Int) = Sink.fold(0) { (x, y: T) =>
- if (x % n == 0)
+ def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) =>
+ if (x % n == 0) {
+ // scalastyle:off println
println(s"Processing element $x: $y")
+ // scalastyle:on println
+ }
x + 1
}
- def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement]
+ def checkSameTitles(langs: Set[String]):
+ Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement]
.filter(_.sites.keySet == langs)
.map { x =>
val titles = x.sites.values
- titles.forall(_ == titles.head)
+ titles.forall( _ == titles.head)
}.withAttributes(GearAttributes.remote)
def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
@@ -131,13 +143,15 @@
case ((t, f), false) => (t, f + 1)
}
- def printResults(t: Int, f: Int) = {
- val message =
- s"""
- | Number of items with the same title: $t
- | Number of items with the different title: $f
- | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
- """.stripMargin
+ def printResults(t: Int, f: Int): Unit = {
+ val message = s"""
+ | Number of items with the same title: $t
+ | Number of items with the different title: $f
+ | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
+ """.stripMargin
+ // scalastyle:off println
println(message)
+ // scalastyle:on println
}
+
}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
similarity index 65%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
index 19083f6..f7919c0 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -16,23 +16,23 @@
* limitations under the License.
*/
-package akka.stream.gearpump.graph
+package org.apache.gearpump.akkastream.graph
-import akka.stream.ModuleGraph
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.GearAttributes.{Local, Location, Remote}
-import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.module.{BridgeModule, DummyModule, GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.DirectProcessor
-import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module}
+import akka.stream.{Shape, SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
+import org.apache.gearpump.akkastream.module._
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.fusing.GraphStageModule
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
import akka.stream.impl.{SinkModule, SourceModule}
-
import org.apache.gearpump.util.Graph
/**
*
- * GraphCutter is used to decide which part is rendered locally
+ * GraphPartitioner is used to decide which part will be rendered locally
* and which part should be rendered remotely.
*
* We will cut the graph based on the [[Strategy]] provided.
@@ -54,23 +54,22 @@
* \| /
* AtomicModule3
*
- *
- * @see [[ModuleGraph]] for more information of how Graph is organized.
+ * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
*
*/
-class GraphCutter(strategy: Strategy) {
- def cut(moduleGraph: ModuleGraph[_]): List[SubGraph] = {
- val graph = removeDummyModule(moduleGraph.graph)
+class GraphPartitioner(strategy: Strategy) {
+ def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
+ val graph = removeDummyModule(moduleGraph)
val tags = tag(graph, strategy)
- doCut(graph, tags, moduleGraph.mat)
+ doPartition(graph, tags)
}
- private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location],
- mat: MaterializedValueNode): List[SubGraph] = {
+ private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
+ List[SubGraph] = {
val local = Graph.empty[Module, Edge]
val remote = Graph.empty[Module, Edge]
- graph.vertices.foreach { module =>
+ graph.vertices.foreach{ module =>
if (tags(module) == Local) {
local.addVertex(module)
} else {
@@ -78,7 +77,7 @@
}
}
- graph.edges.foreach { nodeEdgeNode =>
+ graph.edges.foreach{ nodeEdgeNode =>
val (node1, edge, node2) = nodeEdgeNode
(tags(node1), tags(node2)) match {
case (Local, Local) =>
@@ -90,7 +89,7 @@
case bridge: BridgeModule[_, _, _] =>
local.addEdge(node1, edge, node2)
case _ =>
- // Creates a bridge module in between
+ // create a bridge module in between
val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
val remoteEdge = Edge(bridge.outPort, edge.to)
remote.addEdge(bridge, remoteEdge, node2)
@@ -102,7 +101,7 @@
case bridge: BridgeModule[_, _, _] =>
local.addEdge(node1, edge, node2)
case _ =>
- // Creates a bridge module in between
+ // create a bridge module in between
val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
val remoteEdge = Edge(edge.from, bridge.inPort)
remote.addEdge(node1, remoteEdge, bridge)
@@ -116,14 +115,14 @@
}
private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
- graph.vertices.map { vertex =>
+ graph.vertices.map{vertex =>
vertex -> strategy.apply(vertex)
}.toMap
}
private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
val graph = inputGraph.copy
- val dummies = graph.vertices.filter { module =>
+ val dummies = graph.vertices.filter {module =>
module match {
case dummy: DummyModule =>
true
@@ -136,7 +135,7 @@
}
}
-object GraphCutter {
+object GraphPartitioner {
type Strategy = PartialFunction[Module, Location]
@@ -146,22 +145,33 @@
case task: GearpumpTaskModule =>
Remote
case groupBy: GroupByModule[_, _] =>
- // TODO: groupBy is not supported by local materializer yet
+ // TODO: groupBy is not supported by local materializer
Remote
case source: SourceModule[_, _] =>
Local
case sink: SinkModule[_, _] =>
Local
- case matValueSource: MaterializedValueSource[_] =>
- Local
- case direct: DirectProcessor =>
- Local
- case time: TimerTransform =>
- // Renders to local as it requires a timer.
- Local
+ case remaining: Module =>
+ remaining.shape match {
+ case sourceShape: SourceShape[_] =>
+ Local
+ case sinkShape: SinkShape[_] =>
+ Local
+ case otherShapes: Shape =>
+ Remote
+ }
}
val AllRemoteStrategy: Strategy = BaseStrategy orElse {
+ case graphStageModule: GraphStageModule =>
+ graphStageModule.stage match {
+ case matValueSource: MaterializedValueSource[_] =>
+ Local
+ case singleSource: SingleSource[_] =>
+ Local
+ case _ =>
+ Remote
+ }
case _ =>
Remote
}
@@ -177,7 +187,19 @@
}
val AllLocalStrategy: Strategy = BaseStrategy orElse {
+ case graphStageModule: GraphStageModule =>
+ // TODO kasravi review
+ graphStageModule.stage match {
+ case matValueSource: MaterializedValueSource[_] =>
+ Local
+ case _ =>
+ Local
+ }
case _ =>
Local
}
-}
\ No newline at end of file
+
+ def apply(strategy: Strategy): GraphPartitioner = {
+ new GraphPartitioner(strategy)
+ }
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
new file mode 100644
index 0000000..fe86951
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.graph
+
+import akka.actor.ActorSystem
+import akka.stream.impl.Stages.DefaultAttributes
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.{PublisherSource, SubscriberSink}
+import akka.stream.{SinkShape, SourceShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.util.Graph
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
+ * contain module that can be materialized in local JVM.
+ *
+ * @param graph Graph[Module, Edge]
+ */
+class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
+
+object LocalGraph {
+
+ /**
+ * materialize LocalGraph in local JVM
+ * @param system ActorSystem
+ */
+ class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
+
+ // create a local materializer
+ val materializer = LocalMaterializerImpl()(system)
+
+ /**
+ *
+ * @param matValues Materialized Values for each module before materialization
+ * @return Materialized Values for each Module after the materialization.
+ */
+ override def materialize(graph: SubGraph,
+ matValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any] = {
+ val newGraph: Graph[Module, Edge] = graph.graph.mapVertex {
+ case source: SourceBridgeModule[in, out] =>
+ val subscriber = matValues(source).asInstanceOf[Subscriber[in]]
+ val shape: SinkShape[in] = SinkShape(source.inPort)
+ new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
+ case sink: SinkBridgeModule[in, out] =>
+ val publisher = matValues(sink).asInstanceOf[Publisher[out]]
+ val shape: SourceShape[out] = SourceShape(sink.outPort)
+ new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
+ case other =>
+ other
+ }
+ materializer.materialize(newGraph, matValues)
+ }
+
+ override def shutdown: Unit = {
+ materializer.shutdown()
+ }
+ }
+}
+
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
similarity index 64%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
index 3cea78a..99ebe17 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-package akka.stream.gearpump.graph
+package org.apache.gearpump.akkastream.graph
import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.materializer.RemoteMaterializerImpl
-import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule}
-import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient
-import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
+import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
+import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
import akka.stream.impl.StreamLayout.Module
-
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.streaming.ProcessorId
@@ -34,9 +33,9 @@
/**
*
* [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- * contain modules that can be materialized in remote Gearpump cluster.
+ * contain modules that can be materialized in remote Gearpump cluster.
*
- * @param graph
+ * @param graph Graph
*/
class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
@@ -44,10 +43,11 @@
/**
* * materialize LocalGraph in remote gearpump cluster
- * @param useInProcessCluster
- * @param system
+ * @param useInProcessCluster Boolean
+ * @param system ActorSystem
*/
- class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer {
+ class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
+ extends SubGraphMaterializer {
private val local = if (useInProcessCluster) {
val cluster = EmbeddedCluster()
cluster.start()
@@ -57,13 +57,15 @@
}
private val context: ClientContext = local match {
- case Some(local) => local.newClientContext
+ case Some(l) => l.newClientContext
case None => ClientContext(system)
}
- override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, Any]): Map[Module, Any] = {
+ override def materialize(subGraph: SubGraph,
+ inputMatValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any] = {
val graph = subGraph.graph
-
+
if (graph.isEmpty) {
inputMatValues
} else {
@@ -71,22 +73,27 @@
}
}
- private def doMaterialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
+ private def doMaterialize(graph: Graph[Module, Edge],
+ inputMatValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any] = {
val materializer = new RemoteMaterializerImpl(graph, system)
val (app, matValues) = materializer.materialize
- val appId = context.submit(app)
- println("sleep 5 second until the applicaiton is ready on cluster")
+ val appId = context.submit(app).appId
+ // scalastyle:off println
+ println("sleep 5 second until the application is ready on cluster")
+ // scalastyle:on println
Thread.sleep(5000)
def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
matValues.toList.flatMap { kv =>
val (module, processorId) = kv
module match {
- case source: SourceBridgeModule[AnyRef, AnyRef] =>
- val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher, context, appId, processorId)
+ case source: SourceBridgeModule[_, _] =>
+ val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher,
+ context, appId, processorId)
Some((module, bridge))
- case sink: SinkBridgeModule[AnyRef, AnyRef] =>
+ case sink: SinkBridgeModule[_, _] =>
val bridge = new SinkBridgeTaskClient(system, context, appId, processorId)
Some((module, bridge))
case other =>
@@ -98,9 +105,9 @@
inputMatValues ++ resolve(matValues)
}
- override def shutdown(): Unit = {
+ override def shutdown: Unit = {
context.close()
- local.map(_.stop())
+ local.foreach(_.stop())
}
}
}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
similarity index 77%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
index 564b6c7..a74143e 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package akka.stream.gearpump.graph
+package org.apache.gearpump.akkastream.graph
-import akka.stream.ModuleGraph.Edge
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import akka.stream.impl.StreamLayout.Module
-
import org.apache.gearpump.util.Graph
/**
- * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]]
+ * [[SubGraph]] is a partial DAG
*
- * The idea is that by dividing [[akka.stream.ModuleGraph]] to several
+ * The idea is that by dividing [[Graph]] to several
* [[SubGraph]], we can materialize each [[SubGraph]] with different
* materializer.
*/
@@ -40,6 +40,7 @@
def graph: Graph[Module, Edge]
}
+
/**
* Materializer for Sub-Graph type
*/
@@ -50,7 +51,9 @@
* @return Materialized Values for each Module after the materialization.
*/
- def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any]
+ def materialize(graph: SubGraph,
+ matValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any]
- def shutdown(): Unit
-}
\ No newline at end of file
+ def shutdown: Unit
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
new file mode 100644
index 0000000..477f4d3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.{util => ju}
+
+import org.apache.gearpump.util.{Graph => GGraph}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Deploy, PoisonPill}
+import akka.dispatch.Dispatchers
+import akka.event.{Logging, LoggingAdapter}
+import akka.stream.impl.StreamLayout._
+import akka.stream.impl._
+import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
+import akka.stream.impl.fusing.{ActorGraphInterpreter, Fold, GraphInterpreterShell, GraphModule, GraphStageModule}
+import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
+import akka.stream.scaladsl.ModuleExtractor
+import akka.stream.{ClosedShape, Graph => AkkaGraph, _}
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.ReduceModule
+import org.apache.gearpump.akkastream.util.MaterializedValueOps
+import org.reactivestreams.{Publisher, Subscriber}
+
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
+ *
+ * @param system System
+ * @param settings ActorMaterializerSettings
+ * @param dispatchers Dispatchers
+ * @param supervisor ActorRef
+ * @param haveShutDown AtomicBoolean
+ * @param flowNames SeqActorName
+ */
+case class LocalMaterializerImpl (
+ override val system: ActorSystem,
+ override val settings: ActorMaterializerSettings,
+ dispatchers: Dispatchers,
+ override val supervisor: ActorRef,
+ haveShutDown: AtomicBoolean,
+ flowNames: SeqActorName)
+ extends ExtendedActorMaterializer {
+
+ override def logger: LoggingAdapter = Logging.getLogger(system, this)
+
+ override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration,
+ task: Runnable): Cancellable =
+ system.scheduler.schedule(initialDelay, interval, task)(executionContext)
+
+ override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
+ system.scheduler.scheduleOnce(delay, task)(executionContext)
+
+ override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
+ import ActorAttributes._
+ import Attributes._
+ opAttr.attributeList.foldLeft(settings) { (s, attr) =>
+ attr match {
+ case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
+ case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
+ case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
+ case l: LogLevels => s
+ case Name(_) => s
+ case other => s
+ }
+ }
+ }
+
+ override def shutdown(): Unit =
+ if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
+
+ override def isShutdown: Boolean = haveShutDown.get()
+
+ override lazy val executionContext: ExecutionContextExecutor =
+ dispatchers.lookup(settings.dispatcher match {
+ case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
+ case other => other
+ })
+
+
+ case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
+ subflowFuser: GraphInterpreterShell => ActorRef = null)
+ extends MaterializerSession(module, iAttributes) {
+
+ override def materializeAtomic(atomic: AtomicModule,
+ effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
+
+ def newMaterializationContext() =
+ new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
+ stageName(effectiveAttributes))
+ atomic match {
+ case sink: SinkModule[_, _] =>
+ val (sub, mat) = sink.create(newMaterializationContext())
+ assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
+ matVal.put(atomic, mat)
+ case source: SourceModule[_, _] =>
+ val (pub, mat) = source.create(newMaterializationContext())
+ assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
+ matVal.put(atomic, mat)
+ case stage: ProcessorModule[_, _, _] =>
+ val (processor, mat) = stage.createProcessor()
+ assignPort(stage.inPort, processor)
+ assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
+ matVal.put(atomic, mat)
+ // FIXME
+ // case tls: TlsModule =>
+ // TODO solve this so TlsModule doesn't need special treatment here
+ // val es = effectiveSettings(effectiveAttributes)
+ // val props =
+ // TLSActor.props(es, tls.sslContext, tls.sslConfig,
+ // tls.firstSession, tls.role, tls.closing, tls.hostInfo)
+ // val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
+ // def factory(id: Int) = new ActorPublisher[Any](impl) {
+ // override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
+ // }
+ // val publishers = Vector.tabulate(2)(factory)
+ // impl ! FanOut.ExposedPublishers(publishers)
+ //
+ // assignPort(tls.plainOut, publishers(TLSActor.UserOut))
+ // assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
+ //
+ // assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
+ // assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
+ //
+ // matVal.put(atomic, NotUsed)
+ case graph: GraphModule =>
+ matGraph(graph, effectiveAttributes, matVal)
+ case stage: GraphStageModule =>
+ val graph =
+ GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
+ stage.shape, stage.attributes, Array(stage))
+ matGraph(graph, effectiveAttributes, matVal)
+ }
+ }
+
+ private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
+ matVal: ju.Map[Module, Any]): Unit = {
+ val calculatedSettings = effectiveSettings(effectiveAttributes)
+ val (handlers, logics) =
+ graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
+
+ val shell = new GraphInterpreterShell(graph.assembly, handlers,
+ logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)
+
+ val impl =
+ if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
+ subflowFuser(shell)
+ } else {
+ val props = ActorGraphInterpreter.props(shell)
+ actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
+ }
+
+ for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
+ val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
+ assignPort(inlet, subscriber)
+ }
+ for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
+ val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
+ impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
+ assignPort(outlet, publisher)
+ }
+ }
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {
+
+ LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+ null, null).materialize().asInstanceOf[Mat]
+
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+ initialAttributes: Attributes): Mat = {
+ materialize(runnableGraph)
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+ subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
+
+ LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
+ null, null).materialize().asInstanceOf[Mat]
+
+ }
+
+ override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
+ subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
+ materialize(runnableGraph)
+ }
+
+ override def makeLogger(logSource: Class[_]): LoggingAdapter = {
+ logger
+ }
+
+ def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
+ var moduleInProgress: Module = EmptyModule
+ graph.vertices.foreach(module => {
+ moduleInProgress = moduleInProgress.compose(module)
+ })
+ graph.edges.foreach(value => {
+ val (node1, edge, node2) = value
+ moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
+ })
+
+ moduleInProgress
+ }
+
+ def materialize(graph: GGraph[Module, Edge],
+ inputMatValues: scala.collection.mutable.Map[Module, Any]):
+ scala.collection.mutable.Map[Module, Any] = {
+ val topLevelModule = buildToplevelModule(graph)
+ val session = LocalMaterializerSession(topLevelModule, null, null)
+ import scala.collection.JavaConverters._
+ val matV = inputMatValues.asJava
+ val materializedGraph = graph.mapVertex { module =>
+ session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
+ matV.get(module)
+ }
+ materializedGraph.edges.foreach { nodeEdgeNode =>
+ val (node1, edge, node2) = nodeEdgeNode
+ val from = edge.from
+ val to = edge.to
+ node1 match {
+ case module1: Module =>
+ node2 match {
+ case module2: Module =>
+ val publisher = module1.downstreams(from).asInstanceOf[Publisher[Any]]
+ val subscriber = module2.upstreams(to).asInstanceOf[Subscriber[Any]]
+ publisher.subscribe(subscriber)
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+ val matValSources = graph.vertices.flatMap(module => {
+ val rt: Option[MaterializedValueSource[_]] = module match {
+ case graphStage: GraphStageModule =>
+ graphStage.stage match {
+ case materializedValueSource: MaterializedValueSource[_] =>
+ Some(materializedValueSource)
+ case _ =>
+ None
+ }
+ case _ =>
+ None
+ }
+ rt
+ })
+ publishToMaterializedValueSource(matValSources, inputMatValues)
+ inputMatValues
+ }
+
+ private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
+ matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
+ modules.foreach { source =>
+ Option(source.computation).map { attr =>
+ val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
+ source.setValue(valueToPublish)
+ }
+ }
+ }
+
+ private[this] def createFlowName(): String = flowNames.next()
+
+ val flowName = createFlowName()
+ var nextId = 0
+
+ def stageName(attr: Attributes): String = {
+ val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
+ nextId += 1
+ name
+ }
+
+ override def withNamePrefix(name: String): LocalMaterializerImpl =
+ this.copy(flowNames = flowNames.copy(name))
+
+}
+
+object LocalMaterializerImpl {
+ case class MaterializedModule(module: Module, matValue: Any,
+ inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
+ outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
+
+ def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
+ namePrefix: Option[String] = None)(implicit system: ActorSystem):
+ LocalMaterializerImpl = {
+
+ val settings = materializerSettings getOrElse ActorMaterializerSettings(system)
+ apply(settings, namePrefix.getOrElse("flow"))(system)
+ }
+
+ def apply(materializerSettings: ActorMaterializerSettings,
+ namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl = {
+ val haveShutDown = new AtomicBoolean(false)
+
+ new LocalMaterializerImpl(
+ system,
+ materializerSettings,
+ system.dispatchers,
+ system.actorOf(StreamSupervisor.props(materializerSettings,
+ haveShutDown).withDispatcher(materializerSettings.dispatcher)),
+ haveShutDown,
+ FlowNames(system).name.copy(namePrefix))
+ }
+
+ def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
+ val f = reduce.f
+ val aggregator = {(zero: Any, input: Any) =>
+ if (zero == null) {
+ input
+ } else {
+ f(zero, input)
+ }
+ }
+ Fold(null, aggregator)
+ }
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
new file mode 100644
index 0000000..e065c90
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import akka.actor.ActorSystem
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.Timers.{Completion, DelayInitial, Idle, IdleInject, IdleTimeoutBidi, Initial}
+import akka.stream.impl.fusing.{Batch, Collect, Delay, Drop, DropWhile, DropWithin, Filter, FlattenMerge, Fold, GraphStageModule, GroupBy, GroupedWithin, Intersperse, LimitWeighted, Log, MapAsync, MapAsyncUnordered, PrefixAndTail, Recover, Reduce, Scan, Split, StatefulMapConcat, SubSink, SubSource, Take, TakeWhile, TakeWithin, Map => FMap}
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource, TickSource}
+import akka.stream.impl.io.IncomingConnectionStage
+import akka.stream.impl.{HeadOptionStage, Stages, Throttle, Unfold, UnfoldAsync}
+import akka.stream.scaladsl.{Balance, Broadcast, Concat, Interleave, Merge, MergePreferred, MergeSorted, ModuleExtractor, Unzip, Zip, ZipWith2}
+import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
+import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTask, ConcatTask, DelayInitialTask, DropWithinTask, FlattenMergeTask, FoldTask, GraphTask, GroupedWithinTask, InterleaveTask, MapAsyncTask, MergeTask, SingleSourceTask, SinkBridgeTask, SourceBridgeTask, StatefulMapConcatTask, TakeWithinTask, ThrottleTask, TickSourceTask, Zip2Task}
+import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
+import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump
+ * Streaming Application.
+ *
+ * @param graph Graph
+ * @param system ActorSystem
+ */
+class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
+
+ import RemoteMaterializerImpl._
+
+ type ID = String
+ private implicit val actorSystem = system
+
+ private def uuid: String = {
+ java.util.UUID.randomUUID.toString
+ }
+
+ def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
+ val (opGraph, ids) = toOpGraph
+ val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph)
+ val processorIds = resolveIds(app, ids)
+
+ val updatedApp = updateJunctionConfig(processorIds, app)
+ (removeIds(updatedApp), processorIds)
+ }
+
+ private def updateJunctionConfig(processorIds: Map[Module, ProcessorId],
+ app: StreamApplication): StreamApplication = {
+ val config = junctionConfig(processorIds)
+
+ val dag = app.dag.mapVertex { vertex =>
+ val processorId = vertex.id
+ val newConf = vertex.taskConf.withConfig(config(processorId))
+ vertex.copy(taskConf = newConf)
+ }
+ new StreamApplication(app.name, app.inputUserConfig, dag)
+ }
+
+ private def junctionConfig(processorIds: Map[Module, ProcessorId]):
+ Map[ProcessorId, UserConfig] = {
+ val updatedConfigs = graph.vertices.flatMap { vertex =>
+ buildShape(vertex, processorIds)
+ }.toMap
+ updatedConfigs
+ }
+
+ private def buildShape(vertex: Module, processorIds: Map[Module, ProcessorId]):
+ Option[(ProcessorId, UserConfig)] = {
+ def inProcessors(vertex: Module): List[ProcessorId] = {
+ vertex.shape.inlets.flatMap { inlet =>
+ graph.incomingEdgesOf(vertex).find(
+ _._2.to == inlet).map(_._1
+ ).flatMap(processorIds.get)
+ }.toList
+ }
+ def outProcessors(vertex: Module): List[ProcessorId] = {
+ vertex.shape.outlets.flatMap { outlet =>
+ graph.outgoingEdgesOf(vertex).find(
+ _._2.from == outlet).map(_._3
+ ).flatMap(processorIds.get)
+ }.toList
+ }
+ processorIds.get(vertex).map(processorId => {
+ (processorId, UserConfig.empty.
+ withValue(GraphTask.OUT_PROCESSORS, outProcessors(vertex)).
+ withValue(GraphTask.IN_PROCESSORS, inProcessors(vertex)))
+ })
+ }
+
+ private def resolveIds(app: StreamApplication, ids: Map[Module, ID]):
+ Map[Module, ProcessorId] = {
+ ids.flatMap { kv =>
+ val (module, id) = kv
+ val processorId = app.dag.vertices.find { processor =>
+ processor.taskConf.getString(id).isDefined
+ }.map(_.id)
+ processorId.map((module, _))
+ }
+ }
+
+ private def removeIds(app: StreamApplication): StreamApplication = {
+ val graph = app.dag.mapVertex { processor =>
+ val conf = removeId(processor.taskConf)
+ processor.copy(taskConf = conf)
+ }
+ new StreamApplication(app.name, app.inputUserConfig, graph)
+ }
+
+ private def removeId(conf: UserConfig): UserConfig = {
+ conf.filter { kv =>
+ kv._2 != RemoteMaterializerImpl.TRACKABLE
+ }
+ }
+
+ private def toOpGraph: (Graph[Op, OpEdge], Map[Module, ID]) = {
+ var matValues = collection.mutable.Map.empty[Module, ID]
+ val opGraph = graph.mapVertex[Op] { module =>
+ val name = uuid
+ val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.TRACKABLE)
+ matValues += module -> name
+ val parallelism = GearAttributes.count(module.attributes)
+ val op = module match {
+ case source: SourceTaskModule[_] =>
+ val updatedConf = conf.withConfig(source.conf)
+ DataSourceOp(source.source, parallelism, updatedConf, "source")
+ case sink: SinkTaskModule[_] =>
+ val updatedConf = conf.withConfig(sink.conf)
+ DataSinkOp(sink.sink, parallelism, updatedConf, "sink")
+ case sourceBridge: SourceBridgeModule[_, _] =>
+ ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
+ case processor: ProcessorModule[_, _, _] =>
+ val updatedConf = conf.withConfig(processor.conf)
+ ProcessorOp(processor.processor, parallelism, updatedConf, "source")
+ case sinkBridge: SinkBridgeModule[_, _] =>
+ ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
+ case groupBy: GroupByModule[_, _] =>
+ GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindow.apply(1).accumulating),
+ parallelism, "groupBy", conf)
+ case reduce: ReduceModule[_] =>
+ reduceOp(reduce.f, conf)
+ case graphStage: GraphStageModule =>
+ translateGraphStageWithMaterializedValue(graphStage, parallelism, conf)
+ case _ =>
+ null
+ }
+ if (op == null) {
+ throw new UnsupportedOperationException(
+ module.getClass.toString + " is not supported with RemoteMaterializer"
+ )
+ }
+ op
+ }.mapEdge[OpEdge] { (n1, edge, n2) =>
+ n2 match {
+ case chainableOp: ChainableOp[_, _]
+ if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] =>
+ Direct
+ case _ =>
+ Shuffle
+ }
+ }
+ (opGraph, matValues.toMap)
+ }
+
+ private def translateGraphStageWithMaterializedValue(module: GraphStageModule,
+ parallelism: Int, conf: UserConfig): Op = {
+ module.stage match {
+ case tickSource: TickSource[_] =>
+ val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef]
+ val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, tickSource.initialDelay).
+ withValue[FiniteDuration](INTERVAL, tickSource.interval).
+ withValue(TICK, tick)
+ ProcessorOp(classOf[TickSourceTask[_]], parallelism, tiConf, "tickSource")
+ case graphStage: GraphStage[_] =>
+ translateGraphStage(module, parallelism, conf)
+ case headOptionStage: HeadOptionStage[_] =>
+ headOptionOp(headOptionStage, conf)
+ case pushPullGraphStageWithMaterializedValue:
+ PushPullGraphStageWithMaterializedValue[_, _, _, _] =>
+ translateSymbolic(pushPullGraphStageWithMaterializedValue, conf)
+ }
+ }
+
+ private def translateGraphStage(module: GraphStageModule,
+ parallelism: Int, conf: UserConfig): Op = {
+ module.stage match {
+ case balance: Balance[_] =>
+ ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
+ case batch: Batch[_, _] =>
+ val batchConf = conf.withValue[_ => Long](BatchTask.COST, batch.costFn).
+ withLong(BatchTask.MAX, batch.max).
+ withValue[(_, _) => _](BatchTask.AGGREGATE, batch.aggregate).
+ withValue[_ => _](BatchTask.SEED, batch.seed)
+ ProcessorOp(classOf[BatchTask[_, _]],
+ parallelism, batchConf, "batch")
+ case broadcast: Broadcast[_] =>
+ val name = ModuleExtractor.unapply(broadcast).map(_.attributes.nameOrDefault()).get
+ ProcessorOp(classOf[BroadcastTask], parallelism, conf, name)
+ case collect: Collect[_, _] =>
+ collectOp(collect.pf, conf)
+ case concat: Concat[_] =>
+ ProcessorOp(classOf[ConcatTask], parallelism, conf, "concat")
+ case delayInitial: DelayInitial[_] =>
+ val dIConf = conf.withValue[FiniteDuration](
+ DelayInitialTask.DELAY_INITIAL, delayInitial.delay)
+ ProcessorOp(classOf[DelayInitialTask[_]], parallelism, dIConf, "delayInitial")
+ case dropWhile: DropWhile[_] =>
+ dropWhileOp(dropWhile.p, conf)
+ case flattenMerge: FlattenMerge[_, _] =>
+ ProcessorOp(classOf[FlattenMergeTask], parallelism, conf, "flattenMerge")
+ case fold: Fold[_, _] =>
+ val foldConf = conf.withValue(FoldTask.ZERO, fold.zero.asInstanceOf[AnyRef]).
+ withValue(FoldTask.AGGREGATOR, fold.f)
+ ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
+ case groupBy: GroupBy[_, _] =>
+ GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindow.apply(1).accumulating),
+ groupBy.maxSubstreams, "groupBy", conf)
+ case groupedWithin: GroupedWithin[_] =>
+ val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
+ withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
+ ProcessorOp(classOf[GroupedWithinTask[_]], parallelism, diConf, "groupedWithin")
+ case idleInject: IdleInject[_, _] =>
+ // TODO
+ null
+ case idleTimeoutBidi: IdleTimeoutBidi[_, _] =>
+ // TODO
+ null
+ case incomingConnectionStage: IncomingConnectionStage =>
+ // TODO
+ null
+ case interleave: Interleave[_] =>
+ val ilConf = conf.withInt(InterleaveTask.INPUT_PORTS, interleave.inputPorts).
+ withInt(InterleaveTask.SEGMENT_SIZE, interleave.segmentSize)
+ ProcessorOp(classOf[InterleaveTask], parallelism, ilConf, "interleave")
+ null
+ case intersperse: Intersperse[_] =>
+ // TODO
+ null
+ case limitWeighted: LimitWeighted[_] =>
+ // TODO
+ null
+ case map: FMap[_, _] =>
+ mapOp(map.f, conf)
+ case mapAsync: MapAsync[_, _] =>
+ ProcessorOp(classOf[MapAsyncTask[_, _]],
+ mapAsync.parallelism, conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsync.f), "mapAsync")
+ case mapAsyncUnordered: MapAsyncUnordered[_, _] =>
+ ProcessorOp(classOf[MapAsyncTask[_, _]],
+ mapAsyncUnordered.parallelism,
+ conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsyncUnordered.f), "mapAsyncUnordered")
+ case materializedValueSource: MaterializedValueSource[_] =>
+ // TODO
+ null
+ case merge: Merge[_] =>
+ val mergeConf = conf.withBoolean(MergeTask.EAGER_COMPLETE, merge.eagerComplete).
+ withInt(MergeTask.INPUT_PORTS, merge.inputPorts)
+ ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge")
+ case mergePreferred: MergePreferred[_] =>
+ MergeOp("mergePreferred", conf)
+ case mergeSorted: MergeSorted[_] =>
+ MergeOp("mergeSorted", conf)
+ case prefixAndTail: PrefixAndTail[_] =>
+ // TODO
+ null
+ case recover: Recover[_] =>
+ // TODO
+ null
+ case scan: Scan[_, _] =>
+ scanOp(scan.zero, scan.f, conf)
+ case simpleLinearGraphStage: SimpleLinearGraphStage[_] =>
+ translateSimpleLinearGraph(simpleLinearGraphStage, parallelism, conf)
+ case singleSource: SingleSource[_] =>
+ val singleSourceConf = conf.withValue[AnyRef](SingleSourceTask.ELEMENT,
+ singleSource.elem.asInstanceOf[AnyRef])
+ ProcessorOp(classOf[SingleSourceTask[_]], parallelism, singleSourceConf, "singleSource")
+ case split: Split[_] =>
+ // TODO
+ null
+ case statefulMapConcat: StatefulMapConcat[_, _] =>
+ val func = statefulMapConcat.f
+ val statefulMapConf =
+ conf.withValue[() => _ => Iterable[_]](StatefulMapConcatTask.FUNC, func)
+ ProcessorOp(classOf[StatefulMapConcatTask[_, _]], parallelism,
+ statefulMapConf, "statefulMapConcat")
+ case subSink: SubSink[_] =>
+ // TODO
+ null
+ case subSource: SubSource[_] =>
+ // TODO
+ null
+ case unfold: Unfold[_, _] =>
+ // TODO
+ null
+ case unfoldAsync: UnfoldAsync[_, _] =>
+ // TODO
+ null
+ case unzip: Unzip[_, _] =>
+ // ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
+ // conf.withValue(
+ // Unzip2Task.UNZIP2_FUNCTION, Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
+ // TODO
+ null
+ case zip: Zip[_, _] =>
+ zipWithOp(zip.zipper, conf)
+ case zipWith2: ZipWith2[_, _, _] =>
+ ProcessorOp(classOf[Zip2Task[_, _, _]],
+ parallelism,
+ conf.withValue(
+ Zip2Task.ZIP2_FUNCTION, Zip2Task.ZipFunction(zipWith2.zipper)
+ ), "zipWith2")
+ }
+ }
+
+ private def translateSimpleLinearGraph(stage: SimpleLinearGraphStage[_],
+ parallelism: Int, conf: UserConfig): Op = {
+ stage match {
+ case completion: Completion[_] =>
+ // TODO
+ null
+ case delay: Delay[_] =>
+ // TODO
+ null
+ case drop: Drop[_] =>
+ dropOp(drop.count, conf)
+ case dropWithin: DropWithin[_] =>
+ val dropWithinConf =
+ conf.withValue[FiniteDuration](DropWithinTask.TIMEOUT, dropWithin.timeout)
+ ProcessorOp(classOf[DropWithinTask[_]],
+ parallelism, dropWithinConf, "dropWithin")
+ case filter: Filter[_] =>
+ filterOp(filter.p, conf)
+ case idle: Idle[_] =>
+ // TODO
+ null
+ case initial: Initial[_] =>
+ // TODO
+ null
+ case log: Log[_] =>
+ logOp(log.name, log.extract, conf)
+ case reduce: Reduce[_] =>
+ reduceOp(reduce.f, conf)
+ case take: Take[_] =>
+ takeOp(take.count, conf)
+ case takeWhile: TakeWhile[_] =>
+ filterOp(takeWhile.p, conf)
+ case takeWithin: TakeWithin[_] =>
+ val takeWithinConf =
+ conf.withValue[FiniteDuration](TakeWithinTask.TIMEOUT, takeWithin.timeout)
+ ProcessorOp(classOf[TakeWithinTask[_]],
+ parallelism, takeWithinConf, "takeWithin")
+ case throttle: Throttle[_] =>
+ val throttleConf = conf.withInt(ThrottleTask.COST, throttle.cost).
+ withInt(ThrottleTask.MAX_BURST, throttle.maximumBurst).
+ withValue[_ => Int](ThrottleTask.COST_CALC, throttle.costCalculation).
+ withValue[FiniteDuration](ThrottleTask.TIME_PERIOD, throttle.per)
+ ProcessorOp(classOf[ThrottleTask[_]],
+ parallelism, throttleConf, "throttle")
+ }
+ }
+
+ private def translateSymbolic(stage: PushPullGraphStageWithMaterializedValue[_, _, _, _],
+ conf: UserConfig): Op = {
+ stage match {
+ case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _]
+ if symbolicGraphStage.symbolicStage.attributes.equals(
+ Stages.DefaultAttributes.buffer) => {
+ // ignore the buffering operation
+ identity("buffer", conf)
+ }
+ }
+ }
+
+}
+
+object RemoteMaterializerImpl {
+ final val NotApplied: Any => Any = _ => NotApplied
+
+ def collectOp[In, Out](collect: PartialFunction[In, Out], conf: UserConfig): Op = {
+ flatMapOp({ data: In =>
+ collect.applyOrElse(data, NotApplied) match {
+ case NotApplied => None
+ case result: Any => Option(result)
+ }
+ }, "collect", conf)
+ }
+
+ def filterOp[In](filter: In => Boolean, conf: UserConfig): Op = {
+ flatMapOp({ data: In =>
+ if (filter(data)) Option(data) else None
+ }, "filter", conf)
+ }
+
+ def headOptionOp[T](headOptionStage: HeadOptionStage[T], conf: UserConfig): Op = {
+ val promise: Promise[Option[T]] = Promise()
+ flatMapOp({ data: T =>
+ data match {
+ case None =>
+ Some(promise.future.failed)
+ case Some(d) =>
+ promise.future.value
+ }
+ }, "headOption", conf)
+ }
+
+ def reduceOp[T](reduce: (T, T) => T, conf: UserConfig): Op = {
+ var result: Option[T] = None
+ val flatMap = { elem: T =>
+ result match {
+ case None =>
+ result = Some(elem)
+ case Some(r) =>
+ result = Some(reduce(r, elem))
+ }
+ List(result)
+ }
+ flatMapOp(flatMap, "reduce", conf)
+ }
+
+ def zipWithOp[In1, In2](zipWith: (In1, In2) => (In1, In2), conf: UserConfig): Op = {
+ val flatMap = { elem: (In1, In2) =>
+ val (e1, e2) = elem
+ val result: (In1, In2) = zipWith(e1, e2)
+ List(result)
+ }
+ flatMapOp(flatMap, "zipWith", conf)
+ }
+
+ def zipWithOp2[In1, In2, Out](zipWith: (In1, In2) => Out, conf: UserConfig): Op = {
+ val flatMap = { elem: (In1, In2) =>
+ val (e1, e2) = elem
+ val result: Out = zipWith(e1, e2)
+ List(result)
+ }
+ flatMapOp(flatMap, "zipWith", conf)
+ }
+
+ def identity(description: String, conf: UserConfig): Op = {
+ flatMapOp({ data: Any =>
+ List(data)
+ }, description, conf)
+ }
+
+ def mapOp[In, Out](map: In => Out, conf: UserConfig): Op = {
+ val flatMap = (data: In) => List(map(data))
+ flatMapOp (flatMap, conf)
+ }
+
+ def flatMapOp[In, Out](flatMap: In => Iterable[Out], conf: UserConfig): Op = {
+ flatMapOp(flatMap, "flatmap", conf)
+ }
+
+ def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
+ conf: UserConfig): Op = {
+ ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
+ }
+
+ def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
+ conf: UserConfig): Op = {
+ var agg = None: Option[Out]
+ val flatMap = {elem: In =>
+ agg = agg match {
+ case None =>
+ Some(seed(elem))
+ case Some(value) =>
+ Some(aggregate(value, elem))
+ }
+ List(agg.get)
+ }
+ flatMapOp (flatMap, "conflate", conf)
+ }
+
+ def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op = {
+ var aggregator: Out = zero
+ val map = { elem: In =>
+ aggregator = fold(aggregator, elem)
+ List(aggregator)
+ }
+ flatMapOp(map, "fold", conf)
+ }
+
+ def groupedOp(count: Int, conf: UserConfig): Op = {
+ var left = count
+ val buf = {
+ val b = Vector.newBuilder[Any]
+ b.sizeHint(count)
+ b
+ }
+
+ val flatMap: Any => Iterable[Any] = {input: Any =>
+ buf += input
+ left -= 1
+ if (left == 0) {
+ val emit = buf.result()
+ buf.clear()
+ left = count
+ Some(emit)
+ } else {
+ None
+ }
+ }
+ flatMapOp(flatMap, conf: UserConfig)
+ }
+
+ def dropOp[T](number: Long, conf: UserConfig): Op = {
+ var left = number
+ val flatMap: T => Iterable[T] = {input: T =>
+ if (left > 0) {
+ left -= 1
+ None
+ } else {
+ Some(input)
+ }
+ }
+ flatMapOp(flatMap, "drop", conf)
+ }
+
+ def dropWhileOp[In](drop: In => Boolean, conf: UserConfig): Op = {
+ flatMapOp({ data: In =>
+ if (drop(data)) None else Option(data)
+ }, "dropWhile", conf)
+ }
+
+ def logOp[T](name: String, extract: T => Any, conf: UserConfig): Op = {
+ val flatMap = {elem: T =>
+ LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
+ List(elem)
+ }
+ flatMapOp(flatMap, "log", conf)
+ }
+
+ def scanOp[In, Out](zero: Out, f: (Out, In) => Out, conf: UserConfig): Op = {
+ var aggregator = zero
+ var pushedZero = false
+
+ val flatMap = {elem: In =>
+ aggregator = f(aggregator, elem)
+
+ if (pushedZero) {
+ pushedZero = true
+ List(zero, aggregator)
+ } else {
+ List(aggregator)
+ }
+ }
+ flatMapOp(flatMap, "scan", conf)
+ }
+
+ def statefulMapOp[In, Out](f: In => Iterable[Out], conf: UserConfig): Op = {
+ flatMapOp ({ data: In =>
+ f(data)
+ }, conf)
+ }
+
+ def takeOp(count: Long, conf: UserConfig): Op = {
+ var left: Long = count
+
+ val filter: Any => Iterable[Any] = {elem: Any =>
+ left -= 1
+ if (left > 0) Some(elem)
+ else if (left == 0) Some(elem)
+ else None
+ }
+ flatMapOp(filter, "take", conf)
+ }
+
+ /**
+ * We use this attribute to track module to Processor
+ *
+ */
+ val TRACKABLE = "track how module is fused to processor"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
new file mode 100644
index 0000000..5b8c71b
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ *
+ * [[IN]] -> [[BridgeModule]] -> [[OUT]]
+ * /
+ * /
+ * out of band data input or output channel [[MAT]]
+ *
+ *
+ * [[BridgeModule]] is used as a bridge between different materializers.
+ * Different [[akka.stream.Materializer]]s can use out of band channel to
+ * exchange messages.
+ *
+ * For example:
+ *
+ * Remote Materializer
+ * -----------------------------
+ * | |
+ * | BridgeModule -> RemoteSink |
+ * | / |
+ * --/----------------------------
+ * Local Materializer / out of band channel.
+ * ----------------------/----
+ * | Local / |
+ * | Source -> BridgeModule |
+ * | |
+ * ---------------------------
+ *
+ *
+ * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
+ * module during materialization.
+ *
+ * However, user can still declare it explicitly. In this case, it means we have a
+ * boundary Source or Sink module which accept out of band channel inputs or
+ * outputs.
+ *
+ * @tparam IN input
+ * @tparam OUT output
+ * @tparam MAT materializer
+ */
+abstract class BridgeModule[IN, OUT, MAT] extends AtomicModule {
+ val inPort = Inlet[IN]("BridgeModule.in")
+ val outPort = Outlet[OUT]("BridgeModule.out")
+ override val shape = new FlowShape(inPort, outPort)
+
+ override def replaceShape(s: Shape): Module = if (s != shape) {
+ throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+ } else {
+ this
+ }
+
+ def attributes: Attributes
+ def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
+
+ protected def newInstance: BridgeModule[IN, OUT, MAT]
+ override def carbonCopy: Module = newInstance
+}
+
+
+/**
+ *
+ * Bridge module which accept out of band channel Input
+ * [[org.reactivestreams.Publisher]][IN].
+ *
+ *
+ * [[SourceBridgeModule]] -> [[OUT]]
+ * /|
+ * /
+ * out of band data input [[org.reactivestreams.Publisher]][IN]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]]
+ * @tparam OUT out put data type to next module.
+ */
+class SourceBridgeModule[IN, OUT](val attributes: Attributes =
+ Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] {
+ override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] =
+ new SourceBridgeModule[IN, OUT](attributes)
+
+ override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = {
+ new SourceBridgeModule( attributes)
+ }
+}
+
+/**
+ *
+ * Bridge module which accept out of band channel Output
+ * [[org.reactivestreams.Subscriber]][OUT].
+ *
+ *
+ * [[IN]] -> [[BridgeModule]]
+ * \
+ * \
+ * \|
+ * out of band data output [[org.reactivestreams.Subscriber]][OUT]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from previous module
+ * @tparam OUT out put data type to out of band subscriber
+ */
+class SinkBridgeModule[IN, OUT](val attributes: Attributes =
+ Attributes.name("sinkBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] {
+ override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] =
+ new SinkBridgeModule[IN, OUT](attributes)
+
+ override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
+ new SinkBridgeModule[IN, OUT](attributes)
+ }
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
similarity index 91%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
index bc744f9..ea76bb0 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
@@ -16,9 +16,9 @@
* limitations under the License.
*/
-package akka.stream.gearpump.module
+package org.apache.gearpump.akkastream.module
-import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
import akka.stream.impl.{SinkModule, SourceModule}
import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
import org.reactivestreams.{Publisher, Subscriber}
@@ -47,7 +47,8 @@
*
*
*/
-trait DummyModule extends Module
+trait DummyModule extends AtomicModule
+
/**
*
@@ -56,9 +57,9 @@
* /
* out of band input message Source
*
- * @param attributes
- * @param shape
- * @tparam Out
+ * @param attributes Attributes
+ * @param shape SourceShape[Out]
+ * @tparam Out Output
*/
class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
extends SourceModule[Out, Unit](shape) with DummyModule {
@@ -76,6 +77,7 @@
}
}
+
/**
*
* Source-> [[BridgeModule]] -> [[DummySink]]
@@ -84,8 +86,8 @@
* \|
* out of band output message [[Subscriber]]
*
- * @param attributes
- * @param shape
+ * @param attributes Attributes
+ * @param shape SinkShape[IN]
*/
class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
extends SinkModule[IN, Unit](shape) with DummyModule {
@@ -100,4 +102,4 @@
override def withAttributes(attr: Attributes): Module = {
new DummySink[IN](attr, amendShape(attr))
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
similarity index 67%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
index c4c78cc..dfbbee9 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
@@ -16,12 +16,10 @@
* limitations under the License.
*/
-package akka.stream.gearpump.module
+package org.apache.gearpump.akkastream.module
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
-
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import akka.stream._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
@@ -32,17 +30,17 @@
*
* This is specially designed for Gearpump runtime. It is not supposed to be used
* for local materializer.
- *
+ *
*/
-trait GearpumpTaskModule extends Module
+trait GearpumpTaskModule extends AtomicModule
/**
* This is used to represent the Gearpump Data Source
- * @param source
- * @param conf
- * @param shape
- * @param attributes
- * @tparam T
+ * @param source DataSource
+ * @param conf UserConfig
+ * @param shape SourceShape[T}
+ * @param attributes Attributes
+ * @tparam T type
*/
final case class SourceTaskModule[T](
source: DataSource,
@@ -51,13 +49,10 @@
attributes: Attributes = Attributes.name("SourceTaskModule"))
extends GearpumpTaskModule {
- override def subModules: Set[Module] = Set.empty
- override def withAttributes(attr: Attributes): Module = {
+ override def withAttributes(attr: Attributes): Module =
this.copy(shape = amendShape(attr), attributes = attr)
- }
- override def carbonCopy: Module = {
- this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out")))
- }
+ override def carbonCopy: Module =
+ this.copy(shape = SourceShape( Outlet[T]("SourceTaskModule.out")))
override def replaceShape(s: Shape): Module =
if (s == shape) this
@@ -68,17 +63,17 @@
val thatN = attr.nameOrDefault(null)
if ((thatN eq null) || thisN == thatN) shape
- else shape.copy(outlet = Outlet(thatN + ".out"))
+ else shape.copy(out = Outlet(thatN + ".out"))
}
}
/**
* This is used to represent the Gearpump Data Sink
- * @param sink
- * @param conf
- * @param shape
- * @param attributes
- * @tparam IN
+ * @param sink DataSink
+ * @param conf UserConfig
+ * @param shape SinkShape[IN]
+ * @param attributes Attributes
+ * @tparam IN type
*/
final case class SinkTaskModule[IN](
sink: DataSink,
@@ -87,11 +82,10 @@
attributes: Attributes = Attributes.name("SinkTaskModule"))
extends GearpumpTaskModule {
- override def subModules: Set[Module] = Set.empty
- override def withAttributes(attr: Attributes): Module = {
+ override def withAttributes(attr: Attributes): Module =
this.copy(shape = amendShape(attr), attributes = attr)
- }
- override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
+ override def carbonCopy: Module =
+ this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
override def replaceShape(s: Shape): Module =
if (s == shape) this
@@ -102,32 +96,40 @@
val thatN = attr.nameOrDefault(null)
if ((thatN eq null) || thisN == thatN) shape
- else shape.copy(inlet = Inlet(thatN + ".out"))
+ else shape.copy(in = Inlet(thatN + ".out"))
}
}
/**
* This is to represent the Gearpump Processor which has exact one input and one output
- * @param processor
- * @param conf
- * @param attributes
- * @tparam IN
- * @tparam OUT
- * @tparam Unit
+ * @param processor Class[_ <: Task]
+ * @param conf UserConfig
+ * @param attributes Attributes
+ * @tparam IN type
+ * @tparam OUT type
+ * @tparam Unit void
*/
case class ProcessorModule[IN, OUT, Unit](
processor: Class[_ <: Task],
conf: UserConfig,
- val attributes: Attributes = Attributes.name("processorModule"))
- extends FlowModule[IN, OUT, Unit] with GearpumpTaskModule {
+ attributes: Attributes = Attributes.name("processorModule"))
+ extends AtomicModule with GearpumpTaskModule {
+ val inPort = Inlet[IN]("ProcessorModule.in")
+ val outPort = Outlet[IN]("ProcessorModule.out")
+ override val shape = new FlowShape(inPort, outPort)
+ override def replaceShape(s: Shape): Module = if (s != shape) {
+ throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+ } else {
+ this
+ }
+
override def carbonCopy: Module = newInstance
- protected def newInstance: ProcessorModule[IN, OUT, Unit] = {
+ protected def newInstance: ProcessorModule[IN, OUT, Unit] =
new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
- }
override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = {
new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
new file mode 100644
index 0000000..b06dd0e
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
+
+/**
+ *
+ * Group the T value groupBy function
+ *
+ * @param groupBy T => Group
+ * @param attributes Attributes
+ * @tparam T type
+ * @tparam Group type
+ */
+case class GroupByModule[T, Group](groupBy: T => Group,
+ attributes: Attributes = Attributes.name("groupByModule"))
+ extends AtomicModule {
+ val inPort = Inlet[T]("GroupByModule.in")
+ val outPort = Outlet[T]("GroupByModule.out")
+ override val shape = new FlowShape(inPort, outPort)
+
+ override def replaceShape(s: Shape): Module = if (s != shape) {
+ throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+ } else {
+ this
+ }
+
+ override def carbonCopy: Module = newInstance
+
+ protected def newInstance: GroupByModule[T, Group] =
+ new GroupByModule[T, Group](groupBy, attributes)
+
+ override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = {
+ new GroupByModule[T, Group](groupBy, attributes)
+ }
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
similarity index 62%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
index 926feb6..462d967 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
@@ -16,23 +16,31 @@
* limitations under the License.
*/
-package akka.stream.gearpump.module
+package org.apache.gearpump.akkastream.module
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
/**
*
* Reduce Module
*
- * @param f
- * @param attributes
- * @tparam T
+ * @param f (T,T) => T
+ * @param attributes Attributes
+ * @tparam T type
*/
-case class ReduceModule[T](
- val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule"))
- extends FlowModule[T, T, Unit] {
+case class ReduceModule[T](f: (T, T) => T, attributes: Attributes =
+Attributes.name("reduceModule")) extends AtomicModule {
+ val inPort = Inlet[T]("GroupByModule.in")
+ val outPort = Outlet[T]("GroupByModule.out")
+ override val shape = new FlowShape(inPort, outPort)
+
+ override def replaceShape(s: Shape): Module = if (s != shape) {
+ throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
+ } else {
+ this
+ }
override def carbonCopy: Module = newInstance
@@ -41,4 +49,4 @@
override def withAttributes(attributes: Attributes): ReduceModule[T] = {
new ReduceModule[T](f, attributes)
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
similarity index 90%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
index 9cc46c9..8e43c16 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -16,19 +16,19 @@
* limitations under the License.
*/
-package akka.stream.gearpump.scaladsl
+package org.apache.gearpump.akkastream.scaladsl
import akka.stream.Attributes
-import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
+import org.apache.gearpump.akkastream.module._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
-import org.reactivestreams.{Publisher, Subscriber}
-
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.Task
+import org.reactivestreams.{Publisher, Subscriber}
-object GearSource {
+
+object GearSource{
/**
* Construct a Source which accepts out of band input messages.
@@ -52,18 +52,18 @@
/**
* Construct a Source from Gearpump [[DataSource]].
*
- * [[SourceTaskModule]] -> downstream Sink
+ * [[SourceTaskModule]] -> downstream Sink
*
*/
def from[OUT](source: DataSource): Source[OUT, Unit] = {
- val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty))
+ val taskSource = new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))
taskSource
}
/**
* Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]].
*
- * [[ProcessorModule]] -> downstream Sink
+ * [[ProcessorModule]] -> downstream Sink
*
*/
def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
@@ -97,7 +97,7 @@
/**
* Construct a Sink from Gearpump [[DataSink]].
*
- * Upstream Source -> [[SinkTaskModule]]
+ * Upstream Source -> [[SinkTaskModule]]
*
*/
def to[IN](sink: DataSink): Sink[IN, Unit] = {
@@ -108,7 +108,7 @@
/**
* Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]].
*
- * Upstream Source -> [[ProcessorModule]]
+ * Upstream Source -> [[ProcessorModule]]
*
*/
def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
@@ -131,7 +131,7 @@
*
* val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
* Count(kv.key, 1)
- * }.fold(Count(null, 0)){(base, add) =>
+ * }.fold(Count(null, 0)) {(base, add) =>
* Count(add.key, base.count + add.count)
* }.log("count of current key")
* .flatten()
@@ -146,7 +146,7 @@
* sink will only operate on the main stream.
*
*/
-object GroupBy {
+object GroupBy{
def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
new Flow[T, T, Unit](new GroupByModule(groupBy))
}
@@ -159,18 +159,19 @@
*
*
*/
-object Reduce {
+object Reduce{
def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
new Flow[T, T, Unit](new ReduceModule(reduce))
}
}
+
/**
* Create a Flow by providing a Gearpump Processor class and configuration
*
*
*/
-object Processor {
+object Processor{
def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf))
}
@@ -183,7 +184,7 @@
*/
implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
- //TODO It is named as groupBy2 to avoid conflict with built-in
+ // TODO It is named as groupBy2 to avoid conflict with built-in
// groupBy. Eventually, we think the built-in groupBy should
// be replace with this implementation.
def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
@@ -191,6 +192,7 @@
source.via[T, Unit](stage)
}
+
def reduce(reduce: (T, T) => T): Source[T, Mat] = {
val stage = Reduce.apply(reduce)
source.via[T, Unit](stage)
@@ -237,10 +239,13 @@
}
/**
- * Does sum on values
+ * do sum on values
*
* Before doing this, you need to do groupByKey to group same key together
* , otherwise, it will do the sum no matter what current key is.
+ *
+ * @param numeric Numeric[V]
+ * @return
*/
def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
val stage = Reduce.apply(sumByKey[K, V](numeric))
@@ -249,13 +254,13 @@
}
/**
- * Helper util to support groupByKey and sum
+ * Help util to support groupByKey and sum
*/
implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
/**
- * If it is a KV Pair, we can group the KV pair by the key.
- *
+ * if it is a KV Pair, we can group the KV pair by the key.
+ * @return
*/
def groupByKey: Flow[(K, V), (K, V), Mat] = {
val stage = GroupBy.apply(getTupleKey[K, V])
@@ -268,6 +273,8 @@
* Before doing this, you need to do groupByKey to group same key together
* , otherwise, it will do the sum no matter what current key is.
*
+ * @param numeric Numeric[V]
+ * @return
*/
def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
val stage = Reduce.apply(sumByKey[K, V](numeric))
@@ -279,4 +286,4 @@
private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] =
(tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
similarity index 84%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
index 2eb0612..43f07c4 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
@@ -16,22 +16,23 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class BalanceTask(context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
val sizeOfOutputs = sizeOfOutPorts
var index = 0
- override def onNext(msg: Message): Unit = {
+ override def onNext(msg : Message) : Unit = {
output(index, msg)
index += 1
if (index == sizeOfOutputs) {
index = 0
}
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
new file mode 100644
index 0000000..5c2485b
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val max = userConf.getLong(BatchTask.MAX)
+ val costFunc = userConf.getValue[In => Long](BatchTask.COST)
+ val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
+ val seed = userConf.getValue[In => Out](BatchTask.SEED)
+
+ override def onNext(msg : Message) : Unit = {
+ val data = msg.msg.asInstanceOf[In]
+ val time = msg.timestamp
+ context.output(msg)
+ }
+}
+
+object BatchTask {
+ val AGGREGATE = "AGGREGATE"
+ val COST = "COST"
+ val MAX = "MAX"
+ val SEED = "SEED"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
similarity index 82%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
index 925bf21..292468d 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
@@ -16,14 +16,15 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
- override def onNext(msg: Message): Unit = {
+class BroadcastTask(context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+ override def onNext(msg : Message) : Unit = {
context.output(msg)
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
similarity index 84%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
index 2eb0612..b77b9bd 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -16,22 +16,23 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class ConcatTask(context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
val sizeOfOutputs = sizeOfOutPorts
var index = 0
- override def onNext(msg: Message): Unit = {
+ override def onNext(msg : Message) : Unit = {
output(index, msg)
index += 1
if (index == sizeOfOutputs) {
index = 0
}
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
new file mode 100644
index 0000000..7c335dc
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+case object DelayInitialTime
+
+class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val delayInitial = userConf.getValue[FiniteDuration](DelayInitialTask.DELAY_INITIAL).
+ getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+ var delayInitialActive = true
+
+ override def onStart(startTime: Instant): Unit = {
+ context.scheduleOnce(delayInitial)(
+ self ! Message(DelayInitialTime, System.currentTimeMillis())
+ )
+ }
+ override def onNext(msg : Message) : Unit = {
+ msg.msg match {
+ case DelayInitialTime =>
+ delayInitialActive = false
+ case _ =>
+ delayInitialActive match {
+ case true =>
+ case false =>
+ context.output(msg)
+ }
+ }
+ }
+}
+
+object DelayInitialTask {
+ val DELAY_INITIAL = "DELAY_INITIAL"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
new file mode 100644
index 0000000..0c54829
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object DropWithinTimeout
+
+class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val timeout = userConf.getValue[FiniteDuration](DropWithinTask.TIMEOUT).
+ getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+ var timeoutActive = true
+
+ override def onStart(startTime: Instant): Unit = {
+ context.scheduleOnce(timeout)(
+ self ! Message(DropWithinTimeout, System.currentTimeMillis())
+ )
+ }
+
+ override def onNext(msg : Message) : Unit = {
+ msg.msg match {
+ case DropWithinTimeout =>
+ timeoutActive = false
+ case _ =>
+
+ }
+ timeoutActive match {
+ case true =>
+ case false =>
+ context.output(msg)
+ }
+ }
+}
+
+object DropWithinTask {
+ val TIMEOUT = "TIMEOUT"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
similarity index 84%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
index 2eb0612..14ff537 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -16,22 +16,23 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
val sizeOfOutputs = sizeOfOutPorts
var index = 0
- override def onNext(msg: Message): Unit = {
+ override def onNext(msg : Message) : Unit = {
output(index, msg)
index += 1
if (index == sizeOfOutputs) {
index = 0
}
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
new file mode 100644
index 0000000..d982ebd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val zero = userConf.getValue[Out](FoldTask.ZERO)
+ val aggregator = userConf.getValue[(Out, In) => Out](FoldTask.AGGREGATOR)
+ var aggregated: Out = _
+ implicit val ec = context.system.dispatcher
+
+ override def onStart(instant: Instant): Unit = {
+ zero.foreach(value => {
+ aggregated = value
+ })
+ }
+
+ override def onNext(msg : Message) : Unit = {
+ val data = msg.msg.asInstanceOf[In]
+ val time = msg.timestamp
+ aggregator.foreach(func => {
+ aggregated = func(aggregated, data)
+ LOG.info(s"aggregated = $aggregated")
+ val msg = new Message(aggregated, time)
+ context.output(msg)
+ })
+ }
+}
+
+object FoldTask {
+ val ZERO = "ZERO"
+ val AGGREGATOR = "AGGREGATOR"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
similarity index 65%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
index 9a4e24e..3310ab9 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
@@ -16,35 +16,36 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
-import akka.stream.gearpump.task.GraphTask.{Index, PortId}
+import java.time.Instant
import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.GraphTask.{Index, PortId}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskWrapper}
-class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
+class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig)
extends Task(inputTaskContext, userConf) {
private val context = inputTaskContext.asInstanceOf[TaskWrapper]
- private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](
- GraphTask.OUT_PROCESSORS).get)
- private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](
- GraphTask.IN_PROCESSORS).get)
+ protected val outMapping =
+ portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get)
+ protected val inMapping =
+ portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get)
val sizeOfOutPorts = outMapping.keys.size
val sizeOfInPorts = inMapping.keys.size
-
+
private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = {
- val portToProcessor = processors.zipWithIndex.map { kv =>
+ val portToProcessor = processors.zipWithIndex.map{kv =>
(kv._2, kv._1)
}.toMap
val processorToIndex = processors.sorted.zipWithIndex.toMap
- val portToIndex = portToProcessor.map { kv =>
+ val portToIndex = portToProcessor.map{kv =>
val (outlet, processorId) = kv
val index = processorToIndex(processorId)
(outlet, index)
@@ -56,15 +57,15 @@
context.output(outMapping(outletId), msg)
}
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime : Instant) : Unit = {}
- override def onStop(): Unit = {}
+ override def onStop() : Unit = {}
}
object GraphTask {
- val OUT_PROCESSORS = "akka.stream.gearpump.task.outprocessors"
- val IN_PROCESSORS = "akka.stream.gearpump.task.inprocessors"
+ val OUT_PROCESSORS = "org.apache.gearpump.akkastream.task.outprocessors"
+ val IN_PROCESSORS = "org.apache.gearpump.akkastream.task.inprocessors"
type PortId = Int
type Index = Int
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
new file mode 100644
index 0000000..eaf2b3f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.collection.immutable.VectorBuilder
+import scala.concurrent.duration.FiniteDuration
+
+class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ case object GroupedWithinTrigger
+ val buf: VectorBuilder[T] = new VectorBuilder
+ val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
+ val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)
+
+ override def onNext(msg : Message) : Unit = {
+
+ }
+}
+
+object GroupedWithinTask {
+ val BATCH_SIZE = "BATCH_SIZE"
+ val TIME_WINDOW = "TIME_WINDOW"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
similarity index 71%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
index 2eb0612..741ec43 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -16,22 +16,29 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class InterleaveTask(context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
- val sizeOfOutputs = sizeOfOutPorts
+ val sizeOfInputs = sizeOfInPorts
var index = 0
- override def onNext(msg: Message): Unit = {
+ // TODO access upstream and pull
+ override def onNext(msg : Message) : Unit = {
output(index, msg)
index += 1
- if (index == sizeOfOutputs) {
+ if (index == sizeOfInputs) {
index = 0
}
}
-}
\ No newline at end of file
+}
+
+object InterleaveTask {
+ val INPUT_PORTS = "INPUT_PORTS"
+ val SEGMENT_SIZE = "SEGMENT_SIZE"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
new file mode 100644
index 0000000..daa1afc
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+
+class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
+ implicit val ec = context.system.dispatcher
+
+ override def onNext(msg : Message) : Unit = {
+ val data = msg.msg.asInstanceOf[In]
+ val time = msg.timestamp
+ f match {
+ case Some(func) =>
+ val fout = func(data)
+ fout.onComplete(value => {
+ value.foreach(out => {
+ val msg = new Message(out, time)
+ context.output(msg)
+ })
+ })
+ case None =>
+ }
+ }
+}
+
+object MapAsyncTask {
+ val MAPASYNC_FUNC = "MAPASYNC_FUNC"
+
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
similarity index 69%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
index 925bf21..ad18f72 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -16,14 +16,24 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
- override def onNext(msg: Message): Unit = {
+class MergeTask(context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
+ val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)
+
+ override def onNext(msg : Message) : Unit = {
context.output(msg)
}
-}
\ No newline at end of file
+}
+
+object MergeTask {
+ val EAGER_COMPLETE = "EAGER_COMPLETE"
+ val INPUT_PORTS = "INPUT_PORTS"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
similarity index 65%
copy from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
copy to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
index 2eb0612..458bb4e 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -16,22 +16,28 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+import scala.concurrent.duration.FiniteDuration
- val sizeOfOutputs = sizeOfOutPorts
- var index = 0
+class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
- override def onNext(msg: Message): Unit = {
- output(index, msg)
- index += 1
- if (index == sizeOfOutputs) {
- index = 0
- }
+ val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get
+
+ override def onNext(msg : Message) : Unit = {
+ context.output(Message(elem, msg.timestamp))
}
-}
\ No newline at end of file
+}
+
+object SingleSourceTask {
+ val ELEMENT = "ELEMENT"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
similarity index 78%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
index b681852..1b9c4e3 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -16,24 +16,24 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
+import java.time.Instant
import java.util
import java.util.concurrent.TimeUnit
import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
import akka.util.Timeout
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.RequestMessage
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
import org.apache.gearpump.util.LogUtil
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
/**
* Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module
@@ -46,23 +46,27 @@
* \|
* Akka Stream [[Subscriber]]
*
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
*/
-class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+ extends Task(taskContext, userConf) {
import taskContext.taskId
val queue = new util.LinkedList[Message]()
- var subscriber: ActorRef = null
+ var subscriber: ActorRef = _
var request: Int = 0
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime : Instant) : Unit = {}
- override def onNext(msg: Message): Unit = {
+ override def onNext(msg : Message) : Unit = {
queue.add(msg)
trySendingData()
}
- override def onStop(): Unit = {}
+ override def onStop() : Unit = {}
private def trySendingData(): Unit = {
if (subscriber != null) {
@@ -88,15 +92,16 @@
case class RequestMessage(number: Int)
- class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
+ class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int,
+ processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
private val taskId = TaskId(processorId, index = 0)
-
private val LOG = LogUtil.getLogger(getClass)
- private var actor: ActorRef = null
+ private var actor: ActorRef = _
import system.dispatcher
- private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
+ private val task =
+ context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map{container =>
// println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
container.task
}
@@ -111,7 +116,7 @@
private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
override def request(l: Long): Unit = {
- task.foreach { task =>
+ task.foreach{ task =>
task.tell(RequestMessage(l.toInt), actor)
}
}
@@ -119,7 +124,8 @@
class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
def receive: Receive = {
- case result: AnyRef => subscriber.onNext(result)
+ case result: AnyRef =>
+ subscriber.onNext(result)
}
}
-}
\ No newline at end of file
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
similarity index 71%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
index ccbd350..054b483 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -16,20 +16,21 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
-import scala.concurrent.ExecutionContext
+import java.time.Instant
import akka.actor.Actor.Receive
-import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error}
-import org.reactivestreams.{Subscriber, Subscription}
-
import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
+import org.reactivestreams.{Subscriber, Subscription}
+
+import scala.concurrent.ExecutionContext
/**
* Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task
@@ -42,44 +43,51 @@
* / Local JVM
* Akka Stream [[org.reactivestreams.Publisher]]
*
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
*/
-class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+ extends Task(taskContext, userConf) {
import taskContext.taskId
- override def onStart(startTime: StartTime): Unit = {}
+ override def onStart(startTime : Instant) : Unit = {}
- override def onNext(msg: Message): Unit = {
+ override def onNext(msg : Message) : Unit = {
LOG.info("AkkaStreamSource receiving message " + msg)
}
- override def onStop(): Unit = {}
+ override def onStop() : Unit = {}
override def receiveUnManagedMessage: Receive = {
case Error(ex) =>
LOG.error("the stream has error", ex)
case AkkaStreamMessage(msg) =>
- LOG.error("we have received message from akka stream source: " + msg)
+ LOG.info("we have received message from akka stream source: " + msg)
taskContext.output(Message(msg, System.currentTimeMillis()))
case Complete(description) =>
- LOG.error("the stream is completed: " + description)
+ LOG.info("the stream is completed: " + description)
case msg =>
LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
}
}
+
object SourceBridgeTask {
case class Error(ex: java.lang.Throwable)
case class Complete(description: String)
- case class AkkaStreamMessage(msg: AnyRef)
+ case class AkkaStreamMessage[T >: AnyRef](msg: T)
- class SourceBridgeTaskClient[T <: AnyRef](ec: ExecutionContext, context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
+ class SourceBridgeTaskClient[T >: AnyRef](ec: ExecutionContext,
+ context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
val taskId = TaskId(processorId, 0)
- var subscription: Subscription = null
+ var subscription: Subscription = _
implicit val dispatcher = ec
- val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
+ val task = context.askAppMaster[TaskActorRef](appId,
+ LookupTaskActorRef(taskId)).map{container =>
// println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
container.task
}
@@ -89,6 +97,7 @@
}
override def onSubscribe(subscription: Subscription): Unit = {
+ // when taskActorRef is resolved, request message from upstream
this.subscription = subscription
task.map(task => subscription.request(1))
}
@@ -98,7 +107,7 @@
}
override def onNext(t: T): Unit = {
- task.map { task =>
+ task.map {task =>
task ! AkkaStreamMessage(t)
}
subscription.request(1)
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
new file mode 100644
index 0000000..a0674bc
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val func = userConf.getValue[() => IN => Iterable[OUT]](StatefulMapConcatTask.FUNC).get
+ var f: IN => Iterable[OUT] = _
+
+ override def onStart(startTime: Instant) : Unit = {
+ f = func()
+ }
+
+ override def onNext(msg : Message) : Unit = {
+ val in: IN = msg.msg.asInstanceOf[IN]
+ val out: Iterable[OUT] = f(in)
+ val iterator = out.iterator
+ while(iterator.hasNext) {
+ val nextValue = iterator.next
+ context.output(Message(nextValue, System.currentTimeMillis()))
+ }
+ }
+}
+
+object StatefulMapConcatTask {
+ val FUNC = "FUNC"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
new file mode 100644
index 0000000..9559d8f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object TakeWithinTimeout
+
+class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val timeout = userConf.getValue[FiniteDuration](TakeWithinTask.TIMEOUT).
+ getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+ var timeoutActive = false
+
+ override def onStart(startTime: Instant): Unit = {
+ context.scheduleOnce(timeout)(
+ self ! Message(DropWithinTimeout, System.currentTimeMillis())
+ )
+ }
+
+ override def onNext(msg : Message) : Unit = {
+ msg.msg match {
+ case DropWithinTimeout =>
+ timeoutActive = true
+ case _ =>
+
+ }
+ timeoutActive match {
+ case true =>
+ case false =>
+ context.output(msg)
+ }
+ }
+}
+
+object TakeWithinTask {
+ val TIMEOUT = "TIMEOUT"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
new file mode 100644
index 0000000..3c7ad87
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val cost = userConf.getInt(ThrottleTask.COST).getOrElse(0)
+ val costCalc = userConf.getValue[T => Int](ThrottleTask.COST_CALC)
+ val maxBurst = userConf.getInt(ThrottleTask.MAX_BURST)
+ val timePeriod = userConf.getValue[FiniteDuration](ThrottleTask.TIME_PERIOD).
+ getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+ val interval = timePeriod.toNanos / cost
+
+ // TODO control rate from TaskActor
+ override def onNext(msg : Message) : Unit = {
+ val data = msg.msg.asInstanceOf[T]
+ val time = msg.timestamp
+ context.output(msg)
+ }
+}
+
+object ThrottleTask {
+ val COST = "COST"
+ val COST_CALC = "COST_CAL"
+ val MAX_BURST = "MAX_BURST"
+ val TIME_PERIOD = "TIME_PERIOD"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
new file mode 100644
index 0000000..d99d2db
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
+ getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+ (TickSourceTask.INITIAL_DELAY)
+ val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
+ getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+ val tick = userConf.getValue[T](TickSourceTask.TICK).get
+
+ override def onStart(startTime: Instant): Unit = {
+ context.schedule(initialDelay, interval)(
+ self ! Message(tick, System.currentTimeMillis())
+ )
+ }
+
+ override def onNext(msg : Message) : Unit = {
+ context.output(msg)
+ }
+}
+
+object TickSourceTask {
+ val INITIAL_DELAY = "INITIAL_DELAY"
+ val INTERVAL = "INTERVAL"
+ val TICK = "TICK"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
similarity index 64%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
index 78fabbe..005d018 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -16,30 +16,31 @@
* limitations under the License.
*/
-package akka.stream.gearpump.task
+package org.apache.gearpump.akkastream.task
-import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
-
+import org.apache.gearpump.akkastream.task.Unzip2Task.UnZipFunction
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.task.TaskContext
-class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
+class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
- val unzip = userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
+ val unzip = userConf.
+ getValue[UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
- override def onNext(msg: Message): Unit = {
+ override def onNext(msg : Message) : Unit = {
val message = msg.msg
val time = msg.timestamp
- val pair = unzip(message)
+ val pair = unzip(message.asInstanceOf[In])
val (a, b) = pair
output(0, Message(a.asInstanceOf[AnyRef], time))
output(1, Message(b.asInstanceOf[AnyRef], time))
}
}
-object UnZip2Task {
- class UnZipFunction(val unzip: Any => (Any, Any)) extends Serializable
+object Unzip2Task {
+ case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends Serializable
- val UNZIP2_FUNCTION = "akka.stream.gearpump.task.unzip2.function"
-}
\ No newline at end of file
+ val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
+}
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
new file mode 100644
index 0000000..7e0c082
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.Zip2Task.ZipFunction
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
+ extends GraphTask(context, userConf) {
+
+ val zip = userConf.
+ getValue[ZipFunction[A1, A2, OUT]](Zip2Task.ZIP2_FUNCTION)(context.system).get.zip
+ var a1: Option[A1] = None
+ var a2: Option[A2] = None
+
+ override def onNext(msg : Message) : Unit = {
+ val message = msg.msg
+ val time = msg.timestamp
+ a1 match {
+ case Some(x) =>
+ a2 = Some(message.asInstanceOf[A2])
+ a1.foreach(v1 => {
+ a2.foreach(v2 => {
+ val out = zip(v1, v2)
+ context.output(Message(out.asInstanceOf[OUT], time))
+
+ })
+ })
+ case None =>
+ a1 = Some(message.asInstanceOf[A1])
+ }
+ }
+}
+
+object Zip2Task {
+ case class ZipFunction[A1, A2, OUT](val zip: (A1, A2) => OUT) extends Serializable
+
+ val ZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.zip2.function"
+}
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
similarity index 78%
rename from experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
rename to experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
index c774fc7..6ad90df 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
@@ -16,15 +16,17 @@
* limitations under the License.
*/
-package akka.stream.gearpump.util
+package org.apache.gearpump.akkastream.util
import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, MaterializedValueNode, Module, Transform}
class MaterializedValueOps(mat: MaterializedValueNode) {
- def resolve[Mat](materializedValues: Map[Module, Any]): Mat = {
- def resolveMaterialized(mat: MaterializedValueNode, materializedValues: Map[Module, Any]): Any = mat match {
+ def resolve[Mat](materializedValues: scala.collection.mutable.Map[Module, Any]): Mat = {
+ def resolveMaterialized(mat: MaterializedValueNode,
+ materializedValues: scala.collection.mutable.Map[Module, Any]): Any = mat match {
case Atomic(m) => materializedValues.getOrElse(m, ())
- case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
+ case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+ resolveMaterialized(d2, materializedValues))
case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
case Ignore => ()
}
@@ -32,7 +34,7 @@
}
}
-object MaterializedValueOps {
+object MaterializedValueOps{
def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat)
}
diff --git a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
similarity index 96%
rename from experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
rename to experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
index 4ead839..e1846ea 100644
--- a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
+++ b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package akka.stream.gearpump
+package org.apache.gearpump.akkastream
import akka.stream.Attributes
import org.scalatest.{FlatSpec, Matchers}
@@ -30,4 +30,5 @@
assert("aa-bb" == c.nameOrDefault())
}
+
}
diff --git a/project/BuildDashboard.scala b/project/BuildDashboard.scala
index c14b9d6..cfa6aae 100644
--- a/project/BuildDashboard.scala
+++ b/project/BuildDashboard.scala
@@ -46,11 +46,11 @@
private lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test",
+ "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.lihaoyi" %% "upickle" % upickleVersion,
- "com.softwaremill.akka-http-session" %% "core" % "0.2.5",
- "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+ "com.softwaremill.akka-http-session" %% "core" % "0.3.0",
+ "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
"com.github.scribejava" % "scribejava-apis" % "2.4.0",
"com.ning" % "async-http-client" % "1.9.33",
"org.webjars" % "angularjs" % "1.4.9",
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index e07b688..eb5f9e1 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -25,7 +25,7 @@
object BuildExperiments extends sbt.Build {
lazy val experiments: Seq[ProjectReference] = Seq(
- // akkastream,
+ akkastream,
cgroup,
redis,
storm,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6949497..4e30d3f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -23,7 +23,8 @@
val crossScalaVersionNumbers = Seq("2.11.8")
val scalaVersionNumber = crossScalaVersionNumbers.last
- val akkaVersion = "2.4.3"
+ val akkaVersion = "2.4.16"
+ val akkaHttpVersion = "10.0.1"
val hadoopVersion = "2.6.0"
val hbaseVersion = "1.0.0"
val commonsHttpVersion = "3.1"
@@ -82,10 +83,9 @@
"com.typesafe.akka" %% "akka-agent" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-kernel" % akkaVersion,
- "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
- "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+ "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
+ "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
"org.scala-lang" % "scala-reflect" % scalaVersionNumber,
- "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
"com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion,
"com.google.guava" % "guava" % guavaVersion,
"com.codahale.metrics" % "metrics-graphite" % codahaleVersion
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
index 3088a39..53ee692 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -149,7 +149,7 @@
}
}
} ~
- path("metrics" / RestPath) { path =>
+ path("metrics" / RemainingPath) { path =>
parameterMap { optionMap =>
parameter("aggregator" ? "") { aggregator =>
parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index ed15121..be96577 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -102,7 +102,7 @@
failWith(ex)
}
} ~
- path("metrics" / RestPath) { path =>
+ path("metrics" / RemainingPath) { path =>
parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String =>
val query = QueryHistoryMetrics(path.head.toString, readOption)
onComplete(askActor[HistoryMetrics](master, query)) {
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
index 804b34f..4989364 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
@@ -60,8 +60,8 @@
class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
// Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
- private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump",
- params = Map.empty)
+ private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = Some("gearpump"),
+ params = Map.empty[String, String])
val LOG = LogUtil.getLogger(getClass, "AUDIT")
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
index 284d3f2..7b33987 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -19,10 +19,12 @@
package org.apache.gearpump.services
import akka.actor.ActorSystem
+import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.marshalling.ToResponseMarshallable._
+import akka.http.scaladsl.server.{RejectionHandler, StandardRoute}
import akka.stream.Materializer
-
import org.apache.gearpump.util.Util
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
@@ -56,14 +58,14 @@
getFromResource("index.html")
} ~
path("favicon.ico") {
- complete(StatusCodes.NotFound)
+ complete(ToResponseMarshallable(StatusCodes.NotFound))
} ~
pathPrefix("webjars") {
get {
getFromResourceDirectory("META-INF/resources/webjars")
}
} ~
- path(Rest) { path =>
+ path(Remaining) { path =>
getFromResource("%s" format path)
}
}
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
index 8268d61..954fe97 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
@@ -63,7 +63,7 @@
failWith(ex)
}
} ~
- path("metrics" / RestPath ) { path =>
+ path("metrics" / RemainingPath ) { path =>
val workerId = WorkerId.parse(workerIdString)
parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
val query = QueryHistoryMetrics(path.head.toString, readOption)
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index ca8d89e..d4b3719 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@
*/
class StreamApplication(
override val name: String, val inputUserConfig: UserConfig,
- dag: Graph[ProcessorDescription, PartitionerDescription])
+ val dag: Graph[ProcessorDescription, PartitionerDescription])
extends Application {
require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 82ea7c7..5aaf2fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -124,12 +124,11 @@
* to another Op to be used
*/
case class ChainableOp[IN, OUT](
- fn: SingleInputFunction[IN, OUT]) extends Op {
+ fn: SingleInputFunction[IN, OUT],
+ userConfig: UserConfig = UserConfig.empty) extends Op {
override def description: String = fn.description
- override def userConfig: UserConfig = UserConfig.empty
-
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
case op: ChainableOp[OUT, _] =>