package org.apache.gearpump.streaming.appmaster
import scala.concurrent.duration._
import scala.util.{Failure, Try}
import akka.remote.RemoteScope
import com.typesafe.config.Config
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
import org.apache.gearpump.cluster.appmaster.WorkerInfo
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig}
import org.apache.gearpump.streaming.ExecutorId
import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
import org.apache.gearpump.streaming.appmaster.ExecutorManager._
import org.apache.gearpump.streaming.executor.Executor
import org.apache.gearpump.util.{LogUtil, Util}
* ExecutorManager manages the start and stop of all executors.
* ExecutorManager launches an Executor when asked. It hides details such as starting
* a new ExecutorSystem from the user. Please use ExecutorManager.props() to construct this actor.
private[appmaster] class ExecutorManager(
// User configuration passed to executorFactory for every executor that is created.
userConfig: UserConfig,
// Source of appId, masterProxy and username (imported below).
appContext: AppMasterContext,
// Produces the Props used to create an executor actor from its context, the user config,
// the address of its executor system and its id.
executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props,
// Used as the executor Akka config when the executor JVM config is built
// (see getExecutorJvmConfig).
clusterConfig: Config,
// Application name stored in every ExecutorContext.
appName: String)
extends Actor {
private val LOG = LogUtil.getLogger(getClass)
import appContext.{appId, masterProxy, username}
// Receiver of ExecutorStarted / ExecutorStopped / StartExecutorsTimeOut notifications.
// Stays null until a SetTaskManager message has been handled.
private var taskManager: ActorRef = null
private implicit val actorSystem = context.system
private val systemConfig = context.system.settings.config
// Known executors keyed by executor id. Entries are added on ExecutorSystemStarted,
// completed on RegisterExecutor and removed on failure / termination.
private var executors = Map.empty[Int, ExecutorInfo]
// Initial behavior: only SetTaskManager is handled.
def receive: Receive = waitForTaskManager
// Stores the task manager and then switches to the serving behavior
// (service, falling back to terminationWatch).
def waitForTaskManager: Receive = {
// The pattern variable shadows the field, hence the explicit `this.` below.
case SetTaskManager(taskManager) =>
this.taskManager = taskManager
context.become(service orElse terminationWatch)
// If something goes wrong on an executor, ExecutorManager will stop the current executor,
// and wait for AppMaster to start a new executor.
// One-for-one strategy limited to 10 retries within one minute.
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
withinTimeRange = 1.minute) {
case ex: Throwable =>
// NOTE(review): the expression wrapped by Try(...) is not visible in this view;
// the log message below suggests the id is derived from the sender's path -- confirm.
val executorId = Try(
executorId match {
// The failing child could be mapped to an executor id: forget it and log the failure.
case scala.util.Success(id) => {
executors -= id
LOG.error(s"Executor $id throws exception, stop it...\n" +
// NOTE(review): this `ex` shadows the Throwable bound by the outer case.
case Failure(ex) => {
LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...")
// Responds to outside queries
def service: Receive = {
// Asks the master to start executor systems for the requested resources,
// using the JVM config derived for the given application jar.
case StartExecutors(resources, jar) =>
masterProxy ! StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar)))
// An executor system is up: record it and create the executor actor through executorFactory.
case ExecutorSystemStarted(executorSystem, boundedJar) =>
import executorSystem.{address, executorSystemId, resource => executorResource, worker}
// The executor reuses the id of the executor system it runs in.
val executorId = executorSystemId
val executorContext = ExecutorContext(executorId, worker, appId, appName,
appMaster = context.parent, executorResource)
// The ActorRef is recorded as null here and filled in when RegisterExecutor arrives.
// NOTE(review): a UniCast to this id before registration would dereference that null.
executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar)
// Starts executor
// The child actor is named after the executor id.
val executor = context.actorOf(executorFactory(executorContext, userConfig,
address, executorId), executorId.toString)
// The executor systems could not be started in time: tell the task manager.
case StartExecutorSystemTimeout =>
taskManager ! StartExecutorsTimeOut
// An executor announces itself: complete its ExecutorInfo and notify the task manager.
case RegisterExecutor(executor, executorId, resource, worker) =>"executor $executorId has been launched")
// Watches for executor termination
// NOTE(review): Option.get throws NoSuchElementException when executorId has no entry
// in `executors` (e.g. registration from an executor that was never recorded) -- confirm
// whether that can happen.
val executorInfo = executors.get(executorId).get
executors += executorId -> executorInfo.copy(executor = executor)
taskManager ! ExecutorStarted(executorId, resource, worker.workerId, executorInfo.boundedJar)
// Broadcasts message to all executors
// The message is forwarded to every child of this actor (context.children), not to the
// entries of the `executors` map; the original sender is preserved by `forward`.
case BroadCast(msg) =>"Broadcast ${msg.getClass.getSimpleName} to all executors")
context.children.foreach(_ forward msg)
// Unicasts message to single executor
// Unknown executor ids are silently ignored (the Option is empty).
case UniCast(executorId, msg) =>"Unicast ${msg.getClass.getSimpleName} to executor $executorId")
val executor = executors.get(executorId)
executor.foreach(_.executor forward msg)
// Replies to the sender with the current executor map.
case GetExecutorInfo =>
sender ! executors
// Tells Executor manager resources that are occupied. The Executor Manager can use this
// information to tell worker to reclaim un-used resources
case ExecutorResourceUsageSummary(resources) =>
executors.foreach { pair =>
val (executorId, executor) = pair
val resource = resources.get(executorId)
val worker = executor.worker.ref
// Notifies the worker the actual resource used by this application.
resource match {
case Some(resource) =>
worker ! ChangeExecutorResource(appId, executorId, resource)
// No entry in the summary for this executor: report Resource(0) to the worker.
case None =>
worker ! ChangeExecutorResource(appId, executorId, Resource(0))
// Handles Terminated notifications: drops the executor from the bookkeeping and
// informs the task manager.
def terminationWatch: Receive = {
case Terminated(actor) =>
// NOTE(review): the expression wrapped by Try(...) is not visible in this view;
// the failure message below indicates the id is read from the actor's path.
val executorId = Try(
executorId match {
case scala.util.Success(id) => {
executors -= id
LOG.error(s"Executor $id is down")
taskManager ! ExecutorStopped(id)
case scala.util.Failure(ex) =>
LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex)
// Builds the JVM configuration for a new executor system: class path and VM arguments come
// from the executor JVM settings resolved on clusterConfig with systemConfig as fallback;
// the jar, the username and clusterConfig itself are passed along unchanged.
private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = {
val executorAkkaConfig = clusterConfig
val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor
ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
username, executorAkkaConfig)
// Message protocol and factory for the ExecutorManager actor.
private[appmaster] object ExecutorManager {
// Request to start executor systems for the given resource requests and application jar.
case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar)
// Wraps a message that should be forwarded to all executors.
case class BroadCast(msg: Any)
// Wraps a message that should be forwarded to the executor with the given id.
case class UniCast(executorId: Int, msg: Any)
// Query answered with the current map of executor id to ExecutorInfo.
case object GetExecutorInfo
// Sent to the task manager once an executor has registered.
case class ExecutorStarted(
executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
// Sent to the task manager when an executor has terminated.
case class ExecutorStopped(executorId: Int)
// Supplies the task manager; the ExecutorManager only starts serving after receiving it.
case class SetTaskManager(taskManager: ActorRef)
// Sent to the task manager when starting the executor systems timed out.
case object StartExecutorsTimeOut
// Creates the Props for an ExecutorManager whose executors are instances of Executor
// deployed remotely at the address of their executor system.
def props(
userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String)
: Props = {
// Default executor factory. The executorId argument is not used when building the Props;
// only the context, the user config and the deployment address are.
val executorFactory =
(executorContext: ExecutorContext,
userConfig: UserConfig,
address: Address,
executorId: ExecutorId) =>
Props(classOf[Executor], executorContext, userConfig)
.withDeploy(Deploy(scope = RemoteScope(address)))
Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName))
// Per-executor resource usage; executors missing from the map are reported as Resource(0).
case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])
// Bookkeeping record for one executor. `executor` is null between ExecutorSystemStarted
// and RegisterExecutor.
case class ExecutorInfo(
executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])