blob: 0ae7365f327e5d33e3063e29239ad5b0166b2d99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.master
import akka.actor._
import akka.pattern.ask
import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
import org.apache.gearpump.cluster.AppMasterToWorker._
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
import org.apache.gearpump.cluster.master.AppManager._
import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
import org.apache.gearpump.cluster.master.Master._
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
import org.slf4j.Logger
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
* AppManager is a dedicated child of Master that manages all applications.
*/
private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
extends Actor with Stash with TimeOutScheduler {
private val LOG: Logger = LogUtil.getLogger(getClass)
// Executor id handed to launcher.props when an AppMaster is launched and to ShutdownExecutor
// when an AppMaster is shut down.
private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
// Constructor arguments for every RestartPolicy created by this actor.
// NOTE(review): presumably "at most 5 restarts within 20 seconds" - confirm against RestartPolicy.
private val appMasterMaxRetries: Int = 5
private val appMasterRetryTimeRange: Duration = 20.seconds
// Implicit timeout in scope for the `?` (ask) calls made to kvService.
implicit val timeout = FUTURE_TIMEOUT
// Implicit execution context in scope for the callbacks attached to those ask futures.
implicit val executionContext = context.dispatcher
// Next available appId
private var nextAppId: Int = 1
// From appId to appMaster data
// Applications not in activeAppMasters or deadAppMasters are in pending status
private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
// Active appMaster list where applications are in active status
private var activeAppMasters = Set.empty[Int]
// Dead appMaster list where applications are in inactive status
private var deadAppMasters = Set.empty[Int]
// Restart policy per appId, added when an application is submitted. Unlike the three
// collections above it is not a field of MasterState, so waitForMasterState does not restore it.
private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
// Placeholder only: the constructor statements below switch the behavior to
// waitForMasterState via context.become.
def receive: Receive = null
// Request the persisted master state from the KV service; waitForMasterState handles the reply.
kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
context.become(waitForMasterState)
/**
 * Behavior used until the reply to the GetKV(MASTER_GROUP, MASTER_STATE) request arrives.
 *
 * On success the persisted MasterState (if any) is copied into this actor's fields, the
 * behavior switches to receiveHandler and all stashed messages are replayed. On failure the
 * parent is stopped with a PoisonPill. Any other message is logged and stashed.
 */
def waitForMasterState: Receive = {
  case GetKVSuccess(_, result) =>
    // A null result means no state was stored yet; the initial field values are kept.
    Option(result.asInstanceOf[MasterState]).foreach { masterState =>
      this.nextAppId = masterState.maxId + 1
      this.activeAppMasters = masterState.activeAppMasters
      this.deadAppMasters = masterState.deadAppMasters
      this.appMasterRegistry = masterState.appMasterRegistry
    }
    context.become(receiveHandler)
    unstashAll()
  case GetKVFailed(ex) =>
    // Keep the cause in the log: this is the only trace of why the master is being stopped.
    LOG.error("Failed to get master state, shutting down master to avoid data corruption...", ex)
    context.parent ! PoisonPill
  case msg =>
    LOG.info(s"Get message ${msg.getClass.getSimpleName}")
    stash()
}
/**
 * Steady-state behavior: logs that the manager is ready and returns the individual message
 * handlers chained with orElse, tried in the order listed.
 */
def receiveHandler: Receive = {
  LOG.info("Application Manager started. Ready for application submission...")
  val clientAndAppMaster = clientMsgHandler orElse appMasterMessage
  val selfAndWorker = selfMsgHandler orElse workerMessage
  val storeAndWatch = appDataStoreService orElse terminationWatch
  clientAndAppMaster orElse selfAndWorker orElse storeAndWatch
}
/**
 * Handles the requests sent by clients:
 *
 *  - SubmitApplication: rejects a name already used by a non-dead application, otherwise
 *    starts a launcher child, records a RestartPolicy and stores the ApplicationState.
 *  - RestartApplication: reads the stored ApplicationState, then sends itself a
 *    ShutdownApplication followed by a SubmitApplication on behalf of the client.
 *  - ShutdownApplication: marks the application dead and asks its worker to shut the
 *    AppMaster executor down.
 *  - ResolveAppId / AppMastersDataRequest / QueryAppMasterConfig / AppMasterDataRequest:
 *    read-only lookups against appMasterRegistry.
 */
def clientMsgHandler: Receive = {
  case SubmitApplication(app, jar, username) =>
    LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
    val client = sender()
    if (applicationNameExist(app.name)) {
      client ! SubmitApplicationResult(Failure(
        new Exception(s"Application name ${app.name} already existed")))
    } else {
      context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
        Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
      val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
      appMasterRestartPolicies += nextAppId ->
        new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
      kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
      nextAppId += 1
    }

  case RestartApplication(appId) =>
    // Capture the sender now: the callback below runs after this message has been processed.
    val client = sender()
    def rejectRestart(reason: String): Unit =
      client ! SubmitApplicationResult(Failure(new Exception(reason)))
    val stateUnavailable = s"Unable to obtain the Master State. " +
      s"Application $appId will not be restarted."

    (kvService ? GetKV(appId.toString, APP_STATE)).mapTo[GetKVResult].onComplete {
      case Success(GetKVSuccess(_, result)) =>
        Option(result.asInstanceOf[ApplicationState]) match {
          case Some(appState) =>
            LOG.info(s"Shutting down the application (restart), $appId")
            self ! ShutdownApplication(appId)
            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
          case None =>
            rejectRestart(s"Failed to restart, because the application $appId does not exist.")
        }
      case Success(GetKVFailed(ex)) =>
        rejectRestart(stateUnavailable)
      case Success(other) =>
        // Unexpected reply type: still answer the client instead of leaving it waiting.
        LOG.error(s"Unexpected reply $other while restarting application $appId")
        rejectRestart(stateUnavailable)
      case Failure(ex) =>
        // The ask itself failed (for example it timed out); the client must still get a reply.
        LOG.error(s"Failed to query the state of application $appId for restart", ex)
        rejectRestart(stateUnavailable)
    }

  case ShutdownApplication(appId) =>
    LOG.info(s"App Manager Shutting down application $appId")
    // Only applications that are registered and not already dead can be shut down.
    val liveApp = appMasterRegistry.get(appId).collect {
      case (_, info) if !deadAppMasters.contains(info.appId) => info
    }
    liveApp match {
      case Some(info) =>
        val worker = info.worker
        val workerPath = Option(worker).map(_.path).orNull
        LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID")
        cleanApplicationData(appId)
        val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
          s"AppMaster $appId shutdown requested by master...")
        sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
        sender ! ShutdownApplicationResult(Success(appId))
      case None =>
        val errorMsg = s"Failed to find registration information for appId: $appId"
        LOG.error(errorMsg)
        sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
    }

  case ResolveAppId(appId) =>
    val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
    if (null != appMaster) {
      sender ! ResolveAppIdResult(Success(appMaster))
    } else {
      sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
    }

  case AppMastersDataRequest =>
    // One AppMasterData per registry entry, in registry iteration order.
    val appMastersData = appMasterRegistry.toList.map { case (id, (appMaster, info)) =>
      val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
      val workerPath = Option(info.worker).map(worker =>
        ActorUtil.getFullPath(context.system, worker.path))
      AppMasterData(
        getAppMasterStatus(id), id, info.appName, appMasterPath, workerPath.orNull,
        info.submissionTime, info.startTime, info.finishTime, info.user)
    }
    sender ! AppMastersData(appMastersData)

  case QueryAppMasterConfig(appId) =>
    // Unknown applications are answered with a null config.
    val config = appMasterRegistry.get(appId) match {
      case Some((_, info)) => info.config
      case None => null
    }
    sender ! AppMasterConfig(config)

  case appMasterDataRequest: AppMasterDataRequest =>
    val appId = appMasterDataRequest.appId
    // Look the registry up directly: an id can be in activeAppMasters or deadAppMasters without
    // being registered (ActivateAppMaster does not check the registry), and indexing the
    // registry with such an id would throw.
    appMasterRegistry.get(appId) match {
      case Some((appMaster, info)) =>
        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
        val workerPath = Option(info.worker).map(
          worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
        sender ! AppMasterData(
          getAppMasterStatus(appId), appId, info.appName, appMasterPath, workerPath,
          info.submissionTime, info.startTime, info.finishTime, info.user)
      case None =>
        sender ! AppMasterData(AppMasterNonExist)
    }
}
/**
 * Handles the replies a worker sends back for a ShutdownExecutor request; both outcomes are
 * only logged.
 */
def workerMessage: Receive = {
  case ShutdownExecutorSucceed(appId, executorId) =>
    val report = s"Shut down executor $executorId for application $appId successfully"
    LOG.info(report)
  case shutdownFailure: ShutdownExecutorFailed =>
    val reason = shutdownFailure.reason
    LOG.error(reason)
}
/**
 * Derives the status of an application from this actor's bookkeeping. Checks are made in
 * order: active set, dead set, registry (pending); anything else does not exist.
 */
private def getAppMasterStatus(appId: Int): AppMasterStatus = appId match {
  case id if activeAppMasters.contains(id) => AppMasterActive
  case id if deadAppMasters.contains(id) => AppMasterInActive
  case id if appMasterRegistry.contains(id) => AppMasterPending
  case _ => AppMasterNonExist
}
/** Timeout callback passed to sendMsgWithTimeOutCallBack for ShutdownExecutor; only logs. */
private def shutDownExecutorTimeOut(): Unit =
  LOG.error("Shut down executor time out")
/**
 * Handles the messages sent by AppMasters.
 *
 * RegisterAppMaster stamps the start time, watches the AppMaster, records it in the registry
 * and persists the master state. ActivateAppMaster adds the id to the active set and persists
 * the master state. Both are acknowledged to the sender.
 */
def appMasterMessage: Receive = {
  case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
    val register = registerBack.copy(startTime = System.currentTimeMillis())
    val registeredId = register.appId
    LOG.info(s"Register AppMaster for app: $registeredId, $register")
    // Watching makes terminationWatch receive a Terminated message for this AppMaster.
    context.watch(appMaster)
    appMasterRegistry = appMasterRegistry + (registeredId -> ((appMaster, register)))
    val snapshot = MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)
    kvService ! PutKV(MASTER_GROUP, MASTER_STATE, snapshot)
    sender ! AppMasterRegistered(registeredId)

  case ActivateAppMaster(appId) =>
    LOG.info(s"Activate AppMaster for app $appId")
    activeAppMasters = activeAppMasters + appId
    val snapshot = MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)
    kvService ! PutKV(MASTER_GROUP, MASTER_STATE, snapshot)
    sender ! AppMasterActivated(appId)
}
/**
 * Key-value storage on behalf of applications, backed by kvService; the application id is
 * used as the KV group.
 *
 * The requester always receives an answer: AppDataSaved or SaveAppDataFailed for SaveAppData,
 * and a GetAppDataResult (with a null value when the lookup did not succeed) for GetAppData.
 * This includes the case where the ask to kvService itself fails, for example on timeout.
 */
