blob: 623e3ff282aea9f25966cc0cb461875889de0333 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.scheduler
import akka.actor.ActorRef
import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.scheduler.Relaxation._
import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
import org.apache.gearpump.cluster.worker.WorkerId
import scala.collection.mutable
/** Assign resource to application based on the priority of the application */
class PriorityScheduler extends Scheduler {
private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
override def compare(x: PendingRequest, y: PendingRequest): Int = {
var res = x.request.priority.id - y.request.priority.id
if (res == 0) {
res = y.timeStamp.compareTo(x.timeStamp)
}
res
}
}
override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
override def allocateResource(): Unit = {
var scheduleLater = Array.empty[PendingRequest]
val resourcesSnapShot = resources.clone()
var allocated = Resource.empty
val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
while (resourceRequests.nonEmpty && (allocated < totalResource)) {
val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
request.relaxation match {
case ANY =>
val allocations = allocateFairly(resourcesSnapShot, request)
val newAllocated = Resource(allocations.map(_.resource.slots).sum)
if (allocations.nonEmpty) {
appMaster ! ResourceAllocated(allocations.toArray)
}
if (newAllocated < request.resource) {
val remainingRequest = request.resource - newAllocated
val remainingExecutors = request.executorNum - allocations.length
val newResourceRequest = request.copy(resource = remainingRequest,
executorNum = remainingExecutors)
scheduleLater = scheduleLater :+
PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
}
allocated = allocated + newAllocated
case ONEWORKER =>
val availableResource = resourcesSnapShot.find { params =>
val (_, (_, resource)) = params
resource > request.resource
}
if (availableResource.nonEmpty) {
val (workerId, (worker, resource)) = availableResource.get
allocated = allocated + request.resource
appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
workerId)))
resourcesSnapShot.update(workerId, (worker, resource - request.resource))
} else {
scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
}
case SPECIFICWORKER =>
val workerAndResource = resourcesSnapShot.get(request.workerId)
if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
val (worker, availableResource) = workerAndResource.get
appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
request.workerId)))
allocated = allocated + request.resource
resourcesSnapShot.update(request.workerId, (worker,
availableResource - request.resource))
} else {
scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
}
}
}
for (request <- scheduleLater)
resourceRequests.enqueue(request)
}
def resourceRequestHandler: Receive = {
case RequestResource(appId, request) =>
LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
val appMaster = sender()
resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
System.currentTimeMillis()))
allocateResource()
}
override def doneApplication(appId: Int): Unit = {
resourceRequests = resourceRequests.filter(_.appId != appId)
}
private def allocateFairly(
resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
: List[ResourceAllocation] = {
val workerNum = resources.size
var allocations = List.empty[ResourceAllocation]
var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
var remainingRequest = request.resource
var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
val exeutorNum = Math.min(workerNum, remainingExecutors)
val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors)
val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
val pickedResources = sortedResources.take(exeutorNum)
val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
val ((workerId, (worker, resource)), index) = workerWithIndex
0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
}.sortBy(_._2).map(_._1)
if (flattenResource.length < toRequest.slots) {
// Can not safisfy the user's requirements
totalAvailable = Resource.empty
} else {
flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
toArray.foreach { params =>
val ((workerId, worker), slots) = params
resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
}
totalAvailable -= toRequest
remainingRequest -= toRequest
remainingExecutors -= exeutorNum
}
}
allocations
}
}