/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gearpump.experiments.yarn.appmaster

import java.util.concurrent.TimeUnit
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.experiments.yarn.Constants._
import org.apache.gearpump.experiments.yarn.glue.Records._
import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util._
import org.slf4j.Logger
import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

/**
 * YARN AppMaster. The YARN AppMaster is responsible for starting the Gearpump masters,
 * workers, and UI server as YARN containers.
 *
 * NOTE: It is different from the Gearpump AppMaster. The Gearpump AppMaster is a
 * sub-process of a worker.
 */
class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
packagePath: String, hdfsConfDir: String,
uiFactory: UIFactory)
extends Actor {
private val LOG: Logger = LogUtil.getLogger(getClass)
private val akkaConf = context.system.settings.config
private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean
private var uiStarted = false
private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
private val port = Util.findFreePort().get
private val trackingURL = "http://" + host + ":" + port
// TODO: for now, only one master is supported.
private val masterCount = 1
private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt
private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt
private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt
val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION)
rmClient.start(self)
nmClient.start(self)
// The actor switches to waitForAppMasterRegistered during construction (below),
// so this initial receive is never invoked.
def receive: Receive = null

private def registerAppMaster(): Unit = {
rmClient.registerAppMaster(host, port, trackingURL)
}

registerAppMaster()
context.become(waitForAppMasterRegistered)
import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster._
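
// State machine: waitForAppMasterRegistered -> startingMasters -> startingWorkers -> service.
// The handlers after registration are wrapped by box() so that errors and unknown
// messages are handled uniformly in every state.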
def waitForAppMasterRegistered: Receive = {
case AppMasterRegistered =>
LOG.info("YarnAppMaster registration completed")
requestMasterContainers(masterCount)
context.become(startingMasters(remain = masterCount, List.empty[MasterInfo]))
}
private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
case ContainersAllocated(containers) =>
LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
+ containers.size)
val count = Math.min(containers.length, remain)
val newMasters = (0 until count).toList.map { index =>
val container = containers(index)
MasterInfo(container.getId, container.getNodeId, launchMaster(container))
}
// Stop unused containers
containers.drop(count).foreach { container =>
nmClient.stopContainer(container.getId, container.getNodeId)
}
context.become(startingMasters(remain, newMasters ++ masters))
case ContainerStarted(containerId) =>
LOG.info(s"ContainerStarted: container $containerId started for master(remain=$remain)")
if (remain > 1) {
context.become(startingMasters(remain - 1, masters))
} else {
requestWorkerContainers(workerCount)
context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo]))
}
}
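
// Composes a state-specific handler with the shared error handler and a catch-all
// for unknown messages.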
private def box(receive: Receive): Receive = {
onError orElse receive orElse unHandled
}
private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
: Receive = {
box {
case ContainersAllocated(containers) =>
LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
s"count: " + containers.size)
val count = Math.min(containers.length, remain)
val newWorkers = (0 until count).toList.map { index =>
val container = containers(index)
launchWorker(container, masters)
WorkerInfo(container.getId, container.getNodeId)
}
// Stop unused containers
containers.drop(count).foreach { container =>
nmClient.stopContainer(container.getId, container.getNodeId)
}
context.become(startingWorkers(remain, masters, workers ++ newWorkers))
case ContainerStarted(containerId) =>
LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
// If this was the last worker to start, bring up the UI (once) and enter service state
if (remain > 1) {
context.become(startingWorkers(remain - 1, masters, workers))
} else {
if (servicesEnabled && !uiStarted) {
context.actorOf(uiFactory.props(masters.map(_.host), host, port))
uiStarted = true
}
context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
}
}
}
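
// Injects the resolved master addresses into the actor system's config; this is the
// configuration handed out to clients via GetActiveConfig.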
private def effectiveConfig(masters: List[HostPort]): Config = {
val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
val config = context.system.settings.config
config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
ConfigValueFactory.fromIterable(masterList.asJava))
}
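
// Shared failure handling: container completion reports, application shutdown,
// ResourceManager errors, and user-initiated kill.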
private def onError: Receive = {
case ContainersCompleted(containers) =>
// TODO: we should recover the failed container from this...
containers.foreach { status =>
if (status.getExitStatus != 0) {
LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
} else {
LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
}
}
case ShutdownApplication =>
LOG.error("ShutdownApplication: shutting down the application")
nmClient.stop()
rmClient.shutdownApplication()
context.stop(self)
case ResourceManagerException(ex) =>
LOG.error("ResourceManagerException: " + ex.getMessage, ex)
nmClient.stop()
rmClient.failApplication(ex)
context.stop(self)
case Kill =>
LOG.info("Kill: User asked to shutdown the application")
sender ! CommandResult(success = true)
self ! ShutdownApplication
}
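
// Steady state: serves client commands (config queries, cluster info, dynamic
// addition and removal of workers).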
private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
: Receive = box {
case GetActiveConfig(clientHost) =>
LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
val filtered = ClusterConfig.filterOutDefaultConfig(
config.withValue(Constants.GEARPUMP_HOSTNAME,
ConfigValueFactory.fromAnyRef(clientHost)))
sender ! ActiveConfig(filtered)
case QueryVersion =>
LOG.info("QueryVersion")
sender ! Version(Util.version)
case QueryClusterInfo =>
LOG.info("QueryClusterInfo")
val masterContainers = masters.map { master =>
master.id.toString + s"(${master.nodeId.toString})"
}
val workerContainers = workers.map { worker =>
worker.id.toString + s"(${worker.nodeId.toString})"
}
sender ! ClusterInfo(masterContainers, workerContainers)
case AddMaster =>
sender ! CommandResult(success = false, "Not Implemented")
case RemoveMaster(masterId) =>
sender ! CommandResult(success = false, "Not Implemented")
case AddWorker(count) =>
if (count == 0) {
sender ! CommandResult(success = true)
} else {
LOG.info("AddWorker: Start to add new workers, count: " + count)
workerCount += count
requestWorkerContainers(count)
context.become(startingWorkers(count, masters, workers))
sender ! CommandResult(success = true)
}
case RemoveWorker(worker) =>
val workerId = ContainerId.fromString(worker)
LOG.info(s"RemoveWorker: remove worker $workerId")
workers.find(_.id.toString == workerId.toString) match {
case Some(info) =>
nmClient.stopContainer(info.id, info.nodeId)
sender ! CommandResult(success = true)
val remainWorkers = workers.filter(_.id != info.id)
context.become(service(config, masters, remainWorkers))
case None =>
sender ! CommandResult(success = false, "failed to find worker " + worker)
}
}
private def unHandled: Receive = {
case other =>
LOG.info(s"Received unknown message $other")
}
private def requestMasterContainers(masters: Int): Unit = {
LOG.info(s"Request resource for masters($masters)")
val containers = List.fill(masters)(Resource.newInstance(masterMemory, masterVCores))
rmClient.requestContainers(containers)
}
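
// Note: Util.findFreePort() probes ports on the AppMaster's local host, while the
// master itself runs on the container's host; this assumes the chosen port is also
// free there.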
private def launchMaster(container: Container): HostPort = {
LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
val host = container.getNodeId.getHost
val port = Util.findFreePort().get
LOG.info("=============PORT" + port)
val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir)
HostPort(host, port)
}
private def requestWorkerContainers(workers: Int): Unit = {
LOG.info(s"Request resource for workers($workers)")
val containers = List.fill(workers)(Resource.newInstance(workerMemory, workerVCores))
rmClient.requestContainers(containers)
}
private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = {
LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress)
val workerHost = container.getNodeId.getHost
val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost)
nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir)
}
}
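
// Companion object: CLI entry point for the AppMaster process, plus the message types
// exchanged with the YARN glue layer (RMClient/NMClient callbacks) and with clients.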
object YarnAppMaster extends AkkaApp with ArgumentsParser {
val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
"conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true),
"package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true)
)
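// Example invocation (hypothetical paths; the YARN client supplies the real ones):
//   -conf /user/gearpump/conf -package /user/gearpump/gearpump-pack.zip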
override def akkaConfig: Config = {
ClusterConfig.ui()
}
override def main(akkaConf: Config, args: Array[String]): Unit = {
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
implicit val system = ActorSystem("GearpumpAM", akkaConf)
val yarnConf = new YarnConfig()
val confDir = parse(args).getString("conf")
val packagePath = parse(args).getString("package")
LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR"))
LOG.info("YARN Resource Manager: " + yarnConf.resourceManager)
val rmClient = new RMClient(yarnConf)
val nmClient = new NMClient(yarnConf, akkaConf)
val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient,
nmClient, packagePath, confDir, UIService)))
val daemon = system.actorOf(Props(new Daemon(appMaster)))
Await.result(system.whenTerminated, Duration.Inf)
LOG.info("YarnAppMaster is shutdown")
}
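
// Watches the AppMaster actor and terminates the ActorSystem once it stops, which
// lets the Await.result call in main return.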
class Daemon(appMaster: ActorRef) extends Actor {
context.watch(appMaster)
override def receive: Actor.Receive = {
case Terminated(actor) =>
if (actor == appMaster) {
LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
s"shutting down current ActorSystem")
context.system.terminate()
context.stop(self)
}
}
}
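
// Messages used by the RM/NM callback handlers and the client-facing protocol.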
case class ResourceManagerException(throwable: Throwable)
case object ShutdownApplication
case class ContainersRequest(containers: List[Resource])
case class ContainersAllocated(containers: List[Container])
case class ContainersCompleted(containers: List[ContainerStatus])
case class ContainerStarted(containerId: ContainerId)
case object AppMasterRegistered
case class GetActiveConfig(clientHost: String)
case object QueryClusterInfo
case class ClusterInfo(masters: List[String], workers: List[String]) {
override def toString: String = {
val separator = "\n"
val masterSection = "masters: " + separator + masters.mkString(separator) + separator
val workerSection = "workers: " + separator + workers.mkString(separator) + separator
masterSection + workerSection
}
}
case object Kill
case class ActiveConfig(config: Config)
case object QueryVersion
case class Version(version: String)
case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort)
case class WorkerInfo(id: ContainerId, nodeId: NodeId)
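
// Resolves the ActorRef of a running AppMaster from its YARN ApplicationReport.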
def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
import org.apache.gearpump.experiments.yarn.client.AppMasterResolver
AppMasterResolver.resolveAppMasterAddress(report, system)
}
}