// blob: 6b4df07cd566038173f639b40eca0558d10b60bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.master
import java.lang.management.ManagementFactory
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.jarstore.JarStoreServer
import scala.collection.JavaConverters._
import scala.collection.immutable
import akka.actor._
import akka.remote.DisassociatedEvent
import com.typesafe.config.Config
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.Logger
import org.apache.gearpump.cluster.AppMasterToMaster._
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.MasterToAppMaster._
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
import org.apache.gearpump.cluster.MasterToWorker._
import org.apache.gearpump.cluster.WorkerToMaster._
import org.apache.gearpump.cluster.master.InMemoryKVService._
import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
import org.apache.gearpump.util._
/**
 * Master Actor who manages resources of the whole cluster.
 * It is like the resource manager of YARN.
 *
 * Lifecycle, as implemented in this class:
 *  - The constructor asks the KV service for the persisted "next worker id" and switches the
 *    behavior to `waitForNextWorkerId`; any other message received meanwhile is stashed.
 *  - When the KV service answers successfully, the behavior becomes `receiveHandler` (a chain
 *    of the per-topic partial functions below) and the stashed messages are replayed.
 *  - `preStart` creates the `AppManager` and scheduler children that most requests are
 *    forwarded to.
 */
private[cluster] class Master extends Actor with Stash {
  import scala.util.{Failure, Success}

  private val LOG: Logger = LogUtil.getLogger(getClass)
  private val systemConfig: Config = context.system.settings.config
  private implicit val timeout = Constants.FUTURE_TIMEOUT
  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")

  // Resources and resourceRequests can be dynamically constructed by
  // heartbeat of worker and appmaster when master singleton is migrated.
  // We don't need to persist them in cluster

  // Both children are assigned in preStart().
  private var appManager: ActorRef = null
  private var scheduler: ActorRef = null

  // Registered workers, keyed by the actor that sent the registration.
  private var workers = new immutable.HashMap[ActorRef, WorkerId]
  private val birth = System.currentTimeMillis()
  private var nextWorkerId = 0

  // The effective behavior is installed through context.become() at the end of the
  // constructor, so no default receive is provided here.
  def receive: Receive = null

  // Register jvm metrics
  Metrics(context.system).register(new JvmMetricsSet("master"))

  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")

  val jarStoreRootPath: String =
    systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)

  private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))

  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)

  // Maintain the list of active masters.
  private var masters: List[MasterNode] = {
    // Add myself into the list of initial masters.
    List(MasterNode(hostPort.host, hostPort.port))
  }

  val metricsEnabled: Boolean = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)

  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)

  // Defined only when metrics are enabled; in that case a reporter child is created as well
  // and ReportMetrics is sent to the history service with the reporter as the sender.
  val historyMetricsService: Option[ActorRef] = if (metricsEnabled) {
    val historyMetricsService = {
      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
    }

    val metricsReportService = context.actorOf(
      Props(new MetricsReporterService(Metrics(context.system))))
    historyMetricsService.tell(ReportMetrics, metricsReportService)
    Some(historyMetricsService)
  } else {
    None
  }

  // Recover the worker id counter before serving any request.
  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
  context.become(waitForNextWorkerId)

  /**
   * Initial behavior: waits for the KV service to return the stored next worker id.
   *
   * On success the counter is restored (when a value exists), the behavior switches to
   * `receiveHandler` and stashed messages are replayed. On failure a PoisonPill is sent to
   * the parent. Every other message is stashed.
   */
  def waitForNextWorkerId: Receive = {
    case GetKVSuccess(_, result) =>
      if (result != null) {
        // The value is whatever RegisterNewWorker stored through PutKV, i.e. an Int.
        this.nextWorkerId = result.asInstanceOf[Int]
      } else {
        LOG.warn("Cannot find existing state in the distributed cluster...")
      }
      context.become(receiveHandler)
      unstashAll()
    case GetKVFailed(ex) =>
      // Pass the cause along so the reason for the shutdown shows up in the log.
      LOG.error("Failed to get worker id, shutting down master to avoid data corruption...", ex)
      context.parent ! PoisonPill
    case msg =>
      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
      stash()
  }

  /** Steady-state behavior: the per-topic handlers tried in the order listed. */
  def receiveHandler: Receive = workerMsgHandler orElse
    appMasterMsgHandler orElse
    onMasterListChange orElse
    clientMsgHandler orElse
    metricsService orElse
    jarStoreService orElse
    terminationWatch orElse
    disassociated orElse
    kvServiceMsgHandler orElse
    ActorUtil.defaultMsgHandler(self)

  /** Handles worker registration and resource updates. */
  def workerMsgHandler: Receive = {
    case RegisterNewWorker =>
      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
      nextWorkerId += 1
      // Store the incremented counter in the KV service.
      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
      val workerHostname = ActorUtil.getHostname(sender())
      LOG.info(s"Register new from $workerHostname ....")
      // Forward keeps the worker as the sender of the follow-up RegisterWorker message.
      self forward RegisterWorker(workerId)

    case RegisterWorker(id) =>
      // Watch the worker so that terminationWatch is notified when it stops.
      context.watch(sender())
      sender() ! WorkerRegistered(id, MasterInfo(self, birth))
      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
      workers += (sender() -> id)
      val workerHostname = ActorUtil.getHostname(sender())
      LOG.info(s"Register Worker with id $id from $workerHostname ....")

    case resourceUpdate: ResourceUpdate =>
      scheduler forward resourceUpdate
  }

  /** Forwards jar store lookups to the jar store child actor. */
  def jarStoreService: Receive = {
    case GetJarStoreServer =>
      jarStore forward GetJarStoreServer
  }

  /** Handles the replies of the KV service to the PutKV requests sent by this actor. */
  def kvServiceMsgHandler: Receive = {
    case PutKVSuccess =>
    // Skip
    case PutKVFailed(key, exception) =>
      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
        ExceptionUtils.getStackTrace(exception))
  }

  /** Answers history metrics queries, or delegates them when the metrics service exists. */
  def metricsService: Receive = {
    case query: QueryHistoryMetrics =>
      historyMetricsService match {
        case Some(service) =>
          service forward query
        case None =>
          // Returns empty metrics so that we don't hang the UI
          sender() ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
      }
  }

  /**
   * Handles AppMaster requests: resource requests go to the scheduler, application
   * bookkeeping goes to the AppManager, and cluster queries are answered directly.
   */
  def appMasterMsgHandler: Receive = {
    case request: RequestResource =>
      scheduler forward request
    case registerAppMaster: RegisterAppMaster =>
      appManager forward registerAppMaster
    case activateAppMaster: ActivateAppMaster =>
      appManager forward activateAppMaster
    case save: SaveAppData =>
      appManager forward save
    case get: GetAppData =>
      appManager forward get
    case GetAllWorkers =>
      sender() ! WorkerList(workers.values.toList)
    case GetMasterData =>
      val aliveFor = System.currentTimeMillis() - birth
      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
      val userDir = System.getProperty("user.dir")

      val masterDescription =
        MasterSummary(
          MasterNode(hostPort.host, hostPort.port),
          masters,
          aliveFor,
          logFileDir,
          jarStoreRootPath,
          MasterStatus.Synced,
          userDir,
          List.empty[MasterActivity],
          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
          historyMetricsConfig = getHistoryMetricsConfig
        )

      sender() ! MasterData(masterDescription)

    case invalidAppMaster: InvalidAppMaster =>
      appManager forward invalidAppMaster
  }

  /** Replaces the known master list whenever an update is received. */
  def onMasterListChange: Receive = {
    case MasterListUpdated(masters: List[MasterNode]) =>
      this.masters = masters
  }

  /** Handles client requests; most are forwarded to the AppManager. */
  def clientMsgHandler: Receive = {
    case app: SubmitApplication =>
      LOG.debug(s"Receive from client, SubmitApplication $app")
      appManager.forward(app)
    case app: RestartApplication =>
      LOG.debug(s"Receive from client, RestartApplication $app")
      appManager.forward(app)
    case app: ShutdownApplication =>
      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
      scheduler ! ApplicationFinished(app.appId)
      appManager.forward(app)
    case app: ResolveAppId =>
      LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
      appManager.forward(app)
    case resolve: ResolveWorkerId =>
      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
      // Reverse lookup: the map is keyed by ActorRef, the request carries the WorkerId.
      workers.find(_._2 == resolve.workerId) match {
        case Some((workerRef, _)) =>
          sender() ! ResolveWorkerIdResult(Success(workerRef))
        case None =>
          sender() ! ResolveWorkerIdResult(Failure(
            new Exception(s"cannot find worker ${resolve.workerId}")))
      }
    case AppMastersDataRequest =>
      LOG.debug("Master received AppMastersDataRequest")
      appManager forward AppMastersDataRequest
    case appMasterDataRequest: AppMasterDataRequest =>
      LOG.debug("Master received AppMasterDataRequest")
      appManager forward appMasterDataRequest
    case query: QueryAppMasterConfig =>
      LOG.debug("Master received QueryAppMasterConfig")
      appManager forward query
    case QueryMasterConfig =>
      sender() ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
  }

  /** Logs remote disassociation events published on the event stream. */
  def disassociated: Receive = {
    case disassociated: DisassociatedEvent =>
      LOG.info(s" disassociated ${disassociated.remoteAddress}")
  }

  /**
   * Reacts to the termination of a watched actor: when it is a registered worker, the
   * scheduler is notified and the worker is dropped from the registry.
   */
  def terminationWatch: Receive = {
    case t: Terminated =>
      val actor = t.actor
      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason? " +
        t.getAddressTerminated())

      LOG.info("Let's filter out dead resources...")
      // Filters out dead worker resource
      workers.get(actor).foreach { workerId =>
        scheduler ! WorkerTerminated(workerId)
        workers -= actor
      }
  }

  /**
   * Creates the AppManager and the scheduler (whose class is read from configuration) and
   * subscribes this actor to DisassociatedEvent on the system event stream.
   */
  override def preStart(): Unit = {
    val path = ActorUtil.getFullPath(context.system, self.path)
    LOG.info(s"master path is $path")
    val schedulerClass = Class.forName(
      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))

    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
      classOf[AppManager].getSimpleName)
    scheduler = context.actorOf(Props(schedulerClass))
    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
  }
}
/** Constants and message types that belong to the [[Master]] actor. */
object Master {

  /** KV group under which the master stores its state. */
  final val MASTER_GROUP = "master_group"

  /** KV key that holds the id to assign to the next newly registered worker. */
  final val WORKER_ID = "next_worker_id"

  /** Message carrying the id of a worker whose actor has terminated. */
  case class WorkerTerminated(workerId: WorkerId)

  /** Reference to a master actor together with its start time (defaults to 0). */
  case class MasterInfo(master: ActorRef, startTime: Long = 0L)

  object MasterInfo {

    /** A placeholder without a master reference and with the default start time. */
    def empty: MasterInfo = MasterInfo(master = null)
  }

  /** Notify the subscriber that master actor list has been updated */
  case class MasterListUpdated(masters: List[MasterNode])

  /** Pair of slot counts: the total and the currently available ones. */
  case class SlotStatus(totalSlots: Int, availableSlots: Int)
}