blob: 8a3d7d1bd2be89bda871b4f4c6b8305c5e76e32d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.cluster.scheduler
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
import org.apache.gearpump.cluster.master.Master.MasterInfo
import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
import org.apache.gearpump.cluster.worker.WorkerId
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.concurrent.duration._
class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll{
def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
val appId = 0
val workerId1: WorkerId = WorkerId(1, 0L)
val workerId2: WorkerId = WorkerId(2, 0L)
val mockAppMaster = TestProbe()
val mockWorker1 = TestProbe()
val mockWorker2 = TestProbe()
override def afterAll {
TestKit.shutdownActorSystem(system)
}
"The scheduler" should {
"update resource only when the worker is registered" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
s"registered into master"))
}
"drop application's resource requests when the application is removed" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
mockAppMaster.expectNoMsg(5.seconds)
}
}
def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
}
"The resource request with higher priority" should {
"be handled first" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
var expect = ResourceAllocated(
Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
expect = ResourceAllocated(
Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
expect = ResourceAllocated(
Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
expect = ResourceAllocated(
Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
}
}
"The resource request which delivered earlier" should {
"be handled first if the priorities are the same" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
var expect = ResourceAllocated(
Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
expect = ResourceAllocated(
Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
}
}
"The PriorityScheduler" should {
"handle the resource request with different relaxation" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
var expect = ResourceAllocated(
Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
expect = ResourceAllocated(
Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
val request3 = ResourceRequest(
Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
expect = ResourceAllocated(Array(
ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
// We have to manually update the resource on each worker
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
expect = ResourceAllocated(
Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
mockAppMaster.expectMsgPF(5.seconds) {
case request: ResourceAllocated if sameElement(request, expect) => Unit
}
}
}
"The PriorityScheduler" should {
"handle the resource request with different executor number" in {
val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
// By default, the request requires only one executor
val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations2.allocations.length == 1)
assert(allocations2.allocations.head.resource == Resource(20))
val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations3.allocations.length == 3)
assert(allocations3.allocations.forall(_.resource == Resource(8)))
// The total available resource can not satisfy the requirements with executor number
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations4.allocations.length == 2)
assert(allocations4.allocations.forall(_.resource == Resource(20)))
// When new resources are available, the remaining request will be satisfied
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
assert(allocations5.allocations.length == 1)
assert(allocations4.allocations.forall(_.resource == Resource(20)))
}
}
}