blob: 946a4ae41c97cca8b6eba2f2f584516bfdac943a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.appmaster
import akka.actor._
import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems}
import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
import org.apache.gearpump.cluster.master.MasterProxy
import org.apache.gearpump.cluster.{AppDescription, AppMasterContext}
import org.apache.gearpump.util.LogUtil
import scala.concurrent.duration._
/**
 * Runtime environment in which an AppMaster runs.
 *
 * Starting an AppMaster requires a connection to the master and some supporting actors;
 * this actor sets them up as its children:
 *  - a master actor, built by `masterFactory` on top of a [[MasterProxy]] for `masters`;
 *  - the AppMaster actor, built by `appMasterFactory` from a copy of the input context
 *    whose `masterProxy` is replaced with the master actor above;
 *  - a master connection keeper, built by `masterConnectionKeeperFactory`, which reports
 *    the connection status back to this actor.
 *
 * It also extends the function of Master by providing a scheduler service for executor
 * systems: the AppMaster can ask Master for executor systems directly, while details such
 * as requesting resources, contacting a worker to start a process and starting the executor
 * system are hidden from the AppMaster.
 *
 * Please use AppMasterRuntimeEnvironment.props() to construct this actor.
 */
private[appmaster]
class AppMasterRuntimeEnvironment(
    appContextInput: AppMasterContext,
    app: AppDescription,
    masters: Iterable[ActorPath],
    masterFactory: (AppId, MasterActorRef) => Props,
    appMasterFactory: (AppMasterContext, AppDescription) => Props,
    masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props)
  extends Actor {

  private val appId = appContextInput.appId
  private val LOG = LogUtil.getLogger(getClass, app = appId)

  // The proxy child is created first, then wrapped by the actor produced by masterFactory.
  private val master = {
    val proxy = context.actorOf(Props(new MasterProxy(masters, 30.seconds)))
    context.actorOf(masterFactory(appId, proxy))
  }

  // The AppMaster talks to the wrapped master instead of the one from the input context.
  private val appContext = appContextInput.copy(masterProxy = master)

  // Create appMaster proxy to receive command and forward to appmaster
  private val appMaster = context.actorOf(appMasterFactory(appContext, app))
  context.watch(appMaster)

  private val registerAppMaster = RegisterAppMaster(appId, appMaster, appContext.workerInfo)

  // This actor registers itself as the listener of the connection keeper.
  private val masterConnectionKeeper = context.actorOf(
    masterConnectionKeeperFactory(master, registerAppMaster, self))
  context.watch(masterConnectionKeeper)

  /** Logs `reason` at error level and stops this actor. */
  private def stopWithError(reason: String): Unit = {
    LOG.error(reason)
    context.stop(self)
  }

  def receive: Receive = {
    case MasterConnected =>
      LOG.info(s"Master is connected, start AppMaster $appId...")
      appMaster ! StartAppMaster
    case MasterStopped =>
      stopWithError(s"Master is stopped, stop AppMaster $appId...")
    case Terminated(`appMaster`) =>
      stopWithError(s"AppMaster $appId is stopped, shutdown myself")
    case Terminated(`masterConnectionKeeper`) =>
      stopWithError(s"Master connection keeper is stopped, appId: $appId, shutdown myself")
    case Terminated(_) =>
      // Termination of any other actor is deliberately ignored.
  }
}
object AppMasterRuntimeEnvironment {
/**
 * Builds the Props of an AppMasterRuntimeEnvironment wired with the default factories:
 * MasterWithExecutorSystemProvider for the master, LazyStartAppMaster for the AppMaster
 * and MasterConnectionKeeper for the master connection keeper.
 *
 * @param masters actor paths handed to the runtime environment
 * @param app description of the application
 * @param appContextInput context the runtime environment starts from
 */
def props(masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext
  ): Props = {
  val masterFactory: (AppId, MasterActorRef) => Props =
    MasterWithExecutorSystemProvider.props(_, _)

  val appMasterFactory: (AppMasterContext, AppDescription) => Props =
    LazyStartAppMaster.props(_, _)

  val connectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props =
    (masterRef, registration, statusListener) =>
      Props(new MasterConnectionKeeper(
        registration, masterRef, masterStatusListener = statusListener))

  Props(new AppMasterRuntimeEnvironment(
    appContextInput, app, masters, masterFactory, appMasterFactory, connectionKeeperFactory))
}
/**
 * This behaves like an AppMaster. Under the hood, it starts the real AppMaster in a lazy
 * way. While the real AppMaster is not started yet, all messages are stashed. The stashed
 * messages are forwarded to the real AppMaster once it is started.
 *
 * Please use LazyStartAppMaster.props to construct this actor
 *
 * @param appId id of the application, used to tag the logger
 * @param appMasterProps underlying AppMaster Props
 */
private[appmaster]
class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash {
  private val LOG = LogUtil.getLogger(getClass, app = appId)

