| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; |
| |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| |
| import static org.mockito.Matchers.argThat; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| |
| public class TestProportionalCapacityPreemptionPolicyForReservedContainers |
| extends ProportionalCapacityPreemptionPolicyMockFramework { |
| @Before |
| public void setup() { |
| super.setup(); |
| conf.setBoolean( |
| CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, |
| true); |
| policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); |
| } |
| |
| @Test |
| public void testPreemptionForSimpleReservedContainer() throws IOException { |
| /** |
| * The simplest test of reserved container, Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * Guaranteed resource of a/b are 50:50 |
| * Total cluster resource = 100 |
| * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each |
| * container is 1. |
| * - B has am container at n1, and reserves 1 container with size = 9 at n1, |
| * so B needs to preempt 9 containers from A at n1 instead of randomly |
| * preempt from n1 and n2. |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = // n1 / n2 has no label |
| "n1= res=50;" + |
| "n2= res=50"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 100 9 9]);" + //root |
| "-a(=[50 100 90 0]);" + // a |
| "-b(=[50 100 10 9 9])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,1,n1,,45,false)" // 45 in n1 |
| + "(1,1,n2,,45,false);" + // 45 in n2 |
| "b\t" // app2 in b |
| + "(1,1,n1,,1,false)" // AM container in n1 |
| + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| // Total 5 preempted from app1 at n1, don't preempt container from other |
| // app/node |
| verify(mDisp, times(5)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(1)))); |
| verify(mDisp, times(5)).handle( |
| argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n1", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(2)))); |
| } |
| |
| @Test |
| public void testUseReservedAndFifoSelectorTogether() throws IOException { |
| /** |
| * Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * Guaranteed resource of a/b are 30:70 |
| * Total cluster resource = 100 |
| * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each |
| * container is 1. |
| * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, |
| * B also has 20 pending resources. |
| * so B needs to preempt: |
| * - 10 containers from n1 (for reserved) |
| * - 5 containers from n2 for pending resources |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = // n1 / n2 has no label |
| "n1= res=50;" + |
| "n2= res=50"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 100 70 10]);" + //root |
| "-a(=[30 100 45 0]);" + // a |
| "-b(=[70 100 55 70 50])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,1,n2,,35,false)" // 35 in n2 |
| + "(1,1,n1,,10,false);" + // 10 in n1 |
| "b\t" // app2 in b |
| + "(1,1,n2,,5,false)" // 5 in n2 |
| + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| verify(mDisp, times(15)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(1)))); |
| verify(mDisp, times(10)).handle( |
| argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n1", 1)))); |
| verify(mDisp, times(5)).handle( |
| argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n2", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(2)))); |
| } |
| |
| @Test |
| public void testReservedSelectorSkipsAMContainer() throws IOException { |
| /** |
| * Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * Guaranteed resource of a/b are 30:70 |
| * Total cluster resource = 100 |
| * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each |
| * container is 1. |
| * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, |
| * B also has 20 pending resources. |
| * |
| * Ideally B needs to preempt: |
| * - 10 containers from n1 (for reserved) |
| * - 5 containers from n2 for pending resources |
| * |
| * However, since one AM container is located at n1 (from queueA), we cannot |
| * preempt 10 containers from n1 for reserved container. Instead, we will |
| * preempt 15 containers from n2, since containers from queueA launched in n2 |
| * are later than containers from queueA launched in n1 (FIFO order of containers) |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = // n1 / n2 has no label |
| "n1= res=50;" + |
| "n2= res=50"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 100 70 10]);" + //root |
| "-a(=[30 100 45 0]);" + // a |
| "-b(=[70 100 55 70 50])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,1,n1,,10,false)" // 10 in n1 |
| + "(1,1,n2,,35,false);" +// 35 in n2 |
| "b\t" // app2 in b |
| + "(1,1,n2,,5,false)" // 5 in n2 |
| + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| verify(mDisp, times(15)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(1)))); |
| verify(mDisp, times(0)).handle( |
| argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n1", 1)))); |
| verify(mDisp, times(15)).handle( |
| argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n2", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(2)))); |
| } |
| |
| @Test |
| public void testPreemptionForReservedContainerRespectGuaranteedResource() |
| throws IOException { |
| /** |
| * Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * Guaranteed resource of a/b are 85:15 |
| * Total cluster resource = 100 |
| * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each |
| * container is 1. |
| * - B has am container at n1, and reserves 1 container with size = 9 at n1, |
| * |
| * If we preempt 9 containers from queue-A, queue-A will be below its |
| * guaranteed resource = 90 - 9 = 81 < 85. |
| * |
| * So no preemption will take place |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = // n1 / n2 has no label |
| "n1= res=50;" + |
| "n2= res=50"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 100 9 9]);" + //root |
| "-a(=[85 100 90 0]);" + // a |
| "-b(=[15 100 10 9 9])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,1,n1,,45,false)" // 45 in n1 |
| + "(1,1,n2,,45,false);" + // 45 in n2 |
| "b\t" // app2 in b |
| + "(1,1,n1,,1,false)" // AM container in n1 |
| + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| verify(mDisp, times(0)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( |
| getAppAttemptId(2)))); |
| } |
| |
| @Test |
| public void testPreemptionForReservedContainerWhichHasAvailableResource() |
| throws IOException { |
| /** |
| * Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * |
| * Guaranteed resource of a/b are 50:50 |
| * Total cluster resource = 100 |
| * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each |
| * container is 1. |
| * - B has am container at n1, and reserves 1 container with size = 9 at n1, |
| * |
| * So we can get 4 containers preempted after preemption. |
| * (reserved 5 + preempted 4) = 9 |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = // n1 / n2 has no label |
| "n1= res=50;" + |
| "n2= res=50"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 99 9 9]);" + //root |
| "-a(=[50 100 90 0]);" + // a |
| "-b(=[50 100 9 9 9])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,1,n1,,45,false)" // 45 in n1 |
| + "(1,1,n2,,45,false);" + // 45 in n2 |
| "b\t" // app2 in b |
| + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| // Total 4 preempted from app1 at n1, don't preempt container from other |
| // app/node |
| verify(mDisp, times(4)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n1", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n2", 1)))); |
| } |
| |
| @Test |
| public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource() |
| throws IOException { |
| /** |
| * Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * |
| * Guaranteed resource of a/b are 50:50 |
| * Total cluster resource = 100 |
| * - A has 45 containers on two node, size of each container is 2, |
| * n1 has 23, n2 has 22 |
| * - B reserves 1 container with size = 9 at n1, |
| * |
| * So we can get 4 containers (total-resource = 8) preempted after |
| * preemption. Actual required is 3.5, but we need to preempt integer |
| * number of containers |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = // n1 / n2 has no label |
| "n1= res=50;" + |
| "n2= res=50"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 99 9 9]);" + //root |
| "-a(=[50 100 90 0]);" + // a |
| "-b(=[50 100 9 9 9])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,2,n1,,24,false)" // 48 in n1 |
| + "(1,2,n2,,23,false);" + // 46 in n2 |
| "b\t" // app2 in b |
| + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| // Total 4 preempted from app1 at n1, don't preempt container from other |
| // app/node |
| verify(mDisp, times(4)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n1", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n2", 1)))); |
| } |
| |
| @Test |
| public void testPreemptionForReservedContainerRespectAvailableResources() |
| throws IOException { |
| /** |
| * Queue structure is: |
| * |
| * <pre> |
| * root |
| * / \ |
| * a b |
| * </pre> |
| * |
| * Guaranteed resource of a/b are 50:50 |
| * Total cluster resource = 100, 4 nodes, 25 on each node |
| * - A has 10 containers on every node, size of container is 2 |
| * - B reserves 1 container with size = 9 at n1, |
| * |
| * So even if we cannot allocate container for B now, no preemption should |
| * happen since there're plenty of available resources. |
| */ |
| String labelsConfig = |
| "=100,true;"; |
| String nodesConfig = |
| "n1= res=25;" + |
| "n2= res=25;" + |
| "n3= res=25;" + |
| "n4= res=25;"; |
| String queuesConfig = |
| // guaranteed,max,used,pending,reserved |
| "root(=[100 100 89 9 9]);" + //root |
| "-a(=[50 100 80 0]);" + // a |
| "-b(=[50 100 9 9 9])"; // b |
| String appsConfig= |
| //queueName\t(priority,resource,host,expression,#repeat,reserved) |
| "a\t" // app1 in a |
| + "(1,2,n1,,10,false)" // 10 in n1 |
| + "(1,2,n2,,10,false)" // 10 in n2 |
| + "(1,2,n3,,10,false)" // 10 in n3 |
| + "(1,2,n4,,10,false);" + // 10 in n4 |
| "b\t" // app2 in b |
| + "(1,9,n1,,1,true)"; // 1 container with size=5 reserved at n1 |
| |
| buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); |
| policy.editSchedule(); |
| |
| // No preemption should happen |
| verify(mDisp, times(0)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n1", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n2", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n3", 1)))); |
| verify(mDisp, times(0)).handle(argThat( |
| new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", |
| NodeId.newInstance("n4", 1)))); |
| } |
| } |