def appDataStoreService: Receive = {
  case SaveAppData(appId, key, value) =>
    // Capture the sender now: the callback runs after this message has been processed.
    val client = sender()
    (kvService ? PutKV(appId.toString, key, value)).mapTo[PutKVResult].onComplete {
      case Success(PutKVSuccess) =>
        client ! AppDataSaved
      case Success(other) =>
        LOG.error(s"Failed to save data of application $appId under key $key: $other")
        client ! SaveAppDataFailed
      case Failure(ex) =>
        LOG.error(s"Failed to save data of application $appId under key $key", ex)
        client ! SaveAppDataFailed
    }

  case GetAppData(appId, key) =>
    val client = sender()
    (kvService ? GetKV(appId.toString, key)).mapTo[GetKVResult].onComplete {
      case Success(GetKVSuccess(_, value)) =>
        client ! GetAppDataResult(key, value)
      case Success(other) =>
        LOG.error(s"Failed to get data of application $appId under key $key: $other")
        client ! GetAppDataResult(key, null)
      case Failure(ex) =>
        LOG.error(s"Failed to get data of application $appId under key $key", ex)
        client ! GetAppDataResult(key, null)
    }
}
/**
 * Reacts to the termination of a watched AppMaster.
 *
 * If the terminated actor belongs to a registered application, its ApplicationState is read
 * from kvService and, when present, a RecoverApplication message is sent to this actor.
 * Every other outcome, including a failed ask, is logged.
 */
