blob: 56bf61d9accea4fcb267fdbabadb93b613f028b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.executor
import java.lang.management.ManagementFactory
import scala.concurrent.duration._
import akka.actor.SupervisorStrategy.Resume
import akka.actor._
import com.typesafe.config.Config
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.Logger
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig}
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.serializer.SerializationFramework
import org.apache.gearpump.streaming.AppMasterToExecutor.{MsgLostException, TasksChanged, TasksLaunched, _}
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
import org.apache.gearpump.streaming.ProcessorId
import org.apache.gearpump.streaming.executor.Executor._
import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
import org.apache.gearpump.transport.{Express, HostPort}
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, TimeOutScheduler}
/**
* Executor is a child actor of AppMaster.
* It usually represents one JVM process and acts as the container for all of its tasks.
*/
// TODO: What happens if the Executor gets stuck in a DynamicDag state and cannot get out,
// for example because some messages are lost during a network glitch? Will the Executor
// hang there forever?
//
class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher: ITaskLauncher)
  extends Actor with TimeOutScheduler {

  def this(executorContext: ExecutorContext, userConf: UserConfig) = {
    this(executorContext, userConf, TaskLauncher(executorContext, userConf))
  }

  import context.dispatcher
  import executorContext.{appId, appMaster, executorId, resource, worker}

  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)

  private implicit val timeOut = FUTURE_TIMEOUT
  private val address = ActorUtil.getFullPath(context.system, self.path)
  private val systemConfig = context.system.settings.config
  private val serializerPool = getSerializerPool()
  private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER)

  // Name of the current state; reassigned every time a new behavior (Receive) is built.
  private var state = State.ACTIVE

  // Start of the latest state transition, in milliseconds since the Unix epoch.
  private var transitionStart = 0L

  // End of the latest state transition, in milliseconds since the Unix epoch.
  private var transitionEnd = 0L

  // A non-ACTIVE state lasting longer than this many milliseconds is reported as an error.
  private val transitWarningThreshold = 5000

  // Delay between two consecutive HealthCheck messages.
  private val healthCheckInterval = 3.seconds

  // Starts the periodic health check; each HealthCheck re-schedules the next one.
  self ! HealthCheck

  LOG.info(s"Executor $executorId has been started, start to register itself...")
  LOG.info(s"Executor actor path: $address")

  appMaster ! RegisterExecutor(self, executorId, resource, worker)
  context.watch(appMaster)

  // Task actors currently hosted by this executor.
  private var tasks = Map.empty[TaskId, ActorRef]

  // Launch arguments of every task, kept per DAG version so tasks can be relaunched.
  private val taskArgumentStore = new TaskArgumentStore()

  val express = Express(context.system)

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
  if (metricsEnabled) {
    // Registers JVM metrics
    Metrics(context.system).register(new JvmMetricsSet(s"app$appId.executor$executorId"))
    val metricsReportService = context.actorOf(Props(new MetricsReporterService(
      Metrics(context.system))))
    appMaster.tell(ReportMetrics, metricsReportService)
  }

  private val NOT_INITIALIZED = -1

  def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED)

  /** Reverse lookup of the TaskId owned by a task actor, if it is still tracked. */
  private def getTaskId(actorRef: ActorRef): Option[TaskId] = {
    tasks.find(_._2 == actorRef).map(_._1)
  }

  /**
   * Any failure of a child task is reported to the AppMaster as a MessageLoss for that task
   * (so that lost messages get replayed), and the failing task is resumed.
   */
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: MsgLostException =>
        val taskId = getTaskId(sender)
        val cause = s"We got MessageLossException from task $taskId, " +
          s"replaying application..."
        LOG.error(cause)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, cause))
        Resume
      case ex: Throwable =>
        val taskId = getTaskId(sender)
        val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as" +
          s" MessageLoss, so that the system will replay all lost message"
        LOG.error(errorMsg, ex)
        val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg))
        Resume
    }

  /** Launches a single task and returns its actor. */
  private def launchTask(taskId: TaskId, argument: TaskArgument): ActorRef = {
    launcher.launch(List(taskId), argument, context, serializerPool, taskDispatcher).values.head
  }

  /**
   * Fails fast when a message carries a DAG version other than the one being deployed.
   *
   * @throws DagVersionMismatchException if `expectVersion != version`
   */
  private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit = {
    if (expectVersion != version) {
      val errorMessage = s"Version mismatch: we expect dag version $expectVersion, " +
        s"but get $version; clue: $clue"
      LOG.error(errorMessage)
      throw new DagVersionMismatchException(errorMessage)
    }
  }

  /**
   * Phase 1 of a DAG transition: collect launched/changed tasks and their registrations,
   * until the AppMaster announces that all task locations are ready.
   *
   * @param launched tasks launched during this transition
   * @param changed tasks whose life/subscribers changed during this transition
   * @param registered launched tasks confirmed by a TaskRegistered message
   */
  def dynamicDagPhase1(
      dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], registered: List[TaskId])
    : Receive = {
    state = State.DYNAMIC_DAG_PHASE1
    box({
      case launch@LaunchTasks(taskIds, version, processorDescription,
        subscribers: List[Subscriber]) => {
        assertVersion(dagVersion, version, clue = launch)
        LOG.info(s"Launching Task $taskIds for app: $appId")
        val taskArgument = TaskArgument(version, processorDescription, subscribers)
        taskIds.foreach(taskArgumentStore.add(_, taskArgument))
        val newAdded = launcher.launch(taskIds, taskArgument, context, serializerPool,
          taskDispatcher)
        newAdded.foreach { newAddedTask =>
          context.watch(newAddedTask._2)
        }
        tasks ++= newAdded
        sender ! TasksLaunched
        context.become(dynamicDagPhase1(version, launched ++ taskIds, changed, registered))
      }
      case change@ChangeTasks(taskIds, version, life, subscribers) =>
        assertVersion(dagVersion, version, clue = change)
        LOG.info(s"Change Tasks $taskIds for app: $appId, verion: $life, $dagVersion, $subscribers")
        val newChangedTasks = taskIds.map { taskId =>
          // Record a new argument so a later relaunch uses the updated life/subscribers.
          for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) {
            val processorDescription = taskArgument.processorDescription.copy(life = life)
            taskArgumentStore.add(taskId, TaskArgument(dagVersion, processorDescription,
              subscribers))
          }
          ChangeTask(taskId, dagVersion, life, subscribers)
        }
        sender ! TasksChanged(taskIds)
        context.become(dynamicDagPhase1(dagVersion, launched, changed ++ newChangedTasks,
          registered))
      case locations@TaskLocationsReady(taskLocations, version) =>
        LOG.info(s"TaskLocations Ready...")
        assertVersion(dagVersion, version, clue = locations)
        val missedTasks = launched.toSet -- registered.toSet
        if (missedTasks.isEmpty) {
          // All launched tasks have registered; publish the locations of remote tasks.
          val result = taskLocations.locations.filter {
            location => !location._1.equals(express.localHost)
          }.flatMap { kv =>
            val (host, taskIdList) = kv
            taskIdList.map(taskId => (TaskId.toLong(taskId), host))
          }
          val replyTo = sender
          express.startClients(taskLocations.locations.keySet).foreach { _ =>
            express.remoteAddressMap.send(result)
            express.remoteAddressMap.future().foreach { _ =>
              LOG.info(s"sending TaskLocationsReceived back to appmaster")
              replyTo ! TaskLocationsReceived(version, executorId)
            }
          }
          context.become(dynamicDagPhase2(dagVersion, launched, changed))
        } else {
          LOG.error("Inconsistency between AppMaser and Executor! AppMaster thinks DynamicDag " +
            "transition is ready, while Executor have not get all tasks registered, " +
            "that task will not be functional...")
          // Reject the TaskLocations and stay in the current state.
          val errorMsg = "We have not received TaskRegistered for following tasks: " +
            missedTasks.toList.mkString(", ")
          LOG.error(errorMsg)
          sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, null)
        }
      case confirm: TaskRegistered =>
        // Hand the registration confirmation to the task it belongs to.
        tasks.get(confirm.taskId).foreach(_ forward confirm)
        context.become(dynamicDagPhase1(dagVersion, launched, changed,
          registered :+ confirm.taskId))
      case rejected: TaskRejected =>
        // Means this task should not exist...
        tasks.get(rejected.taskId).foreach(_ ! PoisonPill)
        tasks -= rejected.taskId
        LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting down it...")
      case register: RegisterTask =>
        appMaster ! register
    })
  }

  /**
   * Phase 2 of a DAG transition: wait for StartAllTasks, then start/change the tasks touched
   * in phase 1, prune stale task arguments and return to the ready state.
   */
  def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask])
    : Receive = {
    LOG.info("Transit to dynamic Dag Phase2")
    state = State.DYNAMIC_DAG_PHASE2
    box {
      case startAll@StartAllTasks(version) =>
        LOG.info(s"Start All Tasks...")
        assertVersion(dagVersion, version, clue = startAll)
        launched.foreach(taskId => tasks.get(taskId).foreach(_ ! StartTask(taskId)))
        changed.foreach(changeTask => tasks.get(changeTask.taskId).foreach(_ ! changeTask))
        taskArgumentStore.removeNewerVersion(dagVersion)
        taskArgumentStore.removeObsoleteVersion()
        context.become(applicationReady(dagVersion))
    }
  }

  /**
   * Steady state. A StartDynamicDag/LaunchTasks/ChangeTasks carrying a newer DAG version
   * starts a new transition (phase 1); messages with an older or equal version are ignored.
   */
  def applicationReady(dagVersion: Int): Receive = {
    state = State.ACTIVE
    transitionEnd = System.currentTimeMillis()
    if (dagVersion != NOT_INITIALIZED) {
      LOG.info("Transit to state Application Ready. This transition takes " +
        (transitionEnd - transitionStart) + " milliseconds")
    }
    box {
      case start: StartDynamicDag =>
        LOG.info("received StartDynamicDag")
        if (start.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from " +
            s"current version $dagVersion")
          context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
        }
      case launch: LaunchTasks =>
        if (launch.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $launch, Executor transit to dag " +
            s"version: ${launch.dagVersion} from current version $dagVersion")
          context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          // Re-deliver so that phase 1 actually launches the tasks.
          self forward launch
        }
      case change: ChangeTasks =>
        if (change.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from" +
            s" current version $dagVersion")
          context.become(dynamicDagPhase1(change.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          // Re-deliver so that phase 1 actually applies the change.
          self forward change
        }
      case StopTask(taskId) =>
        // Old soldiers never die, they just fade away ;)
        tasks.get(taskId).foreach(context.stop(_))
        tasks -= taskId
      case unRegister@UnRegisterTask(taskId, _) =>
        // Sends UnRegisterTask to AppMaster
        appMaster ! unRegister
    }
  }

  /**
   * Recovery: wait until `remain` tasks that have a launch argument for `dagVersion` have
   * stopped, then relaunch them all and enter phase 1.
   *
   * @param remain number of such tasks that still have to stop (expected to start above 0)
   * @param needRestart tasks that have stopped so far and will be relaunched
   */
  def restartingTasks(dagVersion: Int, remain: Int, needRestart: List[TaskId]): Receive = {
    state = State.RECOVERY
    box {
      case TaskStopped(actor) =>
        for (taskId <- getTaskId(actor)) {
          if (taskArgumentStore.get(dagVersion, taskId).nonEmpty) {
            val newNeedRestart = needRestart :+ taskId
            val newRemain = remain - 1
            if (newRemain == 0) {
              val newRestarted = newNeedRestart.map { taskId_ =>
                val taskActor = launchTask(taskId_, taskArgumentStore.get(dagVersion, taskId_).get)
                context.watch(taskActor)
                taskId_ -> taskActor
              }.toMap
              tasks = newRestarted
              context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask],
                List.empty[TaskId]))
            } else {
              context.become(restartingTasks(dagVersion, newRemain, newNeedRestart))
            }
          }
        }
    }
  }

  /** Stops the executor when its AppMaster dies; otherwise reports a stopped task to self. */
  val terminationWatch: Receive = {
    case Terminated(actor) =>
      if (actor.compareTo(appMaster) == 0) {
        LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, shutting down current " +
          s"executor $appId, $executorId")
        context.stop(self)
      } else {
        self ! TaskStopped(actor)
      }
  }

  /**
   * Handles RestartTasks: clears remote addresses, kills every task and waits (in
   * restartingTasks) for the relaunchable ones to stop before relaunching them.
   */
  def onRestartTasks: Receive = {
    case RestartTasks(dagVersion) =>
      transitionStart = System.currentTimeMillis()
      LOG.info(s"Executor received restart tasks")
      val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty)
      express.remoteAddressMap.send(Map.empty[Long, HostPort])
      val tasksToKill = tasks.values
      if (tasksToRestart > 0) {
        context.become(restartingTasks(dagVersion, remain = tasksToRestart,
          needRestart = List.empty[TaskId]))
      } else {
        // Nothing will be relaunched, so no TaskStopped could ever bring restartingTasks'
        // counter to zero; waiting there would hang. Go straight to phase 1 instead.
        tasks = Map.empty[TaskId, ActorRef]
        context.become(dynamicDagPhase1(dagVersion, List.empty[TaskId],
          List.empty[ChangeTask], List.empty[TaskId]))
      }
      tasksToKill.foreach(_ ! PoisonPill)
  }

  /** Messages served in every state: termination, restart, queries and health checks. */
  def executorService: Receive = terminationWatch orElse onRestartTasks orElse {
    case _: TaskChanged =>
      // Skip
    case _: GetExecutorSummary =>
      val logFile = LogUtil.applicationLogDir(systemConfig)
      val processorTasks = tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force
      sender ! ExecutorSummary(
        executorId,
        worker.workerId,
        address,
        logFile.getAbsolutePath,
        state,
        tasks.size,
        processorTasks,
        jvmName = ManagementFactory.getRuntimeMXBean().getName())
    case _: QueryExecutorConfig =>
      sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
    case HealthCheck =>
      // Send the next HealthCheck to ourselves; the by-name `scheduleOnce(d)(HealthCheck)`
      // form only evaluated the object and never re-delivered the message.
      context.system.scheduler.scheduleOnce(healthCheckInterval, self, HealthCheck)
      if (state != State.ACTIVE) {
        // transitionEnd belongs to the previous transition while one is in progress, so
        // measure how long the current transition has been running from now.
        val elapsed = System.currentTimeMillis() - transitionStart
        if (elapsed > transitWarningThreshold) {
          LOG.error(s"Executor status: " + state +
            s", it takes too long($elapsed) to do transition")
        }
      }
  }

  /** Instantiates and initializes the serializer pool class named in the system config. */
  private def getSerializerPool(): SerializationFramework = {
    val system = context.system.asInstanceOf[ExtendedActorSystem]
    val clazz = Class.forName(systemConfig.getString(Constants.GEARPUMP_SERIALIZER_POOL))
    val pool = clazz.newInstance().asInstanceOf[SerializationFramework]
    pool.init(system, userConf)
    pool
  }

  /** Fallback that logs (and drops) any message the current state does not handle. */
  private def unHandled(state: String): Receive = {
    case other =>
      LOG.info(s"Received unknown message $other in state: $state")
  }

  /** Wraps a state-specific behavior with the common service and the unhandled fallback. */
  private def box(receive: Receive): Receive = {
    executorService orElse receive orElse unHandled(state)
  }
}
object Executor {

