blob: 13414644e3c1df649948494112c043a9b1851f27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.appmaster
import java.lang.management.ManagementFactory
import akka.actor._
import org.apache.gearpump._
import org.apache.gearpump.cluster.AppMasterToMaster.ActivateAppMaster
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure}
import org.apache.gearpump.cluster._
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.streaming.partitioner.PartitionerDescription
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
import org.apache.gearpump.streaming._
import org.apache.gearpump.streaming.appmaster.AppMaster._
import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor}
import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo}
import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, GetTaskList, TaskList, FailedToRecover}
import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster
import org.apache.gearpump.streaming.task._
import org.apache.gearpump.streaming.util.ActorPathUtil
import org.apache.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _}
import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
import org.apache.gearpump.util._
import org.slf4j.Logger
import scala.concurrent.Future
/**
* AppMaster is the head of a streaming application.
*
* It contains:
* 1. ExecutorManager to manage all executors.
* 2. TaskManager to manage all tasks,
* 3. ClockService to track the global clock for this streaming application.
* 4. Scheduler to decide which a task should be scheduled to.
*/
class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster {
import app.userConfig
import appContext.{appId, masterProxy, username}
private implicit val actorSystem = context.system
private implicit val timeOut = FUTURE_TIMEOUT
import akka.pattern.ask
private implicit val dispatcher = context.dispatcher
private val startTime: TimeStamp = System.currentTimeMillis()
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
LOG.info(s"AppMaster actor path: ${ActorUtil.getFullPath(context.system, self.path)}")
private val address = ActorUtil.getFullPath(context.system, self.path)
private val store = new InMemoryAppStoreOnMaster(appId, appContext.masterProxy)
private val dagManager = context.actorOf(Props(new DagManager(appContext.appId, userConfig, store,
Some(getUpdatedDAG))))
private var taskManager: Option[ActorRef] = None
private var clockService: Option[ActorRef] = None
private val systemConfig = context.system.settings.config
private var lastFailure = LastFailure(0L, null)
private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
self.path.toString,
Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
private val userDir = System.getProperty("user.dir")
private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config)
private val appMasterExecutorSummary = ExecutorSummary(
APPMASTER_DEFAULT_EXECUTOR_ID,
Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified),
self.path.toString,
logFile.getAbsolutePath,
status = "Active",
taskCount = 0,
tasks = Map.empty[ProcessorId, List[TaskId]],
jvmName = ManagementFactory.getRuntimeMXBean.getName
)
private val historyMetricsService = if (metricsEnabled) {
// Registers jvm metrics
Metrics(context.system).register(new JvmMetricsSet(
s"app$appId.executor$APPMASTER_DEFAULT_EXECUTOR_ID"))
val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(
s"app$appId", getHistoryMetricsConfig)))
val metricsReportService = context.actorOf(Props(
new MetricsReporterService(Metrics(context.system))))
historyMetricsService.tell(ReportMetrics, metricsReportService)
Some(historyMetricsService)
} else {
None
}
private val executorManager: ActorRef =
context.actorOf(ExecutorManager.props(userConfig, appContext, app.clusterConfig, app.name),
ActorPathUtil.executorManagerActorName)
for (dag <- getDAG) {
clockService = Some(context.actorOf(Props(new ClockService(dag, store))))
val jarScheduler = new JarScheduler(appId, app.name, systemConfig, context)
taskManager = Some(context.actorOf(Props(new TaskManager(appContext.appId, dagManager,
jarScheduler, executorManager, clockService.get, self, app.name))))
}
override def receive: Receive = {
taskMessageHandler orElse
executorMessageHandler orElse
ready orElse
recover orElse
appMasterService orElse
ActorUtil.defaultMsgHandler(self)
}
/** Handles messages from Tasks */
def taskMessageHandler: Receive = {
case clock: ClockEvent =>
clockService.foreach(_ forward clock)
case register: RegisterTask =>
taskManager.foreach(_ forward register)
case unRegister: UnRegisterTask =>
taskManager.foreach(_ forward unRegister)
// Checks whether this processor dead, if it is, then we should remove it from clockService.
clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId))
case replay: ReplayFromTimestampWindowTrailingEdge =>
if (replay.appId == appId) {
taskManager.foreach(_ forward replay)
} else {
LOG.error(s"replay for invalid appId ${replay.appId}")
}
case messageLoss: MessageLoss =>
lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause)
taskManager.foreach(_ forward messageLoss)
case lookupTask: LookupTaskActorRef =>
taskManager.foreach(_ forward lookupTask)
case GetDAG =>
val task = sender()
getDAG.foreach {
dag => task ! dag
}
}
/** Handles messages from Executors */
def executorMessageHandler: Receive = {
case register: RegisterExecutor =>
executorManager forward register
case ReportMetrics =>
historyMetricsService.foreach(_ forward ReportMetrics)
}
/** Handles messages from AppMaster */
def appMasterService: Receive = {
case AppMasterDataDetailRequest(rid) =>
if (rid == appId) {
LOG.info(s"AppMaster got AppMasterDataDetailRequest for $appId ")
val executorsFuture = executorBrief
val clockFuture = getMinClock
val taskFuture = getTaskList
val dagFuture = getDAG
val appMasterDataDetail = for {
executors <- executorsFuture
clock <- clockFuture
tasks <- taskFuture
dag <- dagFuture
} yield {
val graph = dag.graph
val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {
_.keys.toList
}
val processors = dag.processors.map { kv =>
val processor = kv._2
import processor._
val tasks = executorToTasks.map { kv =>
(kv._1, TaskCount(kv._2.count(_.processorId == id)))
}.filter(_._2.count != 0)
(id, ProcessorSummary(id, taskClass, parallelism, description, taskConf, life,
tasks.keys.toList, tasks))
}
StreamAppMasterSummary(
appId = appId,
appName = app.name,
actorPath = address,
clock = clock,
status = MasterToAppMaster.AppMasterActive,
startTime = startTime,
uptime = System.currentTimeMillis() - startTime,
user = username,
homeDirectory = userDir,
logFile = logFile.getAbsolutePath,
processors = processors,
processorLevels = graph.vertexHierarchyLevelMap(),
dag = graph.mapEdge { (node1, edge, node2) =>
edge.partitionerFactory.name
},
executors = executors,
historyMetricsConfig = getHistoryMetricsConfig
)
}
val client = sender()
appMasterDataDetail.map { appData =>
client ! appData
}
} else {
LOG.error(s"AppMasterDataDetailRequest for invalid appId $rid")
}
// TODO: WebSocket is buggy and disabled.
// case appMasterMetricsRequest: AppMasterMetricsRequest =>
// val client = sender()
// actorSystem.eventStream.subscribe(client, classOf[MetricType])
case query: QueryHistoryMetrics =>
if (historyMetricsService.isEmpty) {
// Returns empty metrics so that we don't hang the UI
sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
} else {
historyMetricsService.get forward query
}
case getStalling: GetStallingTasks =>
clockService.foreach(_ forward getStalling)
case replaceDAG: ReplaceProcessor =>
dagManager forward replaceDAG
case GetLastFailure(id) =>
if (id == appId) {
sender ! lastFailure
} else {
LOG.error(s"GetLastFailure for invalid appId $id")
}
case get@GetExecutorSummary(executorId) =>
val client = sender()
if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
client ! appMasterExecutorSummary
} else {
ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
.map { map =>
map.get(executorId).foreach { executor =>
executor.executor.tell(get, client)
}
}
}
case query@QueryExecutorConfig(executorId) =>
val client = sender()
if (executorId == -1) {
val systemConfig = context.system.settings.config
sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
} else {
ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
.map { map =>
map.get(executorId).foreach { executor =>
executor.executor.tell(query, client)
}
}
}
}
def ready: Receive = {
case ApplicationReady =>
masterProxy ! ActivateAppMaster(appId)
case AppMasterActivated(id) =>
LOG.info(s"AppMaster for app$id is activated")
}
/** Error handling */
def recover: Receive = {
case FailedToRecover(errorMsg) =>
if (context.children.toList.contains(sender())) {
LOG.error(errorMsg)
masterProxy ! ShutdownApplication(appId)
}
case AllocateResourceTimeOut =>
LOG.error(s"Failed to allocate resource in time, shutdown application $appId")
masterProxy ! ShutdownApplication(appId)
context.stop(self)
}
private def getMinClock: Future[TimeStamp] = {
clockService match {
case Some(service) =>
(service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)
case None =>
Future.failed(new ServiceNotAvailableException("clock service not ready"))
}
}
private def executorBrief: Future[List[ExecutorBrief]] = {
ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
.map { infos =>
infos.values.map { info =>
ExecutorBrief(info.executorId,
info.executor.path.toSerializationFormat,
info.worker.workerId,
"active")
}.toList :+ appMasterBrief
}
}
private def getTaskList: Future[TaskList] = {
taskManager match {
case Some(manager) =>
(manager ? GetTaskList).asInstanceOf[Future[TaskList]]
case None =>
Future.failed(new ServiceNotAvailableException("task manager not ready"))
}
}
private def getDAG: Future[DAG] = {
(dagManager ? GetLatestDAG).asInstanceOf[Future[LatestDAG]].map(_.dag)
}
private def getUpdatedDAG: DAG = {
val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
PartitionerDescription]](StreamApplication.DAG).get)
val updated = dag.processors.map { idAndProcessor =>
val (id, oldProcessor) = idAndProcessor
val newProcessor = if (oldProcessor.jar == null) {
oldProcessor.copy(jar = appContext.appJar.orNull)
} else {
oldProcessor
}
(id, newProcessor)
}
DAG(dag.version, updated, dag.graph)
}
}
object AppMaster {
/** Master node doesn't return resource in time */
case object AllocateResourceTimeOut
/** Query task ActorRef by providing the taskId */
case class LookupTaskActorRef(taskId: TaskId)
case class TaskActorRef(task: ActorRef)
class ServiceNotAvailableException(reason: String) extends Exception(reason)
case class ExecutorBrief(
executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
}