  // The initial behavior is returned directly instead of returning null and calling
  // context.become from the constructor; the resulting initial behavior is the same.
  def receive: Receive = startAppMaster

  /** Waits for StartAppMaster; every other message is stashed until then. */
  def startAppMaster: Receive = {
    case StartAppMaster =>
      val appMaster = context.actorOf(appMasterProps, "appmaster")
      context.watch(appMaster)
      context.become(terminationWatch(appMaster) orElse appMasterService(appMaster))
      // Replay everything received before the real AppMaster existed.
      unstashAll()
    case _ =>
      stash()
  }

  /**
   * Stops this actor when the real AppMaster terminates.
   *
   * @param appMaster the real AppMaster being watched
   */
  def terminationWatch(appMaster: ActorRef): Receive = {
    // Backticks make this a stable-identifier pattern that compares against the
    // `appMaster` parameter. Without them the pattern binds a fresh variable and
    // matches the Terminated notification of any actor.
    case Terminated(`appMaster`) =>
      LOG.error("appmaster is stopped")
      context.stop(self)
  }

  /**
   * Forwards every message to the real AppMaster, keeping the original sender.
   *
   * @param appMaster the real AppMaster
   */
  def appMasterService(appMaster: ActorRef): Receive = {
    case msg => appMaster forward msg
  }
}
private[appmaster]
object LazyStartAppMaster {

  /**
   * Builds the Props of a LazyStartAppMaster.
   *
   * The class named by `app.appMaster` is loaded with `Class.forName`, and the Props of
   * the underlying AppMaster are created from it with `(appContext, app)` as arguments.
   *
   * @param appContext context passed to the underlying AppMaster; also supplies the appId
   * @param app description of the application; `app.appMaster` is the class name to load
   */
  def props(appContext: AppMasterContext, app: AppDescription): Props = {
    val appMasterClass = Class.forName(app.appMaster)
    val underlyingProps = Props(appMasterClass, appContext, app)
    Props(new LazyStartAppMaster(appContext.appId, underlyingProps))
  }
}
/**
 * Command that tells a LazyStartAppMaster to create the real AppMaster.
 * AppMasterRuntimeEnvironment sends it to the AppMaster actor when it receives
 * MasterConnected.
 */
private[appmaster] case object StartAppMaster
/**
 * This enhances Master by providing a new service: StartExecutorSystems.
 *
 * StartExecutorSystems requests are forwarded to a child actor created from
 * `executorSystemProviderProps`; every other message is forwarded to `master`.
 *
 * Please use MasterWithExecutorSystemProvider.props to construct this actor
 *
 * @param master actor that receives all messages other than StartExecutorSystems
 * @param executorSystemProviderProps Props of the child that serves StartExecutorSystems
 */
private[appmaster]
class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props)
  extends Actor {

  val executorSystemProvider: ActorRef = context.actorOf(executorSystemProviderProps)

  override def receive: Receive = {
    case message =>
      // Pick the destination first, then forward once so the original sender is kept.
      val destination = message match {
        case _: StartExecutorSystems => executorSystemProvider
        case _ => master
      }
      destination forward message
  }
}
private[appmaster]
object MasterWithExecutorSystemProvider {

  /**
   * Builds the Props of a MasterWithExecutorSystemProvider whose executor system provider
   * is an ExecutorSystemScheduler that creates ExecutorSystemLauncher actors.
   *
   * @param appId id of the application, passed to the scheduler
   * @param master actor passed to both the scheduler and the provider
   */
  def props(appId: Int, master: ActorRef): Props = {
    val launcherFactory: (Int, Session) => Props =
      (launcherAppId, session) => Props(new ExecutorSystemLauncher(launcherAppId, session))
    val schedulerProps = Props(new ExecutorSystemScheduler(appId, master, launcherFactory))
    Props(new MasterWithExecutorSystemProvider(master, schedulerProps))
  }
}
// Readable aliases for the factory signatures taken by AppMasterRuntimeEnvironment.

// Application id (an Int), as passed to the master factory.
private[appmaster] type AppId = Int
// ActorRef of the master actor handed to the factories.
private[appmaster] type MasterActorRef = ActorRef
// ActorRef handed to the master connection keeper factory as its status listener.
private[appmaster] type ListenerActorRef = ActorRef
}