| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.gearpump.streaming.appmaster |
| |
| import akka.actor._ |
| import akka.pattern.ask |
| import com.typesafe.config.Config |
| import org.apache.gearpump.TimeStamp |
| import org.apache.gearpump.cluster.AppJar |
| import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest} |
| import org.apache.gearpump.cluster.worker.WorkerId |
| import org.apache.gearpump.streaming.partitioner.PartitionerDescription |
| import org.apache.gearpump.streaming.appmaster.JarScheduler._ |
| import org.apache.gearpump.streaming.task.TaskId |
| import org.apache.gearpump.streaming.{DAG, ProcessorDescription} |
| import org.apache.gearpump.util.{Constants, Graph, LogUtil} |
| |
| import scala.concurrent.Future |
| |
| /** |
| * Different processors of the stream application can use different jars. JarScheduler is the |
| * scheduler for different jars. |
| * |
| * For a DAG of multiple processors, each processor can have its own jar. Tasks of same jar |
| * is scheduled by TaskScheduler, and TaskSchedulers are scheduled by JarScheduler. |
| * |
| * In runtime, the implementation is delegated to actor JarSchedulerImpl |
| */ |
| class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) { |
| private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config))) |
| private implicit val dispatcher = factory.dispatcher |
| private implicit val timeout = Constants.FUTURE_TIMEOUT |
| |
| /** Set the current DAG version active */ |
| def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = { |
| actor ! TransitToNewDag |
| startClock.map { start => |
| actor ! NewDag(dag, start) |
| } |
| } |
| |
| /** AppMaster ask JarScheduler about how many resource it wants */ |
| def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = { |
| (actor ? GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]] |
| } |
| |
| /** |
| * AppMaster has resource allocated, and ask the jar scheduler to schedule tasks |
| * for this executor. |
| */ |
| def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) |
| : Future[List[TaskId]] = { |
| (actor ? ScheduleTask(appJar, workerId, executorId, resource)) |
| .asInstanceOf[Future[List[TaskId]]] |
| } |
| |
| /** |
| * Some executor JVM process is dead. AppMaster asks jar scheduler to re-schedule the impacted |
| * tasks. |
| */ |
| def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = { |
| (actor ? ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]] |
| } |
| } |
| |
| object JarScheduler { |
| |
| case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest]) |
| |
| case class NewDag(dag: DAG, startTime: TimeStamp) |
| |
| case object TransitToNewDag |
| |
| case object GetResourceRequestDetails |
| |
| /** |
| * Schedule tasks for one appJar. |
| * |
| * @param appJar Application jar. |
| * @param workerId Worker machine Id. |
| * @param executorId Executor Id. |
| * @param resource Slots that are available. |
| */ |
| case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) |
| |
| /** Some executor JVM is dead, try to recover tasks that are located on failed executor */ |
| case class ExecutorFailed(executorId: Int) |
| |
| class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash { |
| |
| // Each TaskScheduler maps to a jar. |
| private var taskSchedulers = Map.empty[AppJar, TaskScheduler] |
| |
| private val LOG = LogUtil.getLogger(getClass) |
| |
| def receive: Receive = waitForNewDag |
| |
| def waitForNewDag: Receive = { |
| case TransitToNewDag => // Continue current state |
| case NewDag(dag, startTime) => |
| |
| LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime") |
| |
| val processors = dag.processors.values.groupBy(_.jar) |
| |
| taskSchedulers = processors.map { jarAndProcessors => |
| val (jar, processors) = jarAndProcessors |
| // Construct the sub DAG, each sub DAG maps to a separate jar. |
| val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription] |
| processors.foreach { processor => |
| if (startTime < processor.life.death) { |
| subGraph.addVertex(processor) |
| } |
| } |
| val subDagForSingleJar = DAG(subGraph) |
| |
| val taskScheduler = taskSchedulers |
| .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config)) |
| |
| LOG.info(s"Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size) |
| taskScheduler.setDAG(subDagForSingleJar) |
| jar -> taskScheduler |
| } |
| unstashAll() |
| context.become(ready) |
| case other => |
| stash() |
| } |
| |
| def ready: Receive = { |
| // Notifies there is a new DAG coming. |
| case TransitToNewDag => |
| context.become(waitForNewDag) |
| |
| case GetResourceRequestDetails => |
| |
| // Asks each TaskScheduler (Each for one jar) the resource requests. |
| val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler => |
| val (jar, scheduler) = jarAndScheduler |
| ResourceRequestDetail(jar, scheduler.getResourceRequests()) |
| }.toArray |
| LOG.info(s"GetRequestDetails " + result.mkString(";")) |
| sender ! result |
| |
| case ScheduleTask(appJar, workerId, executorId, resource) => |
| val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler => |
| scheduler.schedule(workerId, executorId, resource) |
| }.getOrElse(List.empty) |
| LOG.info(s"ScheduleTask " + result.mkString(";")) |
| sender ! result |
| case ExecutorFailed(executorId) => |
| val result: Option[ResourceRequestDetail] = taskSchedulers. |
| find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler => |
| ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId)) |
| } |
| LOG.info(s"ExecutorFailed " + result.mkString(";")) |
| sender ! result |
| } |
| } |
| } |