| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gearpump.streaming.appmaster |
| |
| import scala.concurrent.duration._ |
| import scala.util.{Failure, Try} |
| |
| import akka.actor.SupervisorStrategy.Stop |
| import akka.actor._ |
| import akka.remote.RemoteScope |
| import com.typesafe.config.Config |
| import org.apache.commons.lang.exception.ExceptionUtils |
| |
| import org.apache.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource |
| import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} |
| import org.apache.gearpump.cluster.appmaster.WorkerInfo |
| import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} |
| import org.apache.gearpump.cluster.worker.WorkerId |
| import org.apache.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig} |
| import org.apache.gearpump.streaming.ExecutorId |
| import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor |
| import org.apache.gearpump.streaming.appmaster.ExecutorManager._ |
| import org.apache.gearpump.streaming.executor.Executor |
| import org.apache.gearpump.util.{LogUtil, Util} |
| |
/**
 * ExecutorManager manages the start and stop of all executors.
 *
 * ExecutorManager launches an Executor when asked. It hides details such as starting
 * a new ExecutorSystem from the user. Please use ExecutorManager.props() to construct this actor.
 *
 * The actor ignores every message until it receives SetTaskManager; after that it serves
 * requests and reports executor life cycle events to the task manager.
 *
 * @param userConfig user configuration handed to the executor factory for every executor
 * @param appContext context of the application; provides appId, masterProxy and username
 * @param executorFactory builds the Props of an executor from its context, the user config,
 *                        the address of its executor system and its executor id
 * @param clusterConfig configuration used (with the actor system config as fallback) to resolve
 *                      the executor JVM settings; also passed along in ExecutorSystemJvmConfig
 * @param appName application name stored in every ExecutorContext
 */
private[appmaster] class ExecutorManager(
    userConfig: UserConfig,
    appContext: AppMasterContext,
    executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props,
    clusterConfig: Config,
    appName: String)
  extends Actor {

  private val LOG = LogUtil.getLogger(getClass)

  import appContext.{appId, masterProxy, username}

  // Stays null until SetTaskManager arrives; only the behaviors installed after that read it.
  private var taskManager: ActorRef = null

  private implicit val actorSystem: ActorSystem = context.system
  private val systemConfig = context.system.settings.config

  // Executors known to this manager, keyed by executor id. The `executor` field of an entry
  // is null from ExecutorSystemStarted until the matching RegisterExecutor is received.
  private var executors = Map.empty[Int, ExecutorInfo]

  def receive: Receive = waitForTaskManager

  /** Initial behavior: waits for the task manager reference, then switches to serving. */
  def waitForTaskManager: Receive = {
    case SetTaskManager(ref) =>
      this.taskManager = ref
      context.become(service orElse terminationWatch)
  }

  // If something wrong on executor, ExecutorManager will stop the current executor,
  // and wait for AppMaster to start a new executor.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
    withinTimeRange = 1.minute) {
    case ex: Throwable =>
      // Inside the decider the sender is the child that failed.
      val failedChild = sender()
      Try(failedChild.path.name.toInt) match {
        case scala.util.Success(id) =>
          executors -= id
          LOG.error(s"Executor $id throws exception, stop it...\n" +
            ExceptionUtils.getStackTrace(ex))
        case Failure(_) =>
          // Log the original failure as well; the name-parsing error itself is not interesting.
          LOG.error(
            s"Sender ${failedChild.path} is dead, but seems it is not an executor...", ex)
      }
      Stop
  }

  /** Main behavior: responds to outside queries. */
  def service: Receive = {
    case StartExecutors(resources, jar) =>
      masterProxy ! StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar)))

    case ExecutorSystemStarted(executorSystem, boundedJar) =>
      import executorSystem.{address, executorSystemId, resource => executorResource, worker}

      val executorId = executorSystemId
      val executorContext = ExecutorContext(executorId, worker, appId, appName,
        appMaster = context.parent, executorResource)
      // The executor ActorRef is not known yet; it is filled in on RegisterExecutor.
      executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar)

      // Starts executor. The child is named after the executor id so that the id can be
      // recovered from the actor path on failure or termination.
      val executor = context.actorOf(executorFactory(executorContext, userConfig,
        address, executorId), executorId.toString)
      executorSystem.bindLifeCycleWith(executor)

    case StartExecutorSystemTimeout =>
      taskManager ! StartExecutorsTimeOut

    case RegisterExecutor(executor, executorId, resource, worker) =>
      executors.get(executorId) match {
        case Some(executorInfo) =>
          LOG.info(s"executor $executorId has been launched")
          // Watches for executor termination
          context.watch(executor)
          executors += executorId -> executorInfo.copy(executor = executor)
          taskManager ! ExecutorStarted(executorId, resource, worker.workerId,
            executorInfo.boundedJar)
        case None =>
          // The executor is not (or no longer) tracked, e.g. it was removed after a failure.
          // Ignore the registration instead of crashing the manager.
          LOG.warn(s"Ignore registration of unknown executor $executorId (${executor.path})")
      }

    // Broadcasts message to all executors
    case BroadCast(msg) =>
      LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors")
      context.children.foreach(_ forward msg)

    // Unicasts message to single executor
    case UniCast(executorId, msg) =>
      LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId")
      // Option(...) filters out entries whose executor has not registered yet (null ref).
      executors.get(executorId).flatMap(info => Option(info.executor)) match {
        case Some(executor) =>
          executor forward msg
        case None =>
          LOG.warn(s"Executor $executorId is unknown or not registered yet, message dropped")
      }

    case GetExecutorInfo =>
      sender() ! executors

    // Tells Executor manager resources that are occupied. The Executor Manager can use this
    // information to tell worker to reclaim un-used resources
    case ExecutorResourceUsageSummary(resources) =>
      executors.foreach { case (executorId, executorInfo) =>
        // An executor that is missing from the summary is reported as using no resource.
        val usedResource = resources.getOrElse(executorId, Resource(0))
        // Notifies the worker the actual resource used by this application.
        executorInfo.worker.ref ! ChangeExecutorResource(appId, executorId, usedResource)
      }
  }

  /** Handles termination of watched executors and reports it to the task manager. */
  def terminationWatch: Receive = {
    case Terminated(actor) =>
      Try(actor.path.name.toInt) match {
        case scala.util.Success(id) =>
          executors -= id
          LOG.error(s"Executor $id is down")
          taskManager ! ExecutorStopped(id)
        case Failure(ex) =>
          LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex)
      }
  }

  /**
   * Builds the JVM configuration used to start executor systems.
   *
   * @param jar optional application jar to pass along in the configuration
   * @return the class path and VM arguments resolved from clusterConfig (falling back to the
   *         actor system config), together with the jar, the user name and clusterConfig
   */
  private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = {
    val executorAkkaConfig = clusterConfig
    val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor

    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
      username, executorAkkaConfig)
  }
}
| |
/**
 * Message protocol of the ExecutorManager actor and the factory for its Props.
 */
