blob: 47ed1f21023b566628cccbf798e676dcb78d9f39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.stream.gearpump.materializer
import akka.actor.ActorSystem
import akka.stream.ModuleGraph.Edge
import akka.stream.gearpump.GearAttributes
import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task}
import akka.stream.impl.Stages
import akka.stream.impl.Stages.StageModule
import akka.stream.impl.StreamLayout.Module
import org.slf4j.LoggerFactory
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.StreamApp
import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
import org.apache.gearpump.util.Graph
/**
 * [[RemoteMaterializerImpl]] materializes a `Graph[Module, Edge]` into a Gearpump
 * streaming application.
 *
 * @param graph the module graph to materialize
 * @param system the actor system passed to the generated streaming application
 */
class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
import RemoteMaterializerImpl._
// A clue is the unique config key stamped on the operator created for a module; it is used
// later to find the processor that the module ended up in.
type Clue = String
// Exposes the constructor's actor system implicitly to the members of this class.
// NOTE(review): presumably required by UserConfig.withValue — confirm against its signature.
private implicit val actorSystem = system
/** Produces a fresh random UUID rendered as a string. */
private def uuid: String = java.util.UUID.randomUUID().toString
/**
 * Materializes the module graph into a Gearpump streaming application.
 *
 * @return the streaming application (with the tracking clues removed again), together with
 *         a mapping from each resolved Module to its materialized processor id.
 */
def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
  // Each module is tagged with a unique clue while the operator graph is built.
  val (operators, moduleClues) = toOpGraph()
  val rawApp: StreamApplication = new StreamApp("app", system, UserConfig.empty, operators)
  // Use the clues to find out which processor every module ended up in.
  val moduleToProcessor = resolveClues(rawApp, moduleClues)
  val wiredApp = updateJunctionConfig(moduleToProcessor, rawApp)
  (cleanClues(wiredApp), moduleToProcessor)
}
/**
 * Merges the junction configuration computed by `junctionConfig` into the task
 * configuration of every processor of the application's DAG.
 *
 * @param processorIds mapping from module to materialized processor id
 * @param app the application whose processors are to be updated
 * @return a new application with the same name and user config, and the updated DAG
 */
private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], app: StreamApplication): StreamApplication = {
  val configByProcessor = junctionConfig(processorIds)
  val rewiredDag = app.dag.mapVertex { processor =>
    val mergedConf = processor.taskConf.withConfig(configByProcessor(processor.id))
    processor.copy(taskConf = mergedConf)
  }
  new StreamApplication(app.name, app.inputUserConfig, rewiredDag)
}
/**
 * Builds the junction configuration so that each GraphTask knows its upstream and
 * downstream processors.
 *
 * For a junction module the resulting config carries, under GraphTask.IN_PROCESSORS and
 * GraphTask.OUT_PROCESSORS, the processor ids connected to its inlets and outlets (in port
 * order). Every other module gets an empty config.
 *
 * @param processorIds mapping from module to materialized processor id
 * @return the config to merge into each processor, keyed by processor id
 * @throws NoSuchElementException if a module has no processor id, or if an inlet/outlet of
 *                                a junction is not connected to any edge of the graph
 */
private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = {
  graph.vertices.map { vertex =>
    val processorId = processorIds(vertex)
    vertex match {
      case junction: JunctionModule =>
        // Look the edges up once instead of once per port.
        val incoming = graph.incomingEdgesOf(junction)
        val outgoing = graph.outgoingEdgesOf(junction)

        val inProcessors = junction.shape.inlets.map { inlet =>
          val upstreamModule = incoming.collectFirst {
            case (upstream, edge, _) if edge.to == inlet => upstream
          }.getOrElse(throw new NoSuchElementException(
            s"No upstream module is connected to inlet $inlet of junction $junction"))
          processorIds(upstreamModule)
        }.toList

        val outProcessors = junction.shape.outlets.map { outlet =>
          val downstreamModule = outgoing.collectFirst {
            case (_, edge, downstream) if edge.from == outlet => downstream
          }.getOrElse(throw new NoSuchElementException(
            s"No downstream module is connected to outlet $outlet of junction $junction"))
          processorIds(downstreamModule)
        }.toList

        (processorId, UserConfig.empty.withValue(GraphTask.OUT_PROCESSORS, outProcessors)
          .withValue(GraphTask.IN_PROCESSORS, inProcessors))
      case _ =>
        (processorId, UserConfig.empty)
    }
  }.toMap
}
/**
 * Resolves every module to the id of the processor whose task config contains the
 * module's clue.
 *
 * @param app the application whose processors are searched
 * @param clues the clue that was assigned to each module
 * @return mapping from module to processor id; a module whose clue is found in no
 *         processor is left out of the result
 */
