| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gearpump.cluster.appmaster |
| |
| import org.apache.gearpump.cluster.worker.WorkerId |
| |
| import scala.concurrent.duration._ |
| |
| import akka.actor._ |
| import com.typesafe.config.Config |
| |
| import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource |
| import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated |
| import org.apache.gearpump.cluster._ |
| import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._ |
| import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._ |
| import org.apache.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest} |
| import org.apache.gearpump.util.{Constants, LogUtil} |
| |
/**
 * ExecutorSystem is also a type of resource; this actor schedules ExecutorSystems for an
 * AppMaster. The AppMaster asks for live executor actor systems with StartExecutorSystems and is
 * answered with ExecutorSystemStarted or StartExecutorSystemTimeout. The background communication
 * with Master and Worker is delegated to child actors and stays hidden from the AppMaster.
 *
 * NOTE(review): earlier documentation asked callers to build this actor through
 * ExecutorSystemScheduler.props(); no such factory is visible in this file - confirm.
 *
 * @param appId application id, used for the logger, for resource requests and for launchers
 * @param masterProxy actor handed to every ResourceAgent as the target of its resource requests
 * @param executorSystemLauncher builds the Props of the launcher actor that is created for each
 *                               allocated resource
 */
private[appmaster]
class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
    executorSystemLauncher: (Int, Session) => Props) extends Actor {

  private val LOG = LogUtil.getLogger(getClass, app = appId)
  implicit val timeout = Constants.FUTURE_TIMEOUT
  implicit val actorSystem = context.system

  // Id given to the next LaunchExecutorSystem request; bumped after every request sent.
  var currentSystemId = 0

  // ResourceAgent child of every session that is still considered alive.
  var resourceAgents = Map.empty[Session, ActorRef]

  /** Tries client commands first, then resource allocation replies, then launcher replies. */
  def receive: Receive = {
    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
  }

  /** Handles the requests sent by clients: StartExecutorSystems and StopExecutorSystem. */
  def clientCommands: Receive = {
    case start: StartExecutorSystems =>
      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
        s"Resources(${start.resources.mkString(",")}))")

      // A session is identified by the requesting actor together with its config.
      val session = Session(sender(), start.executorSystemConfig)

      // Reuse the agent of a known session; the child actor is only created when none exists.
      val agent = resourceAgents.getOrElse(session,
        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
      resourceAgents += (session -> agent)

      for (resource <- start.resources) {
        agent ! RequestResource(appId, resource)
      }

    case StopExecutorSystem(executorSystem) =>
      executorSystem.shutdown
  }

  /** Handles the messages coming back from the ResourceAgent of a session. */
  def resourceAllocationMessageHandler: Receive = {
    case ResourceAllocatedForSession(allocations, session) =>
      // Allocations of a session that is no longer alive are ignored.
      if (isSessionAlive(session)) {
        allocations.foreach { case ResourceAllocation(resource, worker, workerId) =>
          // One launcher child per allocation.
          val launcher = context.actorOf(executorSystemLauncher(appId, session))
          launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
          currentSystemId += 1
        }
      }

    case ResourceAllocationTimeOut(session) =>
      if (isSessionAlive(session)) {
        // Forget the session before telling its requestor about the timeout.
        resourceAgents -= session
        session.requestor ! StartExecutorSystemTimeout
      }
  }

  /** Handles the results reported by the launcher actors. */
  def executorSystemMessageHandler: Receive = {
    case LaunchExecutorSystemSuccess(system, session) =>
      if (isSessionAlive(session)) {
        LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
        system.bindLifeCycleWith(self)
        session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
      } else {
        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
          "Will shutdown the allocated system")
        system.shutdown
      }

    case LaunchExecutorSystemTimeout(session) =>
      if (isSessionAlive(session)) {
        LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
        session.requestor ! StartExecutorSystemTimeout
      }

    case LaunchExecutorSystemRejected(resource, reason, session) =>
      if (isSessionAlive(session)) {
        LOG.error(s"Failed to launch executor system, due to $reason, " +
          s"will ask master to allocate new resources $resource")
        // Request the rejected resource again, this time without naming a worker.
        resourceAgents.get(session).foreach { agent =>
          agent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
        }
      }
  }

  /** A session is alive when it is non-null and still has an entry in resourceAgents. */
  private def isSessionAlive(session: Session): Boolean = {
    session != null && resourceAgents.contains(session)
  }
}
| |
/**
 * Messages and helper types of the ExecutorSystemScheduler actor.
 */