  /** Asks the executor to kill and relaunch its tasks for the given DAG version. */
  case class RestartTasks(dagVersion: Int)

  /**
   * Remembers every launch argument of every task, newest first, so that a task can be
   * relaunched with the argument matching a given DAG version.
   */
  class TaskArgumentStore {

    // Per-task argument history; `add` prepends, so the head is the most recently added.
    private var argumentsByTask = Map.empty[TaskId, List[TaskArgument]]

    /** Records `task` as the newest argument of `taskId`. */
    def add(taskId: TaskId, task: TaskArgument): Unit = {
      val history: List[TaskArgument] = argumentsByTask.getOrElse(taskId, Nil)
      argumentsByTask = argumentsByTask.updated(taskId, task :: history)
    }

    /**
     * Returns the most recently added argument of `taskId` whose DAG version is not newer
     * than `dagVersion`, or None when there is no such argument.
     */
    def get(dagVersion: Int, taskId: TaskId): Option[TaskArgument] = {
      for {
        history <- argumentsByTask.get(taskId)
        argument <- history.find(_.dagVersion <= dagVersion)
      } yield argument
    }

    /**
     * When the new DAG is successfully deployed, then we should remove obsolete
     * TaskArgument of old DAG.
     */
    def removeObsoleteVersion(): Unit = {
      argumentsByTask = argumentsByTask.map { case (taskId, history) =>
        (taskId, history.take(1))
      }
    }