private def resolveClues(app: StreamApplication, clues: Map[Module, Clue]): Map[Module, ProcessorId] = {
  clues.flatMap { case (module, clue) =>
    app.dag.vertices
      .find(_.taskConf.getString(clue).isDefined)
      .map(processor => module -> processor.id)
  }
}
/**
 * Returns a copy of the application in which the tracking clues have been removed from
 * the task config of every processor.
 */
private def cleanClues(app: StreamApplication): StreamApplication = {
  val cleanedDag = app.dag.mapVertex { processor =>
    processor.copy(taskConf = cleanClue(processor.taskConf))
  }
  new StreamApplication(app.name, app.inputUserConfig, cleanedDag)
}
/** Keeps only the config entries whose value is not the STAINS marker. */
private def cleanClue(conf: UserConfig): UserConfig =
  conf.filter(entry => entry._2 != RemoteMaterializerImpl.STAINS)
/**
 * Translates the module graph into a graph of Gearpump operators.
 *
 * Every module gets a fresh clue (a random key whose value is STAINS) written into the
 * config of its operator, so that the processor it ends up in can be found later.
 *
 * @return the operator graph and the clue assigned to each module
 * @throws UnsupportedOperationException if a module cannot be translated, either because
 *                                       its type is unknown here or because its
 *                                       translation is not implemented yet
 */
private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = {
  var matValues = Map.empty[Module, Clue]
  val opGraph = graph.mapVertex { module =>
    val name = uuid
    val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS)
    matValues += module -> name
    val parallelism = GearAttributes.count(module.attributes)
    val op = module match {
      case source: SourceTaskModule[t] =>
        val updatedConf = conf.withConfig(source.conf)
        new DataSourceOp[t](source.source, parallelism, updatedConf, "source")
      case sink: SinkTaskModule[t] =>
        val updatedConf = conf.withConfig(sink.conf)
        new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink")
      case sourceBridge: SourceBridgeModule[_, _] =>
        new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
      case processor: ProcessorModule[_, _, _] =>
        val updatedConf = conf.withConfig(processor.conf)
        // NOTE(review): the description "source" looks copy-pasted for a generic processor — confirm.
        new ProcessorOp(processor.processor, parallelism, updatedConf, "source")
      case sinkBridge: SinkBridgeModule[_, _] =>
        new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
      case groupBy: GroupByModule[t, g] =>
        new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf)
      case reduce: ReduceModule[Any] =>
        reduceOp(reduce.f, conf)
      case stage: StageModule =>
        translateStage(stage, conf)
      case fanIn: FanInModule =>
        translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, conf)
      case fanOut: FanOutModule =>
        translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf)
      case _ =>
        // Unknown module type: fall through to the "not supported" error below instead of
        // failing with a MatchError.
        null
    }
    // The translate* helpers return null for modules whose translation is still a TODO.
    if (op == null) {
      throw new UnsupportedOperationException(
        s"${module.getClass.toString} is not supported with RemoteMaterializer")
    }
    op
  }.mapEdge[OpEdge] { (n1, edge, n2) =>
    n2 match {
      case _: MasterOp =>
        Shuffle
      case _: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
        Shuffle
      case _: SlaveOp[_] =>
        Direct
    }
  }
  (opGraph, matValues)
}
/**
 * Translates an Akka stream stage into the Gearpump operator that emulates it.
 *
 * @param module the stage to translate
 * @param conf the config to attach to the produced operator
 * @return the operator, or null when translation of the stage is not implemented yet
 */
