package org.apache.gearpump.cluster.master
import akka.pattern.ask
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
import org.apache.gearpump.cluster.AppMasterToWorker._
import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus}
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
import org.apache.gearpump.cluster.appmaster.{ApplicationMetaData, ApplicationRuntimeInfo}
import org.apache.gearpump.cluster.master.AppManager._
import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
import org.apache.gearpump.cluster.master.Master._
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
import org.slf4j.Logger
import scala.concurrent.Future
import scala.util.{Failure, Success}
/** AppManager is a dedicated child of Master that manages all applications. */
// Constructor parameters:
//   kvService - actor that receives the GetKV / PutKV / DeleteKVGroup requests sent below
//   launcher  - factory whose `props(...)` is used to create a launcher child actor per application
private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
extends Actor with Stash with TimeOutScheduler {
private val LOG: Logger = LogUtil.getLogger(getClass)
private val systemConfig: Config = context.system.settings.config
// Retry budget handed to the RestartPolicy created for each submitted application.
private val appTotalRetries: Int = systemConfig.getInt(Constants.APPLICATION_TOTAL_RETRIES)
// Implicits presumably consumed by the `?` (ask) calls and the Future callbacks in the
// handlers below - TODO confirm.
implicit val timeout = FUTURE_TIMEOUT
implicit val executionContext = context.dispatcher
// Next available appId: starts at 1, is advanced on submission and is restored from the
// persisted MasterState in waitForMasterState.
private var nextAppId: Int = 1
// appId -> runtime information of every application known to this manager.
private var applicationRegistry = Map.empty[Int, ApplicationRuntimeInfo]
// appId -> actors registered through RegisterAppResultListener; they are notified when the
// application succeeds or fails.
private var appResultListeners = Map.empty[Int, List[ActorRef]]
// appId -> restart policy, consulted when a RecoverApplication message is handled.
private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
// NOTE(review): the initial behavior is null and no `context.become` is visible in this view;
// presumably the real behavior is installed by code not shown here - confirm.
def receive: Receive = null
/**
 * Handles the reply to the master-state lookup in the KV store.
 *
 *  - GetKVSuccess: when a non-null MasterState is returned, id allocation resumes after the
 *    stored maxId and the stored application registry is adopted.
 *  - GetKVFailed: logs the failure and sends PoisonPill to the parent actor.
 */
def waitForMasterState: Receive = {
case GetKVSuccess(_, result) =>
// The KV result is cast to MasterState; a null result leaves nextAppId and the registry
// at their initial values.
val masterState = result.asInstanceOf[MasterState]
if (masterState != null) {
this.nextAppId = masterState.maxId + 1
this.applicationRegistry = masterState.applicationRegistry
// NOTE(review): the end of the `if` and any follow-up statements of this case are not
// visible in this view - confirm against the full file.
case GetKVFailed(ex) =>
LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
context.parent ! PoisonPill
// NOTE(review): the next line looks truncated (a bare string literal followed by `)`);
// presumably a logging call and further handling were lost - confirm.
case msg =>"Get message ${msg.getClass.getSimpleName}")
/**
 * Composes the message handling from the individual partial handlers; they are tried in the
 * order in which they are chained with `orElse`.
 */
def receiveHandler: Receive = {
// NOTE(review): `msg` is not referenced again in the visible code; presumably it was meant
// to be logged - confirm.
val msg = "Application Manager started. Ready for application submission..."
clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
appDataStoreService orElse terminationWatch
/**
 * Handles the requests of the ClientToMaster / MasterToAppMaster protocols visible here:
 * submitting, restarting, shutting down and querying applications, and registering result
 * listeners.
 */
def clientMsgHandler: Receive = {
// Submission: a name clash is answered with a failed SubmitApplicationResult; otherwise a
// launcher child actor is created and the application is recorded as PENDING.
// NOTE(review): several lines of this case look truncated in this view (a bare string literal
// after `=>`, empty `${}` placeholders, unfinished argument lists) - confirm against the full file.
case SubmitApplication(app, jar, username) =>"Submit Application ${}($nextAppId) by $username...")
// Capture the sender up front; it receives the failure reply and is handed to the launcher.
val client = sender()
if (applicationNameExist( {
client ! SubmitApplicationResult(Failure(
new Exception(s"Application name ${} already existed")))
} else {
// The child actor name carries a random suffix, presumably to keep names unique - confirm.
context.actorOf(launcher.props(nextAppId, APPMASTER_DEFAULT_EXECUTOR_ID, app, jar, username,
context.parent, Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
appMasterRestartPolicies += nextAppId -> new RestartPolicy(appTotalRetries)
val appRuntimeInfo = ApplicationRuntimeInfo(nextAppId,,
user = username,
submissionTime = System.currentTimeMillis(),
config = app.clusterConfig,
status = ApplicationStatus.PENDING)
applicationRegistry += nextAppId -> appRuntimeInfo
// The metadata is stored in a KV group named after the appId; RestartApplication and the
// termination watch read it back from there.
val appMetaData = ApplicationMetaData(nextAppId, 0, app, jar, username)
kvService ! PutKV(nextAppId.toString, APP_METADATA, appMetaData)
nextAppId += 1
// Persist the id counter together with the registry.
kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
// Restart: read the stored metadata, then send self a ShutdownApplication followed by a
// SubmitApplication whose sender is the original client, so the submission result is
// delivered to that client.
case RestartApplication(appId) =>
val client = sender()
(kvService ? GetKV(appId.toString, APP_METADATA)).asInstanceOf[Future[GetKVResult]].map {
case GetKVSuccess(_, result) =>
val metaData = result.asInstanceOf[ApplicationMetaData]
// NOTE(review): the string literal fused onto the next line looks like the remainder of a
// lost logging call - confirm.
if (metaData != null) {"Shutting down the application (restart), $appId")
self ! ShutdownApplication(appId)
self.tell(SubmitApplication(metaData.appDesc, metaData.jar, metaData.username), client)
} else {
client ! SubmitApplicationResult(Failure(
new Exception(s"Failed to restart, because the application $appId does not exist.")
case GetKVFailed(ex) =>
client ! SubmitApplicationResult(Failure(
new Exception(s"Unable to obtain the Master State. " +
s"Application $appId will not be restarted.")
// Shutdown: kill the AppMaster of a registered application and mark it TERMINATED; an
// unknown appId is answered with a failed ShutdownApplicationResult.
case ShutdownApplication(appId) =>"App Manager Shutting down application $appId")
// NOTE(review): the next line ends with a dangling `.`; the expression looks truncated - confirm.
val appInfo = applicationRegistry.get(appId).
appInfo match {
case Some(info) =>
killAppMaster(appId, info.worker)
sender ! ShutdownApplicationResult(Success(appId))
// The status is updated through a direct call instead of a message to self so that the
// registry is consistent right away; a self-message would introduce a timing problem.
this.onApplicationStatusChanged(appId, ApplicationStatus.TERMINATED,
System.currentTimeMillis(), null)
case None =>
val errorMsg = s"Failed to find registration information for appId: $appId"
sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
// Resolve an appId to the AppMaster reference stored in the registry.
case ResolveAppId(appId) =>
val appMaster = applicationRegistry.get(appId).map(_.appMaster)
appMaster match {
case Some(appMasterActor) =>
sender ! ResolveAppIdResult(Success(appMasterActor))
case None =>
sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
// Summary of all registered applications, one AppMasterData entry per registry entry.
case AppMastersDataRequest =>
var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
applicationRegistry.foreach(pair => {
val (id, info: ApplicationRuntimeInfo) = pair
// NOTE(review): the actor reference is passed here, while the single-application request
// below passes `.path` - confirm both forms are intended.
val appMasterPath = ActorUtil.getFullPath(context.system, info.appMaster)
// The worker reference may be null, hence the Option wrapper; a missing worker yields a
// null path.
val workerPath = Option(info.worker).map(worker =>
ActorUtil.getFullPath(context.system, worker))
appMastersData += AppMasterData(
info.status, id, info.appName, appMasterPath, workerPath.orNull,
info.submissionTime, info.startTime, info.finishTime, info.user)
sender ! AppMastersData(appMastersData.toList)
// Configuration stored at submission time, or an empty configuration for an unknown appId.
case QueryAppMasterConfig(appId) =>
val config = applicationRegistry.get(appId).map(_.config).getOrElse(ConfigFactory.empty())
sender ! AppMasterConfig(config)
// Data of a single application; an unknown appId is answered with status NONEXIST.
case appMasterDataRequest: AppMasterDataRequest =>
val appId = appMasterDataRequest.appId
val appRuntimeInfo = applicationRegistry.get(appId)
appRuntimeInfo match {
case Some(info) =>
val appMasterPath = ActorUtil.getFullPath(context.system, info.appMaster.path)
val workerPath = Option(info.worker).map(
worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
sender ! AppMasterData(
info.status, appId, info.appName, appMasterPath, workerPath,
info.submissionTime, info.startTime, info.finishTime, info.user)
case None =>
sender ! AppMasterData(ApplicationStatus.NONEXIST)
// Append the sender to the listeners that are notified about the outcome of this application.
case RegisterAppResultListener(appId) =>
val listenerList = appResultListeners.getOrElse(appId, List.empty[ActorRef])
appResultListeners += appId -> (listenerList :+ sender())
/**
 * Handles ShutdownExecutorSucceed / ShutdownExecutorFailed messages, presumably the replies to
 * the ShutdownExecutor request sent by killAppMaster - confirm.
 */
def workerMessage: Receive = {
// NOTE(review): the string literal fused onto the next line looks like the remainder of a
// lost logging call - confirm.
case ShutdownExecutorSucceed(appId, executorId) =>"Shut down executor $executorId for application $appId successfully")
// NOTE(review): no statement is visible for this case in this view - confirm how a failed
// shutdown is handled.
case failed: ShutdownExecutorFailed =>
/**
 * Handles AppMaster registration and application status-change notifications.
 */
def appMasterMessage: Receive = {
// Registration: attach the AppMaster and its worker to the registry entry, persist the
// master state and acknowledge to the sender. An unknown appId is only logged.
case RegisterAppMaster(appId, appMaster, workerInfo) =>
val appInfo = applicationRegistry.get(appId)
appInfo match {
// NOTE(review): the string literal fused onto the next line looks like the remainder of a
// lost logging call - confirm.
case Some(info) =>"Register AppMaster for app: $appId")
val updatedInfo = info.onAppMasterRegistered(appMaster, workerInfo.ref)
applicationRegistry += appId -> updatedInfo
kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
sender ! AppMasterRegistered(appId)
case None =>
LOG.error(s"Can not find submitted application $appId")
// Status changes go through the same routine that the ShutdownApplication handler calls directly.
case ApplicationStatusChanged(appId, newStatus, timeStamp, error) =>
onApplicationStatusChanged(appId, newStatus, timeStamp, error)
/**
 * Applies a status change to a registered application.
 *
 * The change is applied only when the current status reports `canTransitTo(newStatus)`;
 * a rejected transition or an unknown appId is logged as an error.
 *
 * @param appId     id of the application whose status changes
 * @param newStatus status the application switches to
 * @param timeStamp time of the change, forwarded to the ApplicationRuntimeInfo update methods
 * @param error     failure cause forwarded to listeners in ApplicationFailed; the
 *                  ShutdownApplication handler passes null
 */
private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
timeStamp: MilliSeconds, error: Throwable): Unit = {
applicationRegistry.get(appId) match {
case Some(appRuntimeInfo) =>
if (appRuntimeInfo.status.canTransitTo(newStatus)) {
// NOTE(review): the string literal fused onto the next line looks like the remainder of a
// lost logging call - confirm.
var updatedStatus: ApplicationRuntimeInfo = null"Application $appId change to ${newStatus.toString} at $timeStamp")
newStatus match {
// Activation is acknowledged to the sender of the current message.
case ApplicationStatus.ACTIVE =>
updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
sender ! AppMasterActivated(appId)
// SUCCEEDED and FAILED both kill the AppMaster and notify the registered result listeners.
case succeeded@ApplicationStatus.SUCCEEDED =>
killAppMaster(appId, appRuntimeInfo.worker)
updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
appResultListeners.getOrElse(appId, List.empty).foreach { client =>
client ! ApplicationSucceeded(appId)
case failed@ApplicationStatus.FAILED =>
killAppMaster(appId, appRuntimeInfo.worker)
updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
appResultListeners.getOrElse(appId, List.empty).foreach { client =>
client ! ApplicationFailed(appId, error)
// TERMINATED only records the final status here; the ShutdownApplication handler calls
// killAppMaster itself before invoking this method.
case terminated@ApplicationStatus.TERMINATED =>
updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
// NOTE(review): in this branch updatedStatus is still null; verify whether the registry
// update below can run with that null value.
case status =>
LOG.error(s"App $appId should not change it's status to $status")
// A terminal status removes the application's own KV group.
if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
kvService ! DeleteKVGroup(appId.toString)
applicationRegistry += appId -> updatedStatus
kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
} else {
LOG.error(s"Application $appId tries to switch status ${appRuntimeInfo.status} " +
s"to $newStatus")
case None =>
LOG.error(s"Can not find application runtime info for appId $appId when it's " +
s"status changed to ${newStatus.toString}")
/**
 * Handles application-data requests by delegating them to the KV store actor.
 *
 *  - `SaveAppData(appId, key, value)`: stores `value` under `key` in the KV group named after
 *    `appId`, then replies `AppDataSaved`, or `SaveAppDataFailed` when the store reports a
 *    failure.
 *  - `GetAppData(appId, key)`: replies `GetAppDataResult(key, value)`; the value is `null`
 *    when the lookup fails.
 *
 * If the ask itself fails (for example it times out or the reply has an unexpected type), the
 * failure is logged and the requester receives the same failure reply instead of being left
 * without an answer.
 */
def appDataStoreService: Receive = {
  case SaveAppData(appId, key, value) =>
    // Capture the sender before going asynchronous; the reply is sent from a Future callback.
    val requester = sender()
    val saved = (kvService ? PutKV(appId.toString, key, value)).mapTo[PutKVResult].map {
      case PutKVSuccess =>
        requester ! AppDataSaved
      case PutKVFailed(_, _) =>
        requester ! SaveAppDataFailed
    }
    // Runs only when the ask or the reply handling above failed.
    saved.failed.foreach { cause =>
      LOG.error(s"Failed to save data $key of application $appId", cause)
      requester ! SaveAppDataFailed
    }
  case GetAppData(appId, key) =>
    val requester = sender()
    val fetched = (kvService ? GetKV(appId.toString, key)).mapTo[GetKVResult].map {
      case GetKVSuccess(_, value) =>
        requester ! GetAppDataResult(key, value)
      case GetKVFailed(_) =>
        requester ! GetAppDataResult(key, null)
    }
    // Same reply as for GetKVFailed, so callers see a single failure shape.
    fetched.failed.foreach { cause =>
      LOG.error(s"Failed to get data $key of application $appId", cause)
      requester ! GetAppDataResult(key, null)
    }
}
/**
 * Reacts to Terminated notifications: looks for a matching application in the registry and,
 * when its metadata is still in the KV store, marks it PENDING and sends self a
 * RecoverApplication message.
 */
def terminationWatch: Receive = {
// NOTE(review): this line looks truncated (a bare string literal after `=>` and an empty
// `${}` placeholder); presumably a logging call was lost - confirm.
case terminate: Terminated =>"AppMaster(${}) is terminated, " +
s"network down: ${terminate.getAddressTerminated}")
// The only normal way to stop an application is assumed to be a ShutdownApplication
// request, so a termination seen here is treated as a failure to recover from.
val application = applicationRegistry.find { appInfo =>
// NOTE(review): the predicate on the next line ends with a dangling `&&`; its conditions are
// not visible in this view - confirm which applications are selected.
val (_, runtimeInfo) = appInfo &&
if (application.nonEmpty) {
val appId = application.get._1
(kvService ? GetKV(appId.toString, APP_METADATA)).asInstanceOf[Future[GetKVResult]].map {
case GetKVSuccess(_, result) =>
val appMetadata = result.asInstanceOf[ApplicationMetaData]
if (appMetadata != null) {"Recovering application, $appId")
// NOTE(review): the registry is modified from inside the Future callback rather than from
// message handling - verify that this is safe with respect to the actor's own state updates.
val updatedInfo = application.get._2.copy(status = ApplicationStatus.PENDING)
applicationRegistry += appId -> updatedInfo
self ! RecoverApplication(appMetadata)
} else {
LOG.error(s"Cannot find application meta data for $appId")
case GetKVFailed(ex) =>
LOG.error(s"Cannot find master state to recover")
/**
 * Handles RecoverApplication, which the termination watch sends to self: relaunches the
 * application through a new launcher child actor as long as its restart policy allows it.
 */
def selfMsgHandler: Receive = {
case RecoverApplication(state) =>
val appId = state.appId
// NOTE(review): `.get` throws if no restart policy is recorded for appId. Policies are only
// added on submission and are not restored in waitForMasterState - verify that an entry
// always exists here.
// NOTE(review): the string literal fused onto the next line and the fragment after it look
// like the remains of lost statements - confirm against the full file.
if (appMasterRestartPolicies.get(appId).get.allowRestart) {"AppManager Recovering Application $appId...")
MasterState(this.nextAppId, applicationRegistry))
// Unlike a fresh submission, no client is passed to the launcher (None).
context.actorOf(launcher.props(appId, APPMASTER_DEFAULT_EXECUTOR_ID, state.appDesc,
state.jar, state.username, context.parent, None), s"launcher${appId}_${Util.randInt()}")
} else {
LOG.error(s"Application $appId failed too many times")
/**
 * Asks a worker to shut down the executor that hosts an application's AppMaster.
 *
 * @param appId  id of the application whose AppMaster is shut down
 * @param worker worker that receives the ShutdownExecutor request; only the path lookup below
 *               is null-safe - NOTE(review): confirm how a null worker is handled by the send
 */
private def killAppMaster(appId: Int, worker: ActorRef): Unit = {
// NOTE(review): the string literal fused onto the next line ends with `+` and has no visible
// continuation; presumably a logging call was lost - confirm.
val workerPath = Option(worker).map(_.path).orNull"Shutdown AppMaster at $workerPath, appId: $appId, executorId:" +
val shutdown = ShutdownExecutor(appId, APPMASTER_DEFAULT_EXECUTOR_ID,
s"AppMaster $appId shutdown requested by master...")
// 30000 is presumably a timeout in milliseconds - TODO confirm against TimeOutScheduler.
sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
/**
 * Tells whether the registry already holds an application with the given name whose status
 * is not a terminal one.
 *
 * @param appName application name to look for
 * @return true if a registry entry has this name and a non-terminal status
 */
private def applicationNameExist(appName: String): Boolean =
  applicationRegistry.values.exists { runtime =>
    runtime.status match {
      case _: ApplicationTerminalStatus => false
      case _ => runtime.appName == appName
    }
  }
/**
 * Logs an error saying that shutting down an executor timed out. killAppMaster passes this
 * to `sendMsgWithTimeOutCallBack`.
 */
private def shutDownExecutorTimeOut(): Unit =
  LOG.error(s"Shut down executor time out")
/** Constants and messages used by the AppManager actor. */
object AppManager {
// KV key under which an application's ApplicationMetaData is stored; the KV group is the
// application id rendered as a string.
final val APP_METADATA = "app_metadata"
// KV key under which the MasterState is stored, within MASTER_GROUP.
final val MASTER_STATE = "master_state"
// Message the AppManager sends to itself to relaunch an application from its stored metadata.
case class RecoverApplication(appMetaData: ApplicationMetaData)
// Persisted snapshot of the manager: the id counter value and the application registry.
case class MasterState(maxId: Int, applicationRegistry: Map[Int, ApplicationRuntimeInfo])