    /** Drops every argument whose DAG version is newer than `currentVersion`. */
    def removeNewerVersion(currentVersion: Int): Unit = {
      argumentsByTask = argumentsByTask.map { case (taskId, history) =>
        (taskId, history.filter(argument => argument.dagVersion <= currentVersion))
      }
    }
  }

  /** Notifies the executor that a watched task actor has terminated. */
  case class TaskStopped(task: ActorRef)

  /** Snapshot of an executor's identity, state and hosted tasks. */
  case class ExecutorSummary(
      id: Int,
      workerId: WorkerId,
      actorPath: String,
      logFile: String,
      status: String,
      taskCount: Int,
      tasks: Map[ProcessorId, List[TaskId]],
      jvmName: String
  )

  object ExecutorSummary {
    /** Placeholder summary; note that `tasks` is null here. */
    def empty: ExecutorSummary = {
      ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "")
    }
  }

  /** Request for an [[ExecutorSummary]]. */
  case class GetExecutorSummary(executorId: Int)

  /** Request for the executor's (non-default) configuration. */
  case class QueryExecutorConfig(executorId: Int)

  /** Reply to [[QueryExecutorConfig]]. */
  case class ExecutorConfig(config: Config)

  /** Raised when a message carries a DAG version other than the expected one. */
  class DagVersionMismatchException(msg: String) extends Exception(msg)

  /** State names reported by the executor. */
  object State {
    val ACTIVE = "active"
    val DYNAMIC_DAG_PHASE1 = "dynamic_dag_phase1"
    val DYNAMIC_DAG_PHASE2 = "dynamic_dag_phase2"
    val RECOVERY = "dag_recovery"
  }

  /** Periodic self-message that triggers the executor health check. */
  object HealthCheck
}