private def translateStage(module: StageModule, conf: UserConfig): Op = {
  module match {
    case _: Stages.Buffer =>
      // ignore the buffering operation
      identity("buffer", conf)
    case collect: Stages.Collect =>
      collectOp(collect.pf, conf)
    case _: Stages.ConcatAll =>
      // TODO: not supported yet
      null
    case conflat: Stages.Conflate =>
      conflatOp(conflat.seed, conflat.aggregate, conf)
    case drop: Stages.Drop =>
      dropOp(drop.n, conf)
    case dropWhile: Stages.DropWhile =>
      dropWhileOp(dropWhile.p, conf)
    case _: Stages.Expand =>
      // TODO: not supported yet
      null
    case filter: Stages.Filter =>
      filterOp(filter.p, conf)
    case fold: Stages.Fold =>
      foldOp(fold.zero, fold.f, conf)
    case _: Stages.GroupBy =>
      // TODO: not supported yet
      null
    case grouped: Stages.Grouped =>
      groupedOp(grouped.n, conf)
    case _: Stages.Identity =>
      identity("identity", conf)
    case log: Stages.Log =>
      logOp(log.name, log.extract, conf)
    case map: Stages.Map =>
      mapOp(map.f, conf)
    case _: Stages.MapAsync =>
      // TODO: not supported yet
      null
    case _: Stages.MapAsyncUnordered =>
      // TODO: not supported yet
      null
    case flatMap: Stages.MapConcat =>
      flatMapOp(flatMap.f, "mapConcat", conf)
    case _: MaterializingStageFactory =>
      // TODO: not supported yet
      null
    case _: Stages.PrefixAndTail =>
      // TODO: not supported yet
      null
    case _: Stages.Recover =>
      // TODO: we will just ignore this
      identity("recover", conf)
    case scan: Stages.Scan =>
      scanOp(scan.zero, scan.f, conf)
    case _: Stages.Split =>
      // TODO: not supported yet
      null
    case _: Stages.StageFactory =>
      // TODO: not supported yet
      null
    case take: Stages.Take =>
      takeOp(take.n, conf)
    case takeWhile: Stages.TakeWhile =>
      // takeWhile is not a filter: once the predicate fails for one element, every later
      // element has to be discarded too, even if it would satisfy the predicate again.
      var taking = true
      flatMapOp({ data: Any =>
        if (taking && takeWhile.p(data)) {
          Option(data)
        } else {
          taking = false
          None
        }
      }, "takeWhile", conf)
    case _: Stages.TimerTransform =>
      // TODO: not supported yet
      null
  }
}
/**
 * Translates a fan-in junction into a Gearpump operator.
 *
 * @param fanIn the fan-in module to translate
 * @param edges the incoming edges of the module (currently unused)
 * @param parallelism the requested parallelism (currently unused)
 * @param conf the config to attach to the produced operator
 * @return the operator, or null for fan-in modules that are not supported yet
 */
private def translateFanIn(
    fanIn: FanInModule,
    edges: List[(Module, Edge, Module)],
    parallelism: Int,
    conf: UserConfig): Op = {
  fanIn match {
    case _: MergeModule[_] =>
      MergeOp("merge", conf)
    case _: MergePreferredModule[_] =>
      // TODO: support "preferred" merge; for now it is treated like a plain merge
      MergeOp("mergePrefered", conf)
    case _: ZipWithModule =>
      // TODO: support zip module
      null
    case _: ConcatModule[_] =>
      // TODO: support concat module
      null
    case _: FlexiMergeModule[_, _] =>
      // TODO: support flexi merge module
      null
  }
}
/**
 * Translates a fan-out junction into a Gearpump operator.
 *
 * @param fanOut the fan-out module to translate
 * @param edges the outgoing edges of the module (currently unused)
 * @param parallelism the parallelism of the produced processor
 * @param conf the config to attach to the produced operator
 * @return the operator, or null for fan-out modules that are not supported yet
 */