def terminationWatch: Receive = {
  case terminate: Terminated =>
    LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
      s"network down: ${terminate.getAddressTerminated}")
    // Now we assume that the only normal way to stop the application is submitting a
    // ShutdownApplication request
    val terminatedAppId = appMasterRegistry.collectFirst {
      case (appId, (actorRef, _)) if actorRef.compareTo(terminate.actor) == 0 => appId
    }
    terminatedAppId.foreach { appId =>
      (kvService ? GetKV(appId.toString, APP_STATE)).mapTo[GetKVResult].onComplete {
        case Success(GetKVSuccess(_, result)) =>
          Option(result.asInstanceOf[ApplicationState]) match {
            case Some(appState) =>
              LOG.info(s"Recovering application, $appId")
              self ! RecoverApplication(appState)
            case None =>
              LOG.error(s"Cannot find application state for $appId")
          }
        case Success(GetKVFailed(ex)) =>
          LOG.error(s"Cannot find master state to recover", ex)
        case Success(other) =>
          LOG.error(s"Unexpected reply $other, application $appId will not be recovered")
        case Failure(ex) =>
          // The ask itself failed (for example it timed out); without this branch the
          // application would silently stay unrecovered.
          LOG.error(s"Failed to query the state of application $appId for recovery", ex)
      }
    }
}
/**
 * Handles RecoverApplication, which this actor sends to itself after an AppMaster terminated.
 *
 * When the application's RestartPolicy allows it, the application is removed from the active
 * set, the master state is persisted and a new launcher child is started for the same appId.
 * Otherwise the failure is logged and nothing is relaunched.
 */
