| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package akka.stream.gearpump.materializer |
| |
| import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} |
| |
| import akka.actor.{ActorRef, ActorSystem} |
| import akka.dispatch.Dispatchers |
| import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute} |
| import akka.stream.actor.ActorSubscriber |
| import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule |
| import akka.stream.gearpump.module.ReduceModule |
| import akka.stream.gearpump.util.MaterializedValueOps |
| import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule} |
| import akka.stream.impl.StreamLayout.Module |
| import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor} |
| import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape} |
| import org.reactivestreams.{Processor, Publisher, Subscriber} |
| |
| import org.apache.gearpump.util.Graph |
| |
/**
 * A materializer that runs an akka-stream module graph with local actors. It is meant to be
 * functionally equivalent to [[akka.stream.impl.ActorMaterializerImpl]].
 *
 * Every constructor argument is forwarded unchanged to [[LocalMaterializer]]; the notes below
 * only describe what this class itself does with them.
 *
 * @param system forwarded to [[LocalMaterializer]]
 * @param settings forwarded to [[LocalMaterializer]]
 * @param dispatchers forwarded to [[LocalMaterializer]]
 * @param supervisor forwarded to [[LocalMaterializer]]
 * @param haveShutDown forwarded to [[LocalMaterializer]]
 * @param flowNameCounter counter that is incremented once per instance to number the flow name
 * @param namePrefix prefix of the flow name, and therefore of every stage name
 * @param optimizations forwarded to [[LocalMaterializer]]
 */