private def translateFanOut(
    fanOut: FanOutModule,
    edges: List[(Module, Edge, Module)],
    parallelism: Int,
    conf: UserConfig): Op = {
  fanOut match {
    case unzip2: UnzipWith2Module[Any, Any, Any] =>
      // The unzip function travels to the task through the config.
      val unzipFunction = new UnZip2Task.UnZipFunction(unzip2.f)
      val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, unzipFunction)
      new ProcessorOp(classOf[UnZip2Task], parallelism, updatedConf, "unzip")
    case _: BroadcastModule[_] =>
      new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast")
    case _: BalanceModule[_] =>
      new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
    case _: FlexiRouteImpl[_, _] =>
      // TODO: not supported yet
      null
  }
}
}
object RemoteMaterializerImpl {
/**
 * Sentinel fallback for PartialFunction.applyOrElse: it returns itself, so a result equal
 * to NotApplied means that the partial function was not defined for the input.
 */
final val NotApplied: Any => Any = _ => NotApplied

/**
 * Creates an operator that applies the partial function to every element and emits the
 * result; elements outside the function's domain (and null results) emit nothing.
 */
def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = {
  val collectDefined: Any => TraversableOnce[Any] = { data: Any =>
    val outcome = collect.applyOrElse(data, NotApplied)
    if (NotApplied == outcome) None else Option(outcome)
  }
  flatMapOp(collectDefined, "collect", conf)
}
/**
 * Creates an operator that emits only the elements satisfying the predicate
 * (a null element is never emitted).
 */
def filterOp(filter: Any => Boolean, conf: UserConfig): Op = {
  val keepMatching: Any => TraversableOnce[Any] = { data: Any =>
    if (!filter(data)) None else Option(data)
  }
  flatMapOp(keepMatching, "filter", conf)
}
/**
 * Creates an operator that keeps a running reduction of the elements seen so far and
 * emits the updated value for every incoming element. The first element is emitted as is.
 *
 * @param reduce combines the running value with the next element
 * @param conf the config to attach to the produced operator
 */
def reduceOp(reduce: (Any, Any) => Any, conf: UserConfig): Op = {
  // An Option (rather than null) marks "no element seen yet", so that a null element or a
  // null reduction result is not mistaken for the initial state.
  var accumulated: Option[Any] = None
  val step = { elem: Any =>
    val next = accumulated.fold(elem)(reduce(_, elem))
    accumulated = Some(next)
    List(next)
  }
  flatMapOp(step, "reduce", conf)
}
/** Creates an operator with the given description that passes every element through unchanged. */
def identity(description: String, conf: UserConfig): Op = {
  val passThrough: Any => TraversableOnce[Any] = (data: Any) => List(data)
  flatMapOp(passThrough, description, conf)
}
/** Creates an operator that emits the result of applying the function to every element. */
def mapOp(map: Any => Any, conf: UserConfig): Op = {
  val applyMap: Any => TraversableOnce[Any] = (data: Any) => List(map(data))
  flatMapOp(applyMap, "map", conf)
}
/** Creates a flat-map operator with the default description "flatmap". */
def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op =
  flatMapOp(flatMap, "flatmap", conf)
/**
 * Creates a flat-map operator.
 *
 * @param fun produces the elements to emit for each input element
 * @param description the description given to the operator
 * @param conf the config to attach to the operator
 */
def flatMapOp(fun: Any => TraversableOnce[Any], description: String, conf: UserConfig): Op =
  FlatMapOp(fun, description, conf)
/**
 * Creates an operator that aggregates the elements seen so far and emits the updated
 * aggregate for every incoming element.
 *
 * @param seed creates the initial aggregate from the first element
 * @param aggregate combines the current aggregate with the next element
 * @param conf the config to attach to the produced operator
 */
def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = {
  // An Option (rather than null) marks "not seeded yet", so that a null aggregate does not
  // cause the seed function to be applied again.
  var aggregated: Option[Any] = None
  val step = { elem: Any =>
    val next = aggregated.fold(seed(elem))(aggregate(_, elem))
    aggregated = Some(next)
    List(next)
  }
  // Described as "conflate" (was "map", apparently copied from mapOp).
  flatMapOp(step, "conflate", conf)
}
/**
 * Creates an operator that folds the elements into a running value, starting from zero,
 * and emits the updated value for every incoming element.
 */
def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
  var running: Any = zero
  flatMapOp({ elem: Any =>
    running = fold(running, elem)
    List(running)
  }, "fold", conf)
}
/**
 * Creates an operator that collects incoming elements into groups of `count` elements
 * and emits each group, as a Vector, once it is full.
 *
 * @param count the size of each group; must be greater than 0
 * @param conf the config to attach to the produced operator
 * @throws IllegalArgumentException if count is not positive
 */
