// blob: d791a10ce27deea21abf22d1c14bc12c1b529f3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.master
import java.util.concurrent.{TimeUnit, TimeoutException}
import org.apache.gearpump.cluster.worker.WorkerId
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}
import akka.actor.{Actor, ActorRef, Props, _}
import com.typesafe.config.Config
import org.slf4j.Logger
import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, WorkerInfo}
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
import org.apache.gearpump.cluster.{AppDescription, AppJar, _}
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.ActorSystemBooter._
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
/**
 * AppMasterLauncher is a child Actor of AppManager that is responsible for
 * launching the AppMaster of a single application on the cluster.
 *
 * The actor walks through three states:
 *
 *  1. `waitForResourceAllocation`: on construction it sends a `RequestResource` to `master`
 *     and waits for `ResourceAllocated`. It then asks the allocated worker to
 *     `LaunchExecutor`.
 *  1. `waitForActorSystemToStart`: waits for the launched executor to send
 *     `RegisterActorSystem`, then asks it to create the AppMaster runtime actor. If the
 *     worker rejects the launch, the resource is requested again.
 *  1. `waitForAppMasterToStart`: waits for `ActorCreated` or `CreateActorFailed` (a
 *     self-sent `CreateActorFailed` acts as the timeout), replies to the client if there
 *     is one, and stops itself.
 *
 * @param appId      id of the application whose AppMaster is launched
 * @param executorId executor id used when launching / shutting down the AppMaster executor
 * @param app        application description; supplies the cluster config and AppMaster name
 * @param jar        optional application jar forwarded to the executor and AppMaster context
 * @param username   user name forwarded to the executor JVM config and AppMaster context
 * @param master     actor that receives the `RequestResource` messages; it is also used as
 *                   the sender of the reply delivered to `client`
 * @param client     optional actor that receives the final `SubmitApplicationResult`
 */
class AppMasterLauncher(
    appId: Int, executorId: Int, app: AppDescription,
    jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
  extends Actor {

  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
  private val scheduler = context.system.scheduler
  private val systemConfig = context.system.settings.config

  /** How long to wait for the AppMaster actor to be created before giving up. */
  private val TIMEOUT = Duration(15, TimeUnit.SECONDS)
  private val appMasterAkkaConfig: Config = app.clusterConfig

  LOG.info(s"Ask Master resource to start AppMaster $appId...")
  master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))

  def receive: Receive = waitForResourceAllocation

  /** Initial state: waits for the master to allocate a resource, then launches an executor. */
  def waitForResourceAllocation: Receive = {
    case ResourceAllocated(allocations) =>
      // Only the first allocation is used.
      val ResourceAllocation(resource, worker, workerId) = allocations(0)
      LOG.info(s"Resource allocated for appMaster $appId on worker $workerId(${worker.path})")

      val workerInfo = WorkerInfo(workerId, worker)
      val appMasterContext = AppMasterContext(appId, username, resource, workerInfo, jar, null)
      LOG.info(s"Try to launch a executor for AppMaster on worker $workerId for app $appId")

      val name = ActorUtil.actorNameForExecutor(appId, executorId)
      // The launched process is handed this actor's full path together with the executor name.
      val selfPath = ActorUtil.getFullPath(context.system, self.path)

      // Application-level config takes precedence; the system config is the fallback.
      val jvmSetting =
        Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
        username, appMasterAkkaConfig)

      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
      context.become(waitForActorSystemToStart(worker, appMasterContext, resource))
  }

  /**
   * Second state: the executor launch has been requested on `worker`.
   *
   * @param worker     worker that was asked to launch the executor
   * @param appContext context handed to the AppMaster runtime environment
   * @param resource   resource that was allocated; re-requested if the launch is rejected
   */
  def waitForActorSystemToStart(worker: ActorRef, appContext: AppMasterContext,
      resource: Resource): Receive = {
    case ExecutorLaunchRejected(reason, ex) =>
      LOG.error(s"Executor Launch failed reason: $reason", ex)
      LOG.info(s"reallocate resource $resource to start appmaster")
      master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
      context.become(waitForResourceAllocation)

    case RegisterActorSystem(systemPath) =>
      LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
      sender() ! ActorSystemRegistered(worker)

      val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
        .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
      sender() ! CreateActor(AppMasterRuntimeEnvironment.props(masterAddress, app, appContext),
        s"appdaemon$appId")

      import context.dispatcher
      // Give the timeout a real message: a bare TimeoutException has a null getMessage,
      // which would otherwise travel to the worker and the client as the failure reason.
      val timeoutError = new TimeoutException(
        s"AppMaster for app $appId was not created within $TIMEOUT")
      val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
        CreateActorFailed(app.appMaster, timeoutError))
      context.become(waitForAppMasterToStart(worker, appMasterTimeout))
  }

  /**
   * Final state: waits for the outcome of the AppMaster actor creation.
   *
   * @param worker worker hosting the executor; told to shut it down on failure
   * @param cancel handle of the scheduled timeout message, cancelled on either outcome
   */
  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
    case ActorCreated(appMaster, _) =>
      cancel.cancel()
      sender() ! BindLifeCycle(appMaster)
      LOG.info("AppMaster is created, mission complete...")
      replyToClient(SubmitApplicationResult(Success(appId)))
      context.stop(self)

    case CreateActorFailed(name, reason) =>
      cancel.cancel()
      // Throwable.getMessage may be null; never forward a null shutdown reason.
      val message = Option(reason.getMessage).getOrElse(reason.toString)
      LOG.error(s"Failed to create AppMaster $name for app $appId: $message", reason)
      worker ! ShutdownExecutor(appId, executorId, message)
      replyToClient(SubmitApplicationResult(Failure(reason)))
      context.stop(self)
  }

  /** Sends `result` to the client, if any, with `master` as the message sender. */
  def replyToClient(result: SubmitApplicationResult): Unit = {
    client.foreach(_.tell(result, master))
  }
}
/**
 * Companion object acting as the [[AppMasterLauncherFactory]] that builds the `Props`
 * for an [[AppMasterLauncher]] actor.
 */
object AppMasterLauncher extends AppMasterLauncherFactory {

  /**
   * Creates the `Props` for a new [[AppMasterLauncher]]; every argument is forwarded
   * unchanged to the actor's constructor.
   */
  override def props(
      appId: Int,
      executorId: Int,
      app: AppDescription,
      jar: Option[AppJar],
      username: String,
      master: ActorRef,
      client: Option[ActorRef]): Props =
    Props(
      new AppMasterLauncher(
        appId = appId,
        executorId = executorId,
        app = app,
        jar = jar,
        username = username,
        master = master,
        client = client))
}
/**
 * Factory abstraction that produces the `Props` of an AppMaster launcher actor.
 *
 * In this file it is implemented by the `AppMasterLauncher` companion object.
 * NOTE(review): presumably this exists so callers can substitute a different
 * launcher implementation; confirm against the call sites.
 */
trait AppMasterLauncherFactory {
/**
 * Builds the `Props` used to create a launcher actor.
 *
 * The implementation in this file forwards all seven arguments unchanged to the
 * `AppMasterLauncher` constructor.
 *
 * @return `Props` from which the launcher actor can be created
 */
def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
username: String, master: ActorRef, client: Option[ActorRef]): Props
}