blob: 0a32d9dc9fed6699d3586b52ebb7b1ec5a9c5219 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.master
import akka.actor._
import akka.pattern.ask
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
import org.apache.gearpump.cluster.AppMasterToWorker._
import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus}
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
import org.apache.gearpump.cluster.appmaster.{ApplicationMetaData, ApplicationRuntimeInfo}
import org.apache.gearpump.cluster.master.AppManager._
import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
import org.apache.gearpump.cluster.master.Master._
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
import org.slf4j.Logger
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * AppManager is the dedicated child of Master that manages all applications.
 *
 * On start it requests the persisted [[AppManager.MasterState]] from `kvService` and stashes
 * every other message until that state arrives. Afterwards it handles application submission,
 * restart, shutdown and queries; AppMaster registration and status changes; per-application
 * data storage; and recovery of applications whose AppMaster actor terminated.
 *
 * @param kvService actor used to persist the master state and per-application data
 * @param launcher  factory for the props of the actor that launches an application's AppMaster
 */
private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
  extends Actor with Stash with TimeOutScheduler {

  private val LOG: Logger = LogUtil.getLogger(getClass)
  private val systemConfig: Config = context.system.settings.config
  private val appTotalRetries: Int = systemConfig.getInt(Constants.APPLICATION_TOTAL_RETRIES)

  implicit val timeout = FUTURE_TIMEOUT
  implicit val executionContext = context.dispatcher

  // Next available appId
  private var nextAppId: Int = 1
  private var applicationRegistry = Map.empty[Int, ApplicationRuntimeInfo]
  private var appResultListeners = Map.empty[Int, List[ActorRef]]
  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]

  // Start in the state-loading behavior; the reply to the request below switches to
  // receiveHandler.
  def receive: Receive = waitForMasterState

  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)

  /**
   * Initial behavior: waits for the persisted master state and restores `nextAppId` and the
   * application registry from it (when one was stored). It then switches to
   * [[receiveHandler]] and replays stashed messages. If the state cannot be read, the parent
   * is sent a PoisonPill rather than running with possibly inconsistent state.
   */
  def waitForMasterState: Receive = {
    case GetKVSuccess(_, result) =>
      val masterState = result.asInstanceOf[MasterState]
      if (masterState != null) {
        this.nextAppId = masterState.maxId + 1
        this.applicationRegistry = masterState.applicationRegistry
      }
      context.become(receiveHandler)
      unstashAll()
    case GetKVFailed(ex) =>
      LOG.error("Failed to get master state, shutting down master to avoid data corruption...",
        ex)
      context.parent ! PoisonPill
    case msg =>
      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
      stash()
  }

  /** Main behavior once the master state is loaded: all handlers chained together. */
  def receiveHandler: Receive = {
    val msg = "Application Manager started. Ready for application submission..."
    LOG.info(msg)
    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
      appDataStoreService orElse terminationWatch
  }

  /** Requests coming from clients: submit/restart/shutdown/resolve/query applications. */
  def clientMsgHandler: Receive = {
    case SubmitApplication(app, jar, username) =>
      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
      val client = sender()
      if (applicationNameExist(app.name)) {
        client ! SubmitApplicationResult(Failure(
          new Exception(s"Application name ${app.name} already existed")))
      } else {
        context.actorOf(launcher.props(nextAppId, APPMASTER_DEFAULT_EXECUTOR_ID, app, jar, username,
          context.parent, Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
        appMasterRestartPolicies += nextAppId -> new RestartPolicy(appTotalRetries)
        val appRuntimeInfo = ApplicationRuntimeInfo(nextAppId, app.name,
          user = username,
          submissionTime = System.currentTimeMillis(),
          config = app.clusterConfig,
          status = ApplicationStatus.PENDING)
        applicationRegistry += nextAppId -> appRuntimeInfo
        val appMetaData = ApplicationMetaData(nextAppId, 0, app, jar, username)
        kvService ! PutKV(nextAppId.toString, APP_METADATA, appMetaData)
        nextAppId += 1
        kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
      }

    case RestartApplication(appId) =>
      val client = sender()
      val metaDataReply: Future[GetKVResult] =
        (kvService ? GetKV(appId.toString, APP_METADATA)).mapTo[GetKVResult]
      metaDataReply.map {
        case GetKVSuccess(_, result) =>
          val metaData = result.asInstanceOf[ApplicationMetaData]
          if (metaData != null) {
            LOG.info(s"Shutting down the application (restart), $appId")
            // Only messages are sent from this Future callback; actor state is untouched here.
            self ! ShutdownApplication(appId)
            self.tell(SubmitApplication(metaData.appDesc, metaData.jar, metaData.username), client)
          } else {
            client ! SubmitApplicationResult(Failure(
              new Exception(s"Failed to restart, because the application $appId does not exist.")
            ))
          }
        case GetKVFailed(ex) =>
          client ! SubmitApplicationResult(Failure(
            new Exception(s"Unable to obtain the Master State. " +
              s"Application $appId will not be restarted.")
          ))
      }.recover {
        // The ask itself failed (e.g. timed out): answer the client instead of leaving it hanging.
        case ex =>
          client ! SubmitApplicationResult(Failure(
            new Exception(s"Unable to obtain the Master State. " +
              s"Application $appId will not be restarted.", ex)
          ))
      }

    case ShutdownApplication(appId) =>
      LOG.info(s"App Manager Shutting down application $appId")
      val appInfo = applicationRegistry.get(appId)
        .filter(!_.status.isInstanceOf[ApplicationTerminalStatus])
      appInfo match {
        case Some(info) =>
          killAppMaster(appId, info.worker)
          sender() ! ShutdownApplicationResult(Success(appId))
          // Here we use the function to make sure the status is consistent because
          // sending another message to self will involve timing problem
          this.onApplicationStatusChanged(appId, ApplicationStatus.TERMINATED,
            System.currentTimeMillis(), null)
        case None =>
          val errorMsg = s"Failed to find registration information for appId: $appId"
          LOG.error(errorMsg)
          sender() ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
      }

    case ResolveAppId(appId) =>
      applicationRegistry.get(appId).map(_.appMaster) match {
        case Some(appMasterActor) =>
          sender() ! ResolveAppIdResult(Success(appMasterActor))
        case None =>
          sender() ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
      }

    case AppMastersDataRequest =>
      val appMastersData = applicationRegistry.toList.map { case (id, info) =>
        val appMasterPath = ActorUtil.getFullPath(context.system, info.appMaster)
        val workerPath = Option(info.worker).map(worker =>
          ActorUtil.getFullPath(context.system, worker))
        AppMasterData(
          info.status, id, info.appName, appMasterPath, workerPath.orNull,
          info.submissionTime, info.startTime, info.finishTime, info.user)
      }
      sender() ! AppMastersData(appMastersData)

    case QueryAppMasterConfig(appId) =>
      val config = applicationRegistry.get(appId).map(_.config).getOrElse(ConfigFactory.empty())
      sender() ! AppMasterConfig(config)

    case appMasterDataRequest: AppMasterDataRequest =>
      val appId = appMasterDataRequest.appId
      applicationRegistry.get(appId) match {
        case Some(info) =>
          // appMaster may be unset (null) before the AppMaster registers, just like worker;
          // dereferencing `.path` unguarded would throw a NullPointerException.
          val appMasterPath = Option(info.appMaster).map(
            appMaster => ActorUtil.getFullPath(context.system, appMaster.path)).orNull
          val workerPath = Option(info.worker).map(
            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
          sender() ! AppMasterData(
            info.status, appId, info.appName, appMasterPath, workerPath,
            info.submissionTime, info.startTime, info.finishTime, info.user)
        case None =>
          sender() ! AppMasterData(ApplicationStatus.NONEXIST)
      }

    case RegisterAppResultListener(appId) =>
      val listenerList = appResultListeners.getOrElse(appId, List.empty[ActorRef])
      appResultListeners += appId -> (listenerList :+ sender())
  }

  /** Replies from workers to the executor shutdown requests sent by [[killAppMaster]]. */
  def workerMessage: Receive = {
    case ShutdownExecutorSucceed(appId, executorId) =>
      LOG.info(s"Shut down executor $executorId for application $appId successfully")
    case failed: ShutdownExecutorFailed =>
      LOG.error(failed.reason)
  }

  /** Messages from AppMasters: registration and application status changes. */
  def appMasterMessage: Receive = {
    case RegisterAppMaster(appId, appMaster, workerInfo) =>
      applicationRegistry.get(appId) match {
        case Some(info) =>
          LOG.info(s"Register AppMaster for app: $appId")
          val updatedInfo = info.onAppMasterRegistered(appMaster, workerInfo.ref)
          context.watch(appMaster)
          applicationRegistry += appId -> updatedInfo
          kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
          sender() ! AppMasterRegistered(appId)
        case None =>
          LOG.error(s"Can not find submitted application $appId")
      }
    case ApplicationStatusChanged(appId, newStatus, timeStamp, error) =>
      onApplicationStatusChanged(appId, newStatus, timeStamp, error)
  }

  /**
   * Applies a status transition to the registered application `appId`, if its current status
   * allows it.
   *
   * ACTIVE replies `AppMasterActivated` to the current sender. SUCCEEDED and FAILED kill the
   * AppMaster. Every terminal status notifies the registered result listeners (then forgets
   * them) and deletes the application's KV group. Any other target status is only logged and
   * leaves the registry untouched. Previously that case stored `null` into the registry.
   *
   * @param appId     application whose status changes
   * @param newStatus requested new status
   * @param timeStamp time of the change
   * @param error     failure cause reported to listeners for FAILED; may be null otherwise
   */
  private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
      timeStamp: MilliSeconds, error: Throwable): Unit = {
    applicationRegistry.get(appId) match {
      case Some(appRuntimeInfo) if appRuntimeInfo.status.canTransitTo(newStatus) =>
        LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
        val updatedInfo: Option[ApplicationRuntimeInfo] = newStatus match {
          case ApplicationStatus.ACTIVE =>
            val activated = appRuntimeInfo.onAppMasterActivated(timeStamp)
            sender() ! AppMasterActivated(appId)
            Some(activated)
          case succeeded@ApplicationStatus.SUCCEEDED =>
            killAppMaster(appId, appRuntimeInfo.worker)
            val finished = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
            sendAppResultToListeners(appId, ApplicationSucceeded(appId))
            Some(finished)
          case failed@ApplicationStatus.FAILED =>
            killAppMaster(appId, appRuntimeInfo.worker)
            val finished = appRuntimeInfo.onFinalStatus(timeStamp, failed)
            sendAppResultToListeners(appId, ApplicationFailed(appId, error))
            Some(finished)
          case terminated@ApplicationStatus.TERMINATED =>
            val finished = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
            sendAppResultToListeners(appId, ApplicationTerminated(appId))
            Some(finished)
          case status =>
            LOG.error(s"App $appId should not change it's status to $status")
            None
        }
        updatedInfo.foreach { info =>
          if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
            kvService ! DeleteKVGroup(appId.toString)
            // The final result has been delivered; listeners for this app are no longer needed.
            appResultListeners -= appId
          }
          applicationRegistry += appId -> info
          kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
        }
      case Some(appRuntimeInfo) =>
        LOG.error(s"Application $appId tries to switch status ${appRuntimeInfo.status} " +
          s"to $newStatus")
      case None =>
        LOG.error(s"Can not find application runtime info for appId $appId when it's " +
          s"status changed to ${newStatus.toString}")
    }
  }

  /** Sends `result` to every listener registered for `appId` via RegisterAppResultListener. */
  private def sendAppResultToListeners(appId: Int, result: ApplicationResult): Unit = {
    appResultListeners.get(appId).foreach {
      _.foreach { client =>
        client ! result
      }
    }
  }

  /**
   * Stores and fetches per-application key/value data in `kvService`, grouped by appId.
   * If the KV service does not answer (ask failure/timeout), the client receives the same
   * failure reply as an explicit KV failure instead of no reply at all.
   */
  def appDataStoreService: Receive = {
    case SaveAppData(appId, key, value) =>
      val client = sender()
      (kvService ? PutKV(appId.toString, key, value)).mapTo[PutKVResult].map {
        case PutKVSuccess =>
          client ! AppDataSaved
        case PutKVFailed(k, ex) =>
          client ! SaveAppDataFailed
      }.recover {
        case ex =>
          LOG.error(s"Failed to save data $key for application $appId", ex)
          client ! SaveAppDataFailed
      }
    case GetAppData(appId, key) =>
      val client = sender()
      (kvService ? GetKV(appId.toString, key)).mapTo[GetKVResult].map {
        case GetKVSuccess(_, value) =>
          client ! GetAppDataResult(key, value)
        case GetKVFailed(ex) =>
          client ! GetAppDataResult(key, null)
      }.recover {
        case ex =>
          LOG.error(s"Failed to get data $key for application $appId", ex)
          client ! GetAppDataResult(key, null)
      }
  }

  /**
   * Reacts to the termination of a watched AppMaster of a non-terminal application. It loads
   * the application's metadata and sends RecoverApplication to self. The registry itself is
   * only modified in the RecoverApplication handler, on the actor's own thread, never inside
   * the Future callback.
   */
  def terminationWatch: Receive = {
    case terminate: Terminated =>
      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
        s"network down: ${terminate.getAddressTerminated}")
      // Now we assume that the only normal way to stop the application is submitting a
      // ShutdownApplication request
      val application = applicationRegistry.find { case (_, runtimeInfo) =>
        terminate.actor.equals(runtimeInfo.appMaster) &&
          !runtimeInfo.status.isInstanceOf[ApplicationTerminalStatus]
      }
      application.foreach { case (appId, _) =>
        (kvService ? GetKV(appId.toString, APP_METADATA)).mapTo[GetKVResult].map {
          case GetKVSuccess(_, result) =>
            val appMetadata = result.asInstanceOf[ApplicationMetaData]
            if (appMetadata != null) {
              LOG.info(s"Recovering application, $appId")
              self ! RecoverApplication(appMetadata)
            } else {
              LOG.error(s"Cannot find application meta data for $appId")
            }
          case GetKVFailed(ex) =>
            LOG.error(s"Cannot find master state to recover", ex)
        }.recover {
          case ex =>
            LOG.error(s"Failed to load application meta data for $appId", ex)
        }
      }
  }

  /**
   * Handles RecoverApplication: marks the application PENDING and, if its restart policy
   * still allows it, persists the master state and starts a new AppMaster launcher.
   */
  def selfMsgHandler: Receive = {
    case RecoverApplication(state) =>
      val appId = state.appId
      applicationRegistry.get(appId).foreach { info =>
        applicationRegistry += appId -> info.copy(status = ApplicationStatus.PENDING)
      }
      if (restartPolicyOf(appId).allowRestart) {
        LOG.info(s"AppManager Recovering Application $appId...")
        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
          MasterState(this.nextAppId, applicationRegistry))
        context.actorOf(launcher.props(appId, APPMASTER_DEFAULT_EXECUTOR_ID, state.appDesc,
          state.jar, state.username, context.parent, None), s"launcher${appId}_${Util.randInt()}")
      } else {
        LOG.error(s"Application $appId failed too many times")
      }
  }

  /**
   * Returns the RestartPolicy consulted before relaunching `appId`'s AppMaster. If none exists,
   * a new one with `appTotalRetries` is created and remembered. No policy exists, for example,
   * for applications restored from the persisted master state, because policies are not
   * persisted.
   */
  private def restartPolicyOf(appId: Int): RestartPolicy = {
    appMasterRestartPolicies.getOrElse(appId, {
      val policy = new RestartPolicy(appTotalRetries)
      appMasterRestartPolicies += appId -> policy
      policy
    })
  }

  /**
   * Asks `worker` to shut down the AppMaster executor of `appId`, logging on timeout. When no
   * worker is known (e.g. the AppMaster never registered), nothing can be sent, so the request
   * is skipped with a warning instead of sending to a null reference.
   */
  private def killAppMaster(appId: Int, worker: ActorRef): Unit = {
    val workerPath = Option(worker).map(_.path).orNull
    LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId:" +
      s" $APPMASTER_DEFAULT_EXECUTOR_ID")
    Option(worker) match {
      case Some(target) =>
        val shutdown = ShutdownExecutor(appId, APPMASTER_DEFAULT_EXECUTOR_ID,
          s"AppMaster $appId shutdown requested by master...")
        sendMsgWithTimeOutCallBack(target, shutdown, 30000, shutDownExecutorTimeOut())
      case None =>
        LOG.warn(s"No worker known for the AppMaster of application $appId, " +
          "skipping executor shutdown")
    }
  }

  /** True if a non-terminated application with the given name is registered. */
  private def applicationNameExist(appName: String): Boolean = {
    applicationRegistry.values.exists { info =>
      info.appName == appName && !info.status.isInstanceOf[ApplicationTerminalStatus]
    }
  }

  /** Timeout callback for executor shutdown requests sent by [[killAppMaster]]. */
  private def shutDownExecutorTimeOut(): Unit = {
    LOG.error(s"Shut down executor time out")
  }
}
/**
 * Companion of the AppManager actor: the KV-store keys it uses and the messages/state it
 * persists or sends to itself.
 */
object AppManager {
  // Key of an application's metadata entry inside that application's KV group.
  final val APP_METADATA = "app_metadata"
  // Key of the persisted MasterState entry inside the master's KV group.
  final val MASTER_STATE = "master_state"

  /**
   * Snapshot of the AppManager persisted under MASTER_STATE.
   * NOTE(review): AppManager stores its next available id as `maxId` and resumes from
   * `maxId + 1` on reload, so one id is skipped after a restart.
   */
  case class MasterState(maxId: Int, applicationRegistry: Map[Int, ApplicationRuntimeInfo])

  /** Message AppManager sends to itself to relaunch the AppMaster of an application. */
  case class RecoverApplication(appMetaData: ApplicationMetaData)
}