blob: b910d57c562a86c847005de2a28c808dc9770066 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.appmaster
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
import org.apache.gearpump.streaming.appmaster.TaskRegistry._
import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.streaming.{ExecutorId, ProcessorId}
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger
/**
 * Tracks the registration of all tasks, when the application is booting up.
 *
 * @param expectedTasks   tasks the application expects; their processor ids decide which
 *                        registrations are accepted.
 * @param registeredTasks tasks that have registered so far, with the location each reported.
 * @param deadTasks       tasks that are excluded when checking whether all tasks are registered.
 */
class TaskRegistry(val expectedTasks: List[TaskId],
    var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
    var deadTasks: Set[TaskId] = Set.empty[TaskId]) {

  private val LOG: Logger = LogUtil.getLogger(getClass)

  // Processor ids present in expectedTasks; registrations are validated against this set.
  private val processors = expectedTasks.map(_.processorId).toSet

  /**
   * When a task is booted, it needs to call registerTask to register itself.
   * If this task is valid, then accept it, otherwise reject it.
   *
   * Only the task's processor id is checked against the expected tasks; the task index is
   * not validated. Registering the same task again overwrites its previously stored location.
   *
   * @param taskId Task that registers itself to TaskRegistry.
   * @param location The host and port where this task is running on. NOTE: The host and port
   *                 is NOT the same host and port of Akka remoting. Instead, it is host and port
   *                 of custom netty layer, see [[org.apache.gearpump.transport.netty.Context]].
   * @return `Accept` when the task's processor is expected, `Reject` otherwise.
   */
  def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
    if (processors.contains(taskId.processorId)) {
      registeredTasks += taskId -> location
      Accept
    } else {
      LOG.error(s" the task is not accepted for registration, taskId: ${taskId}")
      Reject
    }
  }

  /** Creates a new registry, optionally overriding any of the constructor arguments. */
  def copy(expectedTasks: List[TaskId] = this.expectedTasks,
      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
    new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
  }

  /** Groups the registered tasks by the host/port each task reported when registering. */
  def getTaskLocations: TaskLocations = {
    val byHost = registeredTasks.toList.groupBy { case (_, location) => location.host }
    TaskLocations(byHost.map { case (host, entries) => host -> entries.map(_._1).toSet })
  }

  /**
   * Maps every registered task to the executor it reported at registration.
   *
   * Built directly from `registeredTasks`. Every registered task has a location, so no
   * placeholder executor id is needed.
   */
  def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
    registeredTasks.map { case (taskId, location) => taskId -> location.executorId }
  }

  /** Query the executor Id where the task is running on */
  def getExecutorId(taskId: TaskId): Option[Int] = {
    registeredTasks.get(taskId).map(_.executorId)
  }

  /**
   * Gets list of allocated executor Ids.
   *
   * The list has one entry per registered task, so an executor hosting several tasks
   * appears several times.
   */
  def executors: List[ExecutorId] = {
    registeredTasks.toList.map(_._2.executorId)
  }

  /** True when every expected task that is not marked dead has registered. */
  def isAllTasksRegistered: Boolean = {
    val aliveTasks = (expectedTasks.toSet -- deadTasks)
    aliveTasks.forall(task => registeredTasks.contains(task))
  }

  /** True when at least one registered task reported the given executor. */
  def isTaskRegisteredForExecutor(executorId: ExecutorId): Boolean = {
    registeredTasks.exists(_._2.executorId == executorId)
  }

  // Registered task ids that belong to the given processor.
  private def filterTasks(processorId: ProcessorId): List[TaskId] = {
    registeredTasks.keys.toList.filter(_.processorId == processorId)
  }

  /** Executors that the given processor's registered tasks run on, with the tasks on each. */
  def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
    val taskToExecutor = filterTasks(processorId).flatMap { taskId =>
      getExecutorId(taskId).map { executorId =>
        (taskId, executorId)
      }
    }
    taskToExecutor.groupBy(_._2).map { case (executorId, pairs) =>
      (executorId, pairs.map(_._1))
    }
  }

  /**
   * Summary about how many resources are used for all running tasks.
   *
   * Each registered task counts as `Resource(1)` on the executor it reported.
   */
  def usedResource: ExecutorResourceUsageSummary = {
    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) { (map, task) =>
      val resource = map.getOrElse(task._2.executorId, Resource(0)) + Resource(1)
      map + (task._2.executorId -> resource)
    }
    ExecutorResourceUsageSummary(resourceMap)
  }
}
object TaskRegistry {

  /** Result returned by `TaskRegistry.registerTask`. */
  sealed trait RegisterTaskStatus

  /** The registration was refused because the task's processor is not expected. */
  case object Reject extends RegisterTaskStatus

  /** The registration was accepted and the task's location was recorded. */
  case object Accept extends RegisterTaskStatus

  /**
   * Where a registered task is running.
   *
   * @param executorId id of the executor the task reported.
   * @param host       host and port of the task's custom netty transport endpoint; this is not
   *                   the Akka remoting address.
   */
  case class TaskLocation(
      executorId: Int,
      host: HostPort)

  /** Registered task ids grouped by the host and port they reported. */
  case class TaskLocations(
      locations: Map[HostPort, Set[TaskId]])
}