blob: d73cc2f6d04afd11ecf9ab32eea50ddcb96cea8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.appmaster
import org.apache.gearpump.cluster.worker.WorkerId
import scala.concurrent.duration._
import akka.actor._
import com.typesafe.config.Config
import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._
import org.apache.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
import org.apache.gearpump.util.{Constants, LogUtil}
/**
* ExecutorSystem is also a type of resource; this class schedules ExecutorSystems for an AppMaster.
* The AppMaster can use this class to directly request live executor actor systems; the
* background communication with the Master and Workers is hidden from the AppMaster.
*
* This actor is private to the `appmaster` package and is constructed there with `Props`
* (the companion object does not define a `props()` factory).
*/
private[appmaster]
class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
    executorSystemLauncher: (Int, Session) => Props) extends Actor {

  private val LOG = LogUtil.getLogger(getClass, app = appId)
  implicit val timeout = Constants.FUTURE_TIMEOUT
  implicit val actorSystem: ActorSystem = context.system

  // Id passed with the next LaunchExecutorSystem; incremented once per launched system.
  var currentSystemId = 0

  // Session -> ResourceAgent that requests resources on the session's behalf. A session is
  // treated as alive exactly while it has an entry here (see isSessionAlive); the entry is
  // removed when resource allocation for the session times out.
  var resourceAgents = Map.empty[Session, ActorRef]

  def receive: Receive = {
    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
  }

  /** Handles requests sent by the client (the AppMaster) of this scheduler. */
  def clientCommands: Receive = {
    case start: StartExecutorSystems =>
      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
        s"Resources(${start.resources.mkString(",")}))")
      val requestor = sender()
      val executorSystemConfig = start.executorSystemConfig
      val session = Session(requestor, executorSystemConfig)
      // Reuse the agent of an equal, still-registered session; getOrElse evaluates its
      // default lazily, so a new agent is only created when none exists.
      val agent = resourceAgents.getOrElse(session,
        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
      resourceAgents = resourceAgents + (session -> agent)
      start.resources.foreach { resource =>
        agent ! RequestResource(appId, resource)
      }

    case StopExecutorSystem(executorSystem) =>
      executorSystem.shutdown
  }

  /**
   * Handles results reported by a session's ResourceAgent: launches one executor system per
   * allocation, or reports a timeout to the session's requestor.
   */
  def resourceAllocationMessageHandler: Receive = {
    case ResourceAllocatedForSession(allocations, session) =>
      if (isSessionAlive(session)) {
        allocations.foreach { resourceAllocation =>
          val ResourceAllocation(resource, worker, workerId) = resourceAllocation
          val launcher = context.actorOf(executorSystemLauncher(appId, session))
          launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
          currentSystemId = currentSystemId + 1
        }
      }

    case ResourceAllocationTimeOut(session) =>
      if (isSessionAlive(session)) {
        // Forget the session so that late replies for it are ignored.
        resourceAgents = resourceAgents - session
        session.requestor ! StartExecutorSystemTimeout
      }
  }

  /** Handles the outcome of launching an executor system on an allocated worker. */
  def executorSystemMessageHandler: Receive = {
    case LaunchExecutorSystemSuccess(system, session) =>
      if (isSessionAlive(session)) {
        LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
        system.bindLifeCycleWith(self)
        session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
      } else {
        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
          "Will shutdown the allocated system")
        system.shutdown
      }

    case LaunchExecutorSystemTimeout(session) =>
      if (isSessionAlive(session)) {
        LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
        session.requestor ! StartExecutorSystemTimeout
      }

    case LaunchExecutorSystemRejected(resource, reason, session) =>
      if (isSessionAlive(session)) {
        LOG.error(s"Failed to launch executor system, due to $reason, " +
          s"will ask master to allocate new resources $resource")
        // Retry through the session's agent, on any worker. The send is a pure side effect,
        // so use foreach rather than map.
        resourceAgents.get(session).foreach { resourceAgent =>
          resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
        }
      }
  }

  /** A session is alive when it is non-null and still registered in `resourceAgents`. */
  private def isSessionAlive(session: Session): Boolean = {
    Option(session).exists(s => resourceAgents.contains(s))
  }
}
object ExecutorSystemScheduler {

  /**
   * Asks the scheduler to request every entry of `resources` from the master and to launch an
   * executor system, configured by `executorSystemConfig`, for each allocation received.
   * Replies (ExecutorSystemStarted / StartExecutorSystemTimeout) go to the sender.
   */
  case class StartExecutorSystems(
      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)

  /**
   * Reply carrying a started executor system.
   *
   * @param boundedJar the `jar` of the session's ExecutorSystemJvmConfig
   */
  case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])

  /** Asks the scheduler to shut down the given executor system. */
  case class StopExecutorSystem(system: ExecutorSystem)

  /** Reply sent when resources could not be allocated or a system failed to launch in time. */
  case object StartExecutorSystemTimeout

  /** JVM settings used to launch an executor system. */
  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)

  /**
   * The scheduler creates a session for each client that asks for an executor system.
   * A session pairs the requesting actor with the JVM config it asked for.
   */
  private[appmaster]
  case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)

  /**
   * An agent that requests resources from the master on behalf of one session and reports
   * the allocations, or an allocation timeout, back to whoever sent the last request.
   *
   * @param master the actor that RequestResource messages are forwarded to
   * @param session the session these requests belong to; it is attached to every reply
   */
  private[appmaster]
  class ResourceAgent(master: ActorRef, session: Session) extends Actor {
    private var resourceRequestor: ActorRef = null
    var timeOutClock: Cancellable = null
    // Slots requested but not yet allocated; may stop the agent when the timeout fires.
    private var unallocatedResource: Int = 0

    import context.dispatcher
    import org.apache.gearpump.util.Constants._

    // Allocation timeout in seconds, read from the Gearpump configuration.
    val timeout: Int = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)

    def receive: Receive = {
      case request: RequestResource =>
        unallocatedResource += request.request.resource.slots
        // Every new request restarts the allocation deadline.
        Option(timeOutClock).foreach(_.cancel())
        timeOutClock = context.system.scheduler.scheduleOnce(
          timeout.seconds, self, ResourceAllocationTimeOut(session))
        resourceRequestor = sender()
        master ! request

      case ResourceAllocated(allocations) =>
        unallocatedResource -= allocations.map(_.resource.slots).sum
        resourceRequestor forward ResourceAllocatedForSession(allocations, session)

      case _: ResourceAllocationTimeOut =>
        if (unallocatedResource > 0) {
          resourceRequestor ! ResourceAllocationTimeOut(session)
          // We will not receive any ResourceAllocation after timeout
          context.stop(self)
        }
    }

    // Cancel a pending deadline so it does not fire into dead letters after this agent stops.
    override def postStop(): Unit = {
      Option(timeOutClock).foreach(_.cancel())
    }
  }

  private[ExecutorSystemScheduler]
  case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)

  private[ExecutorSystemScheduler]
  case class ResourceAllocationTimeOut(session: Session)
}