def selfMsgHandler: Receive = {
  case RecoverApplication(state) =>
    val appId = state.appId
    // Policies are only created on SubmitApplication and are not part of MasterState, so an
    // application restored from the KV store has none yet. Create it on demand rather than
    // failing with NoSuchElementException.
    val restartPolicy = appMasterRestartPolicies.get(appId) match {
      case Some(existing) => existing
      case None =>
        val created = new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
        appMasterRestartPolicies += appId -> created
        created
    }
    if (restartPolicy.allowRestart) {
      LOG.info(s"AppManager Recovering Application $appId...")
      activeAppMasters -= appId
      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
      context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
        context.parent, None), s"launcher${appId}_${Util.randInt()}")
    } else {
      LOG.error(s"Application $appId failed too many times")
    }
}
// Message this actor sends to itself from terminationWatch and handles in selfMsgHandler.
// It carries the ApplicationState read from the KV service for the terminated AppMaster.
case class RecoverApplication(applicationStatus: ApplicationState)
/**
 * Marks a registered application as finished: moves it from the active to the dead set,
 * stamps its finish time in the registry, persists the master state and deletes the
 * application's KV group. Does nothing for an unregistered appId.
 */
private def cleanApplicationData(appId: Int): Unit = {
  appMasterRegistry.get(appId).foreach { case (ref, info) =>
    // Add the dead app to dead appMasters
    deadAppMasters = deadAppMasters + appId
    // Remove the dead app from active appMasters
    activeAppMasters = activeAppMasters - appId
    val finishedInfo = info.copy(finishTime = System.currentTimeMillis())
    appMasterRegistry = appMasterRegistry.updated(appId, (ref, finishedInfo))
    val snapshot = MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)
    kvService ! PutKV(MASTER_GROUP, MASTER_STATE, snapshot)
    kvService ! DeleteKVGroup(appId.toString)
  }
}
/**
 * Tells whether a registered application that is not in the dead set already uses the given
 * name.
 */
private def applicationNameExist(appName: String): Boolean = {
  val registeredInfos = appMasterRegistry.valuesIterator.map { case (_, info) => info }
  registeredInfos.exists { info =>
    val sameName = info.appName == appName
    sameName && !deadAppMasters.contains(info.appId)
  }
}
}
// Constants and the persisted state type shared with the AppManager actor.
object AppManager {
// KV key under which an application's ApplicationState is stored; the KV group is the
// application id rendered as a string.
final val APP_STATE = "app_state"
// The id is used in KVStore
// KV key, used together with MASTER_GROUP, under which MasterState is stored.
final val MASTER_STATE = "master_state"
// Snapshot of the AppManager bookkeeping written to the KV service.
// maxId is written from the manager's nextAppId, and nextAppId is restored as maxId + 1.
case class MasterState(
maxId: Int,
appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
activeAppMasters: Set[Int],
deadAppMasters: Set[Int])
}