blob: 99ebe177f82c659589d13ab24d2b0257a197663f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.akkastream.graph
import akka.actor.ActorSystem
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl
import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient
import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient
import akka.stream.impl.StreamLayout.Module
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.embedded.EmbeddedCluster
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.util.Graph
/**
*
* [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
* contain modules that can be materialized in remote Gearpump cluster.
*
* @param graph Graph
*/
class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
object RemoteGraph {
/**
* * materialize LocalGraph in remote gearpump cluster
* @param useInProcessCluster Boolean
* @param system ActorSystem
*/
class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
extends SubGraphMaterializer {
private val local = if (useInProcessCluster) {
val cluster = EmbeddedCluster()
cluster.start()
Some(cluster)
} else {
None
}
private val context: ClientContext = local match {
case Some(l) => l.newClientContext
case None => ClientContext(system)
}
override def materialize(subGraph: SubGraph,
inputMatValues: scala.collection.mutable.Map[Module, Any]):
scala.collection.mutable.Map[Module, Any] = {
val graph = subGraph.graph
if (graph.isEmpty) {
inputMatValues
} else {
doMaterialize(graph: Graph[Module, Edge], inputMatValues)
}
}
private def doMaterialize(graph: Graph[Module, Edge],
inputMatValues: scala.collection.mutable.Map[Module, Any]):
scala.collection.mutable.Map[Module, Any] = {
val materializer = new RemoteMaterializerImpl(graph, system)
val (app, matValues) = materializer.materialize
val appId = context.submit(app).appId
// scalastyle:off println
println("sleep 5 second until the application is ready on cluster")
// scalastyle:on println
Thread.sleep(5000)
def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
matValues.toList.flatMap { kv =>
val (module, processorId) = kv
module match {
case source: SourceBridgeModule[_, _] =>
val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher,
context, appId, processorId)
Some((module, bridge))
case sink: SinkBridgeModule[_, _] =>
val bridge = new SinkBridgeTaskClient(system, context, appId, processorId)
Some((module, bridge))
case other =>
None
}
}.toMap
}
inputMatValues ++ resolve(matValues)
}
override def shutdown: Unit = {
context.close()
local.foreach(_.stop())
}
}
}