class LocalMaterializerImpl (
    system: ActorSystem,
    settings: ActorMaterializerSettings,
    dispatchers: Dispatchers,
    supervisor: ActorRef,
    haveShutDown: AtomicBoolean,
    flowNameCounter: AtomicLong,
    namePrefix: String,
    optimizations: Optimizations)
  extends LocalMaterializer(
    system, settings, dispatchers, supervisor,
    haveShutDown, flowNameCounter, namePrefix, optimizations) {

  /**
   * Materializes every module of `graph`, subscribes the materialized modules to each other
   * along the graph edges, and publishes the resolved materialized values to the
   * materialized-value sources found in the graph.
   *
   * @param graph module graph to run; every vertex must be a module kind handled by
   *              `materializeAtomic`
   * @param inputMatValues materialized values that are already known; a value computed here
   *                       wins when a module appears in both
   * @return `inputMatValues` extended with the materialized value of every module of `graph`
   */
  override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
    val materializedGraph = graph.mapVertex { module =>
      materializeAtomic(module)
    }

    // Edges carry the ports of the ORIGINAL modules, so each materialized vertex must expose
    // its publishers/subscribers under exactly those ports.
    materializedGraph.edges.foreach { case (upstream, edge, downstream) =>
      val publisher = upstream.outputs(edge.from).asInstanceOf[Publisher[Any]]
      val subscriber = downstream.inputs(edge.to).asInstanceOf[Subscriber[Any]]
      publisher.subscribe(subscriber)
    }

    val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex =>
      (vertex.module, vertex.matValue)
    }.toMap

    val matValueSources = materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]])
    publishToMaterializedValueSource(matValueSources, matValues)

    matValues
  }

  /**
   * Resolves, for every materialized-value source, the value it has to emit and hands it to
   * the publishers of that source.
   *
   * Sources without a [[MaterializedValueSourceAttribute]] are skipped. The outputs of a
   * materialized-value source are created by `materializeAtomic` as
   * `MaterializedValuePublisher`s; any other publisher type fails with a `MatchError`.
   *
   * @param modules materialized vertices whose module is a `MaterializedValueSource`
   * @param matValues all materialized values known so far, used to resolve the value to emit
   */
  private def publishToMaterializedValueSource(modules: List[MaterializedModule], matValues: Map[Module, Any]): Unit = {
    modules.foreach { materialized =>
      val source = materialized.module.asInstanceOf[MaterializedValueSource[_]]

      // getAttribute returns the supplied default (null) when the attribute is absent.
      Option(source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null)).foreach { attr =>
        val valueToPublish = MaterializedValueOps(attr.mat).resolve[Any](matValues)
        materialized.outputs.values.foreach {
          case valuePublisher: MaterializedValuePublisher =>
            valuePublisher.setValue(valueToPublish)
        }
      }
    }
  }

  private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()

  private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"

  /** Name of the flow run by this instance: `namePrefix` followed by the flow number. */
  val flowName: String = createFlowName()

  /** Sequence number embedded in the next stage name; advanced by `stageName`. */
  var nextId: Int = 0

  /**
   * Builds the next stage name, `flowName-nextId-attributeName`, and advances `nextId`.
   *
   * @param attr attributes whose `nameOrDefault()` forms the last segment of the name
   * @return the generated stage name
   */
  def stageName(attr: Attributes): String = {
    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
    nextId += 1
    name
  }

  /**
   * Materializes a single atomic module into its publishers and subscribers.
   *
   * Handles materialized-value sources, sinks, sources, the Gearpump [[ReduceModule]], stage
   * modules, TLS modules and junctions. Any other module kind fails with a `MatchError`.
   *
   * @param atomic the module to materialize
   * @return the module together with its materialized value and its port mappings
   */
  private def materializeAtomic(atomic: Module): MaterializedModule = {
    val effectiveAttributes = atomic.attributes

    def newMaterializationContext() =
      new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))

    atomic match {
      case matValue: MaterializedValueSource[_] =>
        val pub = new MaterializedValuePublisher
        val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub)
        MaterializedModule(matValue, (), outputs = outputs)

      case sink: SinkModule[_, _] =>
        val (sub, mat) = sink.create(newMaterializationContext())
        val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub)
        MaterializedModule(sink, mat, inputs)

      case source: SourceModule[_, _] =>
        val (pub, mat) = source.create(newMaterializationContext())
        val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub)
        MaterializedModule(source, mat, outputs = outputs)

      case reduce: ReduceModule[Any] =>
        //TODO: remove this after the official akka-stream API support the Reduce Module
        val stage = LocalMaterializerImpl.toFoldModule(reduce)
        val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
        // The Fold stage only supplies the processor. The graph edges and the returned
        // materialized values refer to the original module, so the vertex has to be keyed by
        // `atomic` (the same instance as `reduce`) and by its ports, not by the ports of the
        // stage that was created just above.
        val reduceShape: Shape = atomic.shape
        val inputs = Map[InPort, Subscriber[_]](reduceShape.inlets.head -> processor)
        val outputs = Map[OutPort, Publisher[_]](reduceShape.outlets.head -> processor)
        MaterializedModule(atomic, mat, inputs, outputs)

      case stage: StageModule =>
        val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
        MaterializedModule(stage, mat, inputs, outputs)

      case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
        val es = effectiveSettings(effectiveAttributes)
        val props =
          SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
        val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
        def factory(id: Int) = new ActorPublisher[Any](impl) {
          override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
        }
        val publishers = Vector.tabulate(2)(factory)
        impl ! FanOut.ExposedPublishers(publishers)

        val inputs = Map[InPort, Subscriber[_]](
          tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn),
          tls.cipherIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn))

        val outputs = Map[OutPort, Publisher[_]](
          tls.plainOut -> publishers(SslTlsCipherActor.UserOut),
          tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut))
        MaterializedModule(tls, (), inputs, outputs)

      case junction: JunctionModule =>
        materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
    }
  }

  /**
   * Creates the processor that runs a stage module.
   *
   * A `DirectProcessor` supplies its own processor, `Identity` becomes a `VirtualProcessor`,
   * and every other stage is run by a newly created actor.
   *
   * @param op the stage to run
   * @param effectiveAttributes attributes used to create and to name the stage actor
   * @param effectiveSettings settings that supply the dispatcher of the stage actor
   * @return the processor and the materialized value of the stage
   */
  private def processorFor(op: StageModule,
      effectiveAttributes: Attributes,
      effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
    case DirectProcessor(processorFactory, _) => processorFactory()
    case Identity(_) => (new VirtualProcessor, ())
    case _ =>
      val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
      ActorProcessorFactory[Any, Any](
        actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
  }

  /**
   * Materializes a fan-in or fan-out junction as a single actor and exposes that actor's
   * inputs and outputs under the junction's ports.
   *
   * @param op the junction to materialize
   * @param effectiveAttributes attributes used to name the junction actor
   * @param effectiveSettings settings passed to the junction actor; they also supply its
   *                          dispatcher
   * @return the junction with the unit materialized value and its port mappings
   */
  private def materializeJunction(
      op: JunctionModule,
      effectiveAttributes: Attributes,
      effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
    op match {
      case fanin: FanInModule =>
        val (props, inputs, output) = fanin match {

          case MergeModule(shape, _) =>
            (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out)

          case f: FlexiMergeModule[_, Shape] =>
            val flexi = f.flexi(f.shape)
            val shape: Shape = f.shape
            (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head)

          case MergePreferredModule(shape, _) =>
            (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out)

          case ConcatModule(shape, _) =>
            require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
            (Concat.props(effectiveSettings), shape.inSeq, shape.out)

          case zip: ZipWithModule =>
            (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
        }

        val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
        val publisher = new ActorPublisher[Any](impl)
        // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
        impl ! ExposedPublisher(publisher)

        // The position of an inlet in `inputs` is the sub-input id used by the actor.
        val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { case (in, id) =>
          (in, FanIn.SubInput[Any](impl, id))
        }.toMap

        val outMapping: Map[OutPort, Publisher[_]] = Map(output -> publisher)
        MaterializedModule(fanin, (), inputMapping, outMapping)

      case fanout: FanOutModule =>
        val (props, in, outs) = fanout match {

          case r: FlexiRouteModule[t, Shape] =>
            val flexi = r.flexi(r.shape)
            val shape: Shape = r.shape
            (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets)

          case BroadcastModule(shape, eagerCancel, _) =>
            (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)

          case BalanceModule(shape, waitForDownstreams, _) =>
            (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)

          case unzip: UnzipWithModule =>
            (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
        }
        val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
        val size = outs.size
        def factory(id: Int) =
          new ActorPublisher[Any](impl) {
            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
          }
        val publishers =
          if (size < 8) Vector.tabulate(size)(factory)
          else List.tabulate(size)(factory)

        impl ! FanOut.ExposedPublishers(publishers)
        // The i-th publisher serves the i-th outlet of the junction.
        val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
          (out, pub)
        }.toMap

        val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl))
        MaterializedModule(fanout, (), inputs, outputs)
    }
  }

  /**
   * Creates a materializer that differs from this one only in the prefix of its flow name.
   *
   * @param name the new name prefix
   * @return a new [[LocalMaterializerImpl]] built from the same constructor arguments
   */
  override def withNamePrefix(name: String): Materializer = {
    new LocalMaterializerImpl(system, settings, dispatchers, supervisor,
      haveShutDown, flowNameCounter, namePrefix = name, optimizations)
  }
}
| |
object LocalMaterializerImpl {

  /**
   * A module after materialization.
   *
   * @param module the module this entry stands for
   * @param matValue the materialized value of the module
   * @param inputs subscriber behind each input port; empty by default
   * @param outputs publisher behind each output port; empty by default
   */
  case class MaterializedModule(
      module: Module,
      matValue: Any,
      inputs: Map[InPort, Subscriber[_]] = Map.empty,
      outputs: Map[OutPort, Publisher[_]] = Map.empty)

  /**
   * Expresses a reduce as a `Fold` stage.
   *
   * The fold is seeded with `null`, which serves as the "nothing accumulated yet" marker:
   * while the accumulator is `null` the incoming element becomes the accumulator, afterwards
   * the reduce function combines the accumulator with each element.
   *
   * @param reduce module whose function `f` is folded over the stream
   * @return a `Fold` stage with a `null` zero element
   */
  def toFoldModule(reduce: ReduceModule[Any]): Fold = {
    val combine = reduce.f
    val step = (acc: Any, element: Any) =>
      if (acc != null) combine(acc, element) else element
    new Fold(null, step)
  }
}