object ExecutorSystemScheduler {

  /**
   * Client request to start executor systems.
   *
   * @param resources resource requests; the scheduler hands each one to the session's
   *                  ResourceAgent as a separate RequestResource
   * @param executorSystemConfig configuration that, together with the sender, forms the Session
   */
  case class StartExecutorSystems(
      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)

  /**
   * Sent to the requestor of a session when one of its executor systems has been launched.
   *
   * @param system the launched executor system
   * @param boundedJar the `jar` of the session's ExecutorSystemJvmConfig
   */
  case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])

  /** Client request that makes the scheduler call `shutdown` on the given executor system. */
  case class StopExecutorSystem(system: ExecutorSystem)

  /**
   * Sent to the requestor of a session when the resource allocation or the launch of an executor
   * system timed out.
   */
  case object StartExecutorSystemTimeout

  /**
   * Configuration carried by a StartExecutorSystems request. This file only reads `jar`; the
   * meaning of the other fields is implied by their names.
   *
   * NOTE(review): the Array fields are compared by reference in the generated equals/hashCode,
   * which matters because Session (containing this class) is used as a Map key - confirm this
   * is intended.
   */
  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)

  /**
   * For each client that asks for an executor system, the scheduler creates a session.
   *
   * @param requestor the actor that sent the StartExecutorSystems request
   * @param executorSystemJvmConfig the configuration carried by that request
   */
  private[appmaster]
  case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)

  /**
   * This is an agent that requests resources on behalf of a session. Every RequestResource it
   * receives is passed on to `master`, and allocations or a timeout are reported back to the
   * sender of the most recent RequestResource.
   *
   * @param master actor that the resource requests are sent to
   * @param session the session the resources are requested for; attached to every message that is
   *                sent back
   */
  private[appmaster]
  class ResourceAgent(master: ActorRef, session: Session) extends Actor {
    // Sender of the most recent RequestResource; stays null until the first request arrives.
    private var resourceRequestor: ActorRef = null

    // Pending timeout, restarted by every RequestResource; null until the first request arrives.
    var timeOutClock: Cancellable = null

    // Slots that were requested but have not been allocated yet.
    private var unallocatedResource: Int = 0

    import context.dispatcher

    import org.apache.gearpump.util.Constants._

    // Allocation timeout read from the actor system configuration; used as seconds below.
    val timeout: Int = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)

    def receive: Receive = {
      case request: RequestResource =>
        unallocatedResource += request.request.resource.slots
        // Each new request restarts the timeout.
        cancelTimeOutClock()
        timeOutClock = context.system.scheduler.scheduleOnce(
          timeout.seconds, self, ResourceAllocationTimeOut(session))
        resourceRequestor = sender()
        master ! request

      case ResourceAllocated(allocations) =>
        unallocatedResource -= allocations.map(_.resource.slots).sum
        resourceRequestor forward ResourceAllocatedForSession(allocations, session)

      case _: ResourceAllocationTimeOut =>
        // A timeout only matters while some requested slots are still outstanding.
        if (unallocatedResource > 0) {
          resourceRequestor ! ResourceAllocationTimeOut(session)
          // We will not receive any ResourceAllocation after timeout
          context.stop(self)
        }
    }

    /**
     * Cancels a pending timeout when the actor stops, so that no scheduled task is left behind
     * that would fire at a stopped actor.
     */
    override def postStop(): Unit = cancelTimeOutClock()

    /** Cancels the current timeout, if one has been scheduled. */
    private def cancelTimeOutClock(): Unit = Option(timeOutClock).foreach(_.cancel())
  }

  /** Sent by a ResourceAgent to its requestor: the allocations received for `session`. */
  private[ExecutorSystemScheduler]
  case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)

  /**
   * Scheduled by a ResourceAgent to itself as the timeout tick, and sent by it to its requestor
   * when slots are still unallocated at that time.
   */
  private[ExecutorSystemScheduler]
  case class ResourceAllocationTimeOut(session: Session)

}