def groupedOp(count: Int, conf: UserConfig): Op = {
  // With a non-positive count the countdown below would never hit zero and the buffer
  // would grow without bound.
  require(count > 0, "count must be greater than 0")
  val buf = Vector.newBuilder[Any]
  buf.sizeHint(count)
  var left = count
  val flatMap: Any => Iterable[Any] = { input: Any =>
    buf += input
    left -= 1
    if (left == 0) {
      val emit = buf.result()
      buf.clear()
      // Re-apply the size hint for the next group.
      buf.sizeHint(count)
      left = count
      Some(emit)
    } else {
      None
    }
  }
  // Described as "grouped" rather than the generic "flatmap" default, like the sibling operators.
  flatMapOp(flatMap, "grouped", conf)
}
/** Creates an operator that discards the first `number` elements and emits all later ones. */
def dropOp(number: Long, conf: UserConfig): Op = {
  var remaining = number
  val skipLeading: Any => Iterable[Any] = { input: Any =>
    if (remaining <= 0) {
      Some(input)
    } else {
      remaining -= 1
      None
    }
  }
  flatMapOp(skipLeading, "drop", conf)
}
/**
 * Creates an operator that discards elements for as long as the predicate holds and emits
 * every element from the first one that fails the predicate onwards.
 *
 * @param drop the predicate deciding whether a leading element is discarded
 * @param conf the config to attach to the produced operator
 */
def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
  // Only the leading run of matching elements is dropped; once an element fails the
  // predicate it is no longer consulted (otherwise this would behave like filterNot).
  var dropping = true
  flatMapOp({ data: Any =>
    if (dropping && drop(data)) {
      None
    } else {
      dropping = false
      Option(data)
    }
  }, "dropWhile", conf)
}
/**
 * Creates an operator that logs every element at info level, using the logger with the
 * given name, and passes the element through unchanged.
 *
 * @param name the name of the logger
 * @param extract extracts the value to log from an element
 */
def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
  flatMapOp({ elem: Any =>
    val logger = LoggerFactory.getLogger(name)
    logger.info(s"Element: {${extract(elem)}}")
    List(elem)
  }, "log", conf)
}
/**
 * Creates an operator that emits the running aggregate for every incoming element,
 * preceded once by the initial value.
 *
 * @param zero the initial value of the aggregate
 * @param f combines the current aggregate with the next element
 * @param conf the config to attach to the produced operator
 */
def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = {
  var aggregator = zero
  var pushedZero = false
  val flatMap = { elem: Any =>
    aggregator = f(aggregator, elem)
    // The initial value is emitted exactly once, together with the first aggregate. The
    // check used to be inverted, so the flag was never set and zero was never emitted.
    if (!pushedZero) {
      pushedZero = true
      List(zero, aggregator)
    } else {
      List(aggregator)
    }
  }
  flatMapOp(flatMap, "scan", conf)
}
/** Creates an operator that emits the first `count` elements and discards all later ones. */
def takeOp(count: Long, conf: UserConfig): Op = {
  var remaining: Long = count
  val takeLeading: Any => Iterable[Any] = { elem: Any =>
    remaining -= 1
    if (remaining < 0) None else Some(elem)
  }
  flatMapOp(takeLeading, "take", conf)
}
/**
 * Stains are used to track how a module maps to a processor: this marker value is stored
 * under a unique per-module key in the operator's config, and config entries carrying it
 * are filtered out again once the mapping has been resolved.
 */
val STAINS = "track how module is fused to processor"
}