blob: 48d06f7a2127416629ba10c8ce7f8ea35b53839a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This code is similar to MaterializerSession and will be deprecated in the next release.
*/
package akka.stream
import scala.collection.mutable
import akka.stream.Attributes.Attribute
import akka.stream.ModuleGraph.Edge
import akka.stream.gearpump.util.MaterializedValueOps
import akka.stream.impl.StreamLayout._
import akka.stream.impl._
import akka.stream.{Graph => AkkaGraph}
import _root_.org.apache.gearpump.util
import _root_.org.apache.gearpump.util.Graph
/**
*
* ModuleGraph is a transformation on [[akka.stream.scaladsl.RunnableGraph]].
* It carries all the information of [[akka.stream.scaladsl.RunnableGraph]], but
* represents it in a different way.
*
* Here is the difference:
*
* RunnableGraph
* ==============================
* [[akka.stream.scaladsl.RunnableGraph]] is represented as a [[Module]] tree:
* TopLevelModule
* |
* -------------------
* | |
* SubModule1 SubModule2
* | |
* ---------------- ----------
* | | |
* AtomicModule1 AtomicModule2 AtomicModule3
*
* ModuleGraph
* ==============================
* [[ModuleGraph]] is represented as a [[util.Graph]] of Atomic [[Module]]:
*
* AtomicModule2 -> AtomicModule4
* /| \
* / \
* / \|
* AtomicModule1 AtomicModule5
* \ /|
* \ /
* \| /
* AtomicModule3
*
* Each vertex in the Graph is a [[Module]], each [[Edge]] in the Graph is a tuple
* ([[OutPort]], [[InPort]]). [[OutPort]] is one of upstream Atomic Module
* output ports. [[InPort]] is one of downstream Atomic Module input ports.
*
*
* Why use [[ModuleGraph]] instead of [[akka.stream.scaladsl.RunnableGraph]]?
* =========================
* There are several good reasons:):
* 1. [[ModuleGraph]] outlines explicitly the upstream/downstream relation.
* Each [[Edge]] of [[ModuleGraph]] represent a upstream/downstream pair.
* It is easier for user to understand the overall data flow.
*
* 2. It is easier for performance optimization.
* For the above Graph, if we want to fuse AtomicModule2 and AtomicModule3
* together, it can be done within [[ModuleGraph]]. We only need
* to substitute Pair(AtomicModule2, AtomicModule4) with a unified Module.
*
* 3. It avoids module duplication.
* In [[akka.stream.scaladsl.RunnableGraph]], composite Module can be re-used.
* It is possible that there can be duplicate Modules.
* The duplication problem causes big headache when doing materialization.
*
* [[ModuleGraph]] doesn't have thjis problem. [[ModuleGraph]] does a transformation on the Module
* Tree to make sure each Atomic Module [[ModuleGraph]] is unique.
*
*
* @param graph a Graph of Atomic modules.
* @param mat is a function of:
* input => materialized value of each Atomic module
* output => final materialized value.
* @tparam Mat
*/
class ModuleGraph[Mat](val graph: util.Graph[Module, Edge], val mat: MaterializedValueNode) {
def resolve(materializedValues: Map[Module, Any]): Mat = {
MaterializedValueOps(mat).resolve[Mat](materializedValues)
}
}
object ModuleGraph {
def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] = {
val topLevel = runnableGraph.module
val factory = new ModuleGraphFactory(topLevel)
val (graph, mat) = factory.create()
new ModuleGraph(graph, mat)
}
/**
*
* @param from outport of upstream module
* @param to inport of downstream module
*/
case class Edge(from: OutPort, to: InPort)
private class ModuleGraphFactory(val topLevel: StreamLayout.Module) {
private var subscribersStack: List[mutable.Map[InPort, (InPort, Module)]] =
mutable.Map.empty[InPort, (InPort, Module)].withDefaultValue(null) :: Nil
private var publishersStack: List[mutable.Map[OutPort, (OutPort, Module)]] =
mutable.Map.empty[OutPort, (OutPort, Module)].withDefaultValue(null) :: Nil
/*
* Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
* itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but
* not elsewhere. For this reason they are just simply passed as parameters to those methods.
*
* The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class
* from passing the current scope around or even knowing about it.
*/
private var moduleStack: List[Module] = topLevel :: Nil
private def subscribers: mutable.Map[InPort, (InPort, Module)] = subscribersStack.head
private def publishers: mutable.Map[OutPort, (OutPort, Module)] = publishersStack.head
private def currentLayout: Module = moduleStack.head
private val graph = Graph.empty[Module, Edge]
private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
module.withAttributes(currentAttributes).asInstanceOf[T]
}
private def materializeAtomic(atomic: Module, parentAttributes: Attributes): MaterializedValueNode = {
val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
val copied = copyAtomicModule(atomic, parentAttributes)
for ((in, id) <- inputs.zipWithIndex) {
val inPort = inPortMapping(atomic, copied)(in)
assignPort(in, (inPort, copied))
}
for ((out, id) <- outputs.zipWithIndex) {
val outPort = outPortMapping(atomic, copied)(out)
assignPort(out, (outPort, copied))
}
graph.addVertex(copied)
Atomic(copied)
}
def create(): (util.Graph[Module, Edge], MaterializedValueNode) = {
val mat = materializeModule(topLevel, Attributes.none)
(graph, mat)
}
private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
}
private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
}
private def materializeModule(module: Module, parentAttributes: Attributes): MaterializedValueNode = {
val materializedValues = collection.mutable.HashMap.empty[Module, MaterializedValueNode]
val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
var materializedValueSources = List.empty[MaterializedValueSource[_]]
for (submodule <- module.subModules) {
submodule match {
case mv: MaterializedValueSource[_] =>
materializedValueSources :+= mv
case atomic if atomic.isAtomic =>
materializedValues.put(atomic, materializeAtomic(atomic, currentAttributes))
case copied: CopiedModule =>
enterScope(copied)
materializedValues.put(copied, materializeModule(copied, currentAttributes))
exitScope(copied)
case composite =>
materializedValues.put(composite, materializeComposite(composite, currentAttributes))
}
}
val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
materializedValueSources.foreach { module =>
val matAttribute = new MaterializedValueSourceAttribute(mat)
val copied = copyAtomicModule(module, parentAttributes and Attributes(matAttribute))
assignPort(module.shape.outlet, (copied.shape.outlet, copied))
graph.addVertex(copied)
materializedValues.put(copied, Atomic(copied))
}
mat
}
private def materializeComposite(composite: Module, effectiveAttributes: Attributes): MaterializedValueNode = {
materializeModule(composite, effectiveAttributes)
}
private def mergeAttributes(parent: Attributes, current: Attributes): Attributes = {
parent and current
}
private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, MaterializedValueNode]): MaterializedValueNode = matNode match {
case Atomic(m) => materializedValues(m)
case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
case Transform(f, d) => Transform(f, resolveMaterialized(d, materializedValues))
case Ignore => Ignore
}
final protected def assignPort(in: InPort, subscriber: (InPort, Module)): Unit = {
addVertex(subscriber._2)
subscribers(in) = subscriber
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.inPorts(in)) {
val out = currentLayout.upstreams(in)
val publisher = publishers(out)
if (publisher ne null) addEdge(publisher, subscriber)
}
}
final protected def assignPort(out: OutPort, publisher: (OutPort, Module)): Unit = {
addVertex(publisher._2)
publishers(out) = publisher
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.outPorts(out)) {
val in = currentLayout.downstreams(out)
val subscriber = subscribers(in)
if (subscriber ne null) addEdge(publisher, subscriber)
}
}
// Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
// of the same module.
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def enterScope(enclosing: CopiedModule): Unit = {
subscribersStack ::= mutable.Map.empty.withDefaultValue(null)
publishersStack ::= mutable.Map.empty.withDefaultValue(null)
moduleStack ::= enclosing.copyOf
}
// Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning
// them to the copied ports instead of the original ones (since there might be multiple copies of the same module
// leading to port identity collisions)
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def exitScope(enclosing: CopiedModule): Unit = {
val scopeSubscribers = subscribers
val scopePublishers = publishers
subscribersStack = subscribersStack.tail
publishersStack = publishersStack.tail
moduleStack = moduleStack.tail
// When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of
// the original module and assign them to the copy ports in the outer scope that we will return to
enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
case (original, exposed) => assignPort(exposed, scopeSubscribers(original))
}
enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach {
case (original, exposed) => assignPort(exposed, scopePublishers(original))
}
}
private def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
}
private def addVertex(module: Module): Unit = {
graph.addVertex(module)
}
}
final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute
}