// blob: 07c95f83a640021c07d55b6346a75e523d72f6a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.akkastream
import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed
import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
import akka.event.{Logging, LoggingAdapter}
import akka.stream.Attributes.Attribute
import akka.stream.impl.Stages.SymbolicGraphStage
import akka.stream.impl.StreamLayout.{Atomic, Combine, CopiedModule, Ignore, MaterializedValueNode, Module, Transform}
import akka.stream.{ActorAttributes, ActorMaterializerSettings, Attributes, ClosedShape, Fusing, Graph, InPort, OutPort, SinkShape}
import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
import akka.stream.impl.{ExtendedActorMaterializer, StreamSupervisor}
import akka.stream.stage.GraphStage
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
import org.apache.gearpump.akkastream.graph._
import org.apache.gearpump.util.{Graph => GGraph}
import scala.collection.mutable
import scala.concurrent.{ExecutionContextExecutor, Promise}
import scala.concurrent.duration.FiniteDuration
/**
 * Factory methods and helper types for [[GearpumpMaterializer]].
 */
object GearpumpMaterializer {

  /**
   * An edge of the module graph assembled by the materializer: `from` is the
   * output port of the upstream module and `to` the input port of the
   * downstream module.
   */
  final case class Edge(from: OutPort, to: InPort)

  /** An [[Attribute]] that wraps a [[MaterializedValueNode]]. */
  final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute

  /**
   * Implicitly lifts a `Boolean` into a freshly allocated `AtomicBoolean`.
   * Kept so that existing code relying on the conversion keeps compiling.
   */
  implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)

  /**
   * Creates a materializer for the given partitioning strategy, using default
   * settings with auto-fusing disabled, `useLocalCluster = false` and the name
   * prefix "flow".
   *
   * @param strategy partitioning strategy handed to the materializer
   * @param context actor system or actor context used to create the supervisor
   */
  def apply(strategy: Strategy)(implicit context: ActorRefFactory): ExtendedActorMaterializer = {
    val system = actorSystemOf(context)
    val settings = ActorMaterializerSettings(system).withAutoFusing(false)
    apply(settings, strategy, useLocalCluster = false, "flow")(context)
  }

  /**
   * Creates a materializer where every argument is optional.
   *
   * @param materializerSettings settings to use; when empty, default settings
   *                             with auto-fusing disabled are created
   * @param strategy partitioning strategy handed to the materializer
   * @param useLocalCluster whether to use built-in in-process local cluster
   * @param namePrefix name prefix; "flow" when empty
   * @param context actor system or actor context used to create the supervisor
   */
  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
      strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
      useLocalCluster: Boolean = true,
      namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
    ExtendedActorMaterializer = {
    val system = actorSystemOf(context)
    val settings = materializerSettings.getOrElse(
      ActorMaterializerSettings(system).withAutoFusing(false))
    apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
  }

  /**
   * Creates a materializer from fully specified arguments. A stream supervisor
   * actor is created under `context`, on the dispatcher named in
   * `materializerSettings`.
   *
   * `strategy`, `useLocalCluster` and `namePrefix` are forwarded to the
   * [[GearpumpMaterializer]] constructor; previously they were silently dropped
   * and the constructor defaults were always used.
   *
   * @param materializerSettings settings for the materializer and its supervisor
   * @param strategy partitioning strategy handed to the materializer
   * @param useLocalCluster whether to use built-in in-process local cluster
   * @param namePrefix name prefix handed to the materializer
   * @param context actor system or actor context used to create the supervisor
   */
  def apply(materializerSettings: ActorMaterializerSettings,
      strategy: Strategy,
      useLocalCluster: Boolean,
      namePrefix: String)(implicit context: ActorRefFactory):
    ExtendedActorMaterializer = {
    val system = actorSystemOf(context)
    // Explicit flag instead of relying on the implicit Boolean => AtomicBoolean conversion.
    val haveShutDown = new AtomicBoolean(false)
    val supervisor = context.actorOf(
      StreamSupervisor.props(materializerSettings, haveShutDown)
        .withDispatcher(materializerSettings.dispatcher),
      StreamSupervisor.nextName())

    new GearpumpMaterializer(
      system,
      materializerSettings,
      supervisor,
      strategy,
      useLocalCluster,
      Some(namePrefix))
  }

  /**
   * Resolves the [[ActorSystem]] behind an [[ActorRefFactory]].
   *
   * @throws IllegalArgumentException if `context` is null or is neither an
   *                                  `ExtendedActorSystem` nor an `ActorContext`
   */
  private def actorSystemOf(context: ActorRefFactory): ActorSystem =
    context match {
      case s: ExtendedActorSystem => s
      case c: ActorContext => c.system
      case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
      case _ =>
        throw new IllegalArgumentException(
          s"""
             | context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
             |""".stripMargin
        )
    }
}
/**
 * [[GearpumpMaterializer]] allows you to render the akka-stream DSL as a Gearpump
 * streaming application. If a module cannot be rendered remotely in the Gearpump
 * cluster, the local Actor materializer is used as a fallback to materialize
 * that module locally.
 *
 * Users can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
 * to determine which modules should be rendered remotely and which modules
 * should be rendered locally.
 *
 * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
 *      to find out how the runnable graph is cut into two parts that are
 *      materialized separately.
 * @param system ActorSystem
 * @param settings materializer settings; the base that `effectiveSettings` refines
 * @param supervisor ActorRef exposed as this materializer's `supervisor`
 * @param strategy Strategy
 * @param useLocalCluster whether to use the built-in in-process local cluster
 * @param namePrefix optional name prefix (not referenced inside this class)
 */
class GearpumpMaterializer(override val system: ActorSystem,
    override val settings: ActorMaterializerSettings,
    override val supervisor: ActorRef,
    strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
    useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
  extends ExtendedActorMaterializer {

  // One materializer per kind of sub-graph, looked up by the sub-graph's runtime class.
  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
  )

  /** A logging adapter obtained from the actor system, with this materializer as log source. */
  override def logger: LoggingAdapter = Logging.getLogger(system, this)

  /** True once the actor system's termination future has completed. */
  override def isShutdown: Boolean = system.whenTerminated.isCompleted

  /**
   * Applies the input-buffer, dispatcher and supervision-strategy attributes found
   * in `opAttr`, in list order, on top of this materializer's `settings`. Any
   * other attribute is ignored.
   */
  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
    import ActorAttributes._
    import Attributes._
    opAttr.attributeList.foldLeft(settings) {
      case (acc, InputBuffer(initial, max)) => acc.withInputBuffer(initial, max)
      case (acc, Dispatcher(dispatcher)) => acc.withDispatcher(dispatcher)
      case (acc, SupervisionStrategy(decider)) => acc.withSupervisionStrategy(decider)
      case (acc, _) => acc
    }
  }

  /** Not supported by this materializer; always throws `UnsupportedOperationException`. */
  override def withNamePrefix(name: String): ExtendedActorMaterializer =
    throw new UnsupportedOperationException()

  /**
   * Execution context used for scheduled tasks: the actor system's dispatcher.
   *
   * This used to throw `UnsupportedOperationException`, which made
   * `schedulePeriodically` and `scheduleOnce` fail unconditionally because both
   * pass this value to the scheduler.
   */
  override implicit def executionContext: ExecutionContextExecutor =
    system.dispatcher

  /** Schedules `task` repeatedly on the actor system's scheduler. */
  override def schedulePeriodically(initialDelay: FiniteDuration,
      interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.schedule(initialDelay, interval, task)(executionContext)

  /** Schedules `task` to run once on the actor system's scheduler. */
  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  /**
   * Materializes `runnableGraph`:
   *
   *  1. fuses the graph with `Fusing.aggressive` and rebuilds a graph whose
   *     vertices are the original (non-copied) modules and whose edges connect
   *     the original ports,
   *  2. prints that graph to standard output,
   *  3. partitions it with the configured strategy,
   *  4. materializes every sub-graph with the matching sub-materializer, feeding
   *     the values collected so far into each step.
   *
   * @return the list of materialized values of sink-shaped modules whose value
   *         is not `NotUsed`, cast to `Mat`
   */
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
    val info = Fusing.aggressive(runnableGraph).module.info
    val graph = GGraph.empty[Module, Edge]

    info.subModules.foreach { module =>
      // Only copied modules are translated; the graph is built from their originals.
      if (module.isCopied) {
        val original = module.asInstanceOf[CopiedModule].copyOf
        graph.addVertex(original)
        module.shape.outlets.zip(original.shape.outlets).foreach {
          case (copiedOut, originalOut) =>
            val copiedIn = info.downstreams(copiedOut)
            val downstream = info.inOwners(copiedIn)
            if (downstream.isCopied) {
              val downstreamOriginal = downstream.asInstanceOf[CopiedModule].copyOf
              // Find the original inlet sitting at the same position as the copied
              // inlet this outlet is wired to.
              downstream.shape.inlets.zip(downstreamOriginal.shape.inlets).foreach {
                case (candidateIn, originalIn) =>
                  if (candidateIn == copiedIn) {
                    graph.addEdge(original, Edge(originalOut, originalIn), downstreamOriginal)
                  }
              }
            }
        }
      }
    }

    printGraph(graph)

    val subGraphs = GraphPartitioner(strategy).partition(graph)
    val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (collected, subGraph) =>
      val materializer = subMaterializers(subGraph.getClass)
      collected ++ materializer.materialize(subGraph, collected)
    }

    val sinkValues: List[Any] = matValues.toList.flatMap {
      case (_, _: NotUsed) =>
        None
      case (module, value) =>
        module.shape match {
          case _: SinkShape[_] => Some(value)
          case _ => None
        }
    }

    // NOTE(review): the materialized-value computation of the last module is
    // resolved here but its result is discarded; the method returns the list of
    // sink values instead. The call is kept so behaviour is unchanged - confirm
    // which of the two values callers are actually meant to receive.
    val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
    resolveMaterialized(matModule.materializedValueComputation, matValues)

    sinkValues.asInstanceOf[Mat]
  }

  /**
   * Prints one line per module of `graph`, in topological order, to standard
   * output. Graph stage modules are printed together with the simple class name
   * of their stage (for symbolic stages: of the wrapped symbolic stage).
   */
  private def printGraph(graph: GGraph[Module, Edge]): Unit = {
    // scalastyle:off println
    graph.topologicalOrderIterator.foreach { module =>
      val moduleName = module.getClass.getSimpleName
      val line = module match {
        case stageModule: GraphStageModule =>
          val stageName = stageModule.stage match {
            case symbolic: SymbolicGraphStage[_, _, _] =>
              symbolic.symbolicStage.getClass.getSimpleName
            case other =>
              other.getClass.getSimpleName
          }
          s"${moduleName}(${stageName})"
        case _ =>
          moduleName
      }
      println(line)
    }
    // scalastyle:on println
  }

  /** Delegates to `materialize(runnableGraph)`; `initialAttributes` is ignored. */
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      initialAttributes: Attributes): Mat = {
    materialize(runnableGraph)
  }

  /** Delegates to `materialize(runnableGraph)`; `subflowFuser` is ignored. */
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      subflowFuser: (GraphInterpreterShell) => ActorRef): Mat = {
    materialize(runnableGraph)
  }

  /** Delegates to `materialize(runnableGraph)`; the other two arguments are ignored. */
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
    materialize(runnableGraph)
  }

  /** Returns this materializer's `logger`; `logSource` is not used. */
  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
    logger
  }

  /** Shuts down every sub-graph materializer. */
  def shutdown: Unit = {
    subMaterializers.values.foreach(_.shutdown)
  }

  /**
   * Evaluates a materialized-value computation tree against the values collected
   * per module. Atomic nodes are looked up in `materializedValues` (falling back
   * to `()`), Combine and Transform nodes apply their function to the resolved
   * children, and Ignore yields `()`.
   */
  private def resolveMaterialized(mat: MaterializedValueNode,
      materializedValues: mutable.Map[Module, Any]): Any = mat match {
    case Atomic(m) =>
      materializedValues.getOrElse(m, ())
    case Combine(f, d1, d2) =>
      f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
    case Transform(f, d) =>
      f(resolveMaterialized(d, materializedValues))
    case Ignore =>
      ()
  }
}