private[appmaster] object ExecutorManager {

  // ---- Requests handled by ExecutorManager ----

  /** Hands the task manager reference to the ExecutorManager. */
  case class SetTaskManager(taskManager: ActorRef)

  /** Asks for executors to be started for the given resource requests, using the given jar. */
  case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar)

  /** Wraps a message that is forwarded to all children of the ExecutorManager. */
  case class BroadCast(msg: Any)

  /** Wraps a message that is forwarded to the executor with the given id. */
  case class UniCast(executorId: Int, msg: Any)

  /** Query that is answered with the current map of executor id to ExecutorInfo. */
  case object GetExecutorInfo

  /** Resource actually used per executor; executors without an entry count as Resource(0). */
  case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])

  // ---- Notifications sent to the task manager ----

  /** Sent once an executor has registered itself. */
  case class ExecutorStarted(
      executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])

  /** Sent when a watched executor has terminated. */
  case class ExecutorStopped(executorId: Int)

  /** Sent when starting the executor systems timed out. */
  case object StartExecutorsTimeOut

  // ---- Bookkeeping ----

  /**
   * What the ExecutorManager knows about one executor. `executor` is null until the
   * executor has registered itself.
   */
  case class ExecutorInfo(
      executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])

  /**
   * Creates the Props of an ExecutorManager whose executors are instances of Executor
   * deployed remotely at the address of their executor system.
   */
  def props(
      userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String)
    : Props = {
    // The executor id is not needed to build the Props of the default Executor.
    val executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => Props =
      (executorContext, executorUserConfig, address, _) => {
        val remoteDeploy = Deploy(scope = RemoteScope(address))
        Props(classOf[Executor], executorContext, executorUserConfig).withDeploy(remoteDeploy)
      }

    Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName))
  }
}