// blob: 2bad8b7447b4f12efc05d7783ee3c3b1c3035b27 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests for auto-created leaf queues when the queues under root are
 * configured with absolute minimum/maximum {@link Resource} values.
 *
 * <p>Four queues (queueA to queueD) are placed directly under root; queueC
 * and queueD are enabled for automatic child queue creation by
 * {@code setupSimpleQueueConfiguration}.
 */
public class TestAbsoluteResourceWithAutoQueue
extends TestCapacitySchedulerAutoCreatedQueueBase {
// Class logger; not referenced by the code visible in this file.
private static final Logger LOG =
LoggerFactory.getLogger(TestAbsoluteResourceWithAutoQueue.class);
// Short names of the queues configured directly under root.
private static final String QUEUEA = "queueA";
private static final String QUEUEB = "queueB";
private static final String QUEUEC = "queueC";
private static final String QUEUED = "queueD";
// Full queue paths: CapacitySchedulerConfiguration.ROOT + "." + short name.
private static final String QUEUEA_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUEA;
private static final String QUEUEB_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUEB;
private static final String QUEUEC_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUEC;
private static final String QUEUED_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUED;
// Minimum and maximum resources per queue, applied by
// setupMinMaxResourceConfiguration.
// NOTE(review): the two Resource.newInstance arguments are presumably
// (memory, vcores) - confirm against the Resource API.
private static final Resource QUEUE_A_MINRES =
Resource.newInstance(100 * GB, 10);
private static final Resource QUEUE_A_MAXRES =
Resource.newInstance(200 * GB, 30);
private static final Resource QUEUE_B_MINRES =
Resource.newInstance(50 * GB, 10);
private static final Resource QUEUE_B_MAXRES =
Resource.newInstance(150 * GB, 30);
// queueC and queueD share the same minimum and maximum values.
private static final Resource QUEUE_C_MINRES =
Resource.newInstance(25 * GB, 5);
private static final Resource QUEUE_C_MAXRES =
Resource.newInstance(150 * GB, 20);
private static final Resource QUEUE_D_MINRES =
Resource.newInstance(25 * GB, 5);
private static final Resource QUEUE_D_MAXRES =
Resource.newInstance(150 * GB, 20);
/**
 * Per-test fixture: adds {@code NO_LABEL} to
 * {@code accessibleNodeLabelsOnC} before each test.
 *
 * <p>NOTE(review): {@code accessibleNodeLabelsOnC} is declared outside this
 * file, presumably in the base class; this method presumably also replaces
 * the base class set-up, which is why each test builds its own MockRM -
 * confirm against TestCapacitySchedulerAutoCreatedQueueBase.
 */
@Before
public void setUp() throws Exception {
accessibleNodeLabelsOnC.add(NO_LABEL);
}
/**
 * Applies the minimum and maximum resource requirements of queueA, queueB,
 * queueC and queueD to the given configuration, using the empty-string
 * label.
 *
 * <p>All minimums are written first, followed by all maximums.
 *
 * @param csConf configuration to update in place
 * @return the same {@code csConf} instance that was passed in
 */
private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
    CapacitySchedulerConfiguration csConf) {
  final String label = "";
  // Parallel tables: index i of each array describes the same queue.
  final String[] queuePaths = {
      QUEUEA_FULL, QUEUEB_FULL, QUEUEC_FULL, QUEUED_FULL};
  final Resource[] minimums = {
      QUEUE_A_MINRES, QUEUE_B_MINRES, QUEUE_C_MINRES, QUEUE_D_MINRES};
  final Resource[] maximums = {
      QUEUE_A_MAXRES, QUEUE_B_MAXRES, QUEUE_C_MAXRES, QUEUE_D_MAXRES};

  for (int i = 0; i < queuePaths.length; i++) {
    csConf.setMinimumResourceRequirement(label, queuePaths[i], minimums[i]);
  }
  for (int i = 0; i < queuePaths.length; i++) {
    csConf.setMaximumResourceRequirement(label, queuePaths[i], maximums[i]);
  }
  return csConf;
}
/**
 * Returns the given configuration unchanged; no queue settings are applied
 * here.
 *
 * <p>NOTE(review): presumably this hides a same-named static method of the
 * base class so that its default queue layout is not used by these tests -
 * confirm against TestCapacitySchedulerAutoCreatedQueueBase.
 *
 * @param conf configuration to pass through
 * @return the same {@code conf} instance
 */
public static CapacitySchedulerConfiguration setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
return conf;
}
/**
 * Builds a new scheduler configuration with queueA to queueD under root and
 * with automatic child queue creation enabled on queueC and queueD.
 *
 * <p>The leaf queue template of queueC uses {@code QUEUE_C_MINRES} and
 * {@code QUEUE_C_MAXRES}; the template of queueD uses a
 * {@code Resource.newInstance(10 * GB, 2)} capacity and
 * {@code QUEUE_D_MAXRES} as its maximum.
 *
 * @param isCapacityNeeded when {@code true}, float capacities are also set
 *     on the four queues via {@code setCapacity}
 * @return the newly created configuration
 */
private CapacitySchedulerConfiguration setupSimpleQueueConfiguration(
    boolean isCapacityNeeded) {
  final CapacitySchedulerConfiguration conf =
      new CapacitySchedulerConfiguration();
  final String[] rootChildren = {QUEUEA, QUEUEB, QUEUEC, QUEUED};
  conf.setQueues(CapacitySchedulerConfiguration.ROOT, rootChildren);

  // Set default capacities like normal configuration.
  if (isCapacityNeeded) {
    // NOTE(review): these four values add up to 125 (50 + 25 + 25 + 25).
    // If they are percentages of root, the sum exceeds 100 - confirm that
    // this is intended by the tests that pass isCapacityNeeded = true.
    conf.setCapacity(QUEUEA_FULL, 50f);
    conf.setCapacity(QUEUEB_FULL, 25f);
    conf.setCapacity(QUEUEC_FULL, 25f);
    conf.setCapacity(QUEUED_FULL, 25f);
  }

  final String label = "";

  // queueC: auto-creation on, template bounded by the queue's own min/max.
  conf.setAutoCreateChildQueueEnabled(QUEUEC_FULL, true);
  conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUEC_FULL, label,
      QUEUE_C_MINRES);
  conf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUEC_FULL, label,
      QUEUE_C_MAXRES);

  // queueD: auto-creation on, template capacity is a dedicated resource
  // value rather than QUEUE_D_MINRES.
  conf.setAutoCreateChildQueueEnabled(QUEUED_FULL, true);
  final Resource queueDTemplateCapacity = Resource.newInstance(10 * GB, 2);
  conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUED_FULL, label,
      queueDTemplateCapacity);
  conf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUED_FULL, label,
      QUEUE_D_MAXRES);
  return conf;
}
// TODO: Wangda: I think this test case is not correct, Sunil could help look
// into details.
/**
 * Submits applications for three users mapped to queueD and checks the
 * auto-created leaf queue of each user: its parent, its capacities, the
 * entitlement recorded on the parent, and the effect of killing the second
 * user's applications and applying the resulting queue management changes.
 *
 * <p>The queues named after the three users are cleaned up in a
 * {@code finally} block regardless of the outcome.
 *
 * @throws Exception if any scheduler or resource manager call fails
 */
@Test(timeout = 20000)
public void testAutoCreateLeafQueueCreation() throws Exception {
  try {
    CapacitySchedulerConfiguration csConf =
        setupSimpleQueueConfiguration(false);
    setupMinMaxResourceConfiguration(csConf);
    csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    csConf.setOverrideWithQueueMappings(true);

    mockRM = new MockRM(csConf);
    cs = (CapacityScheduler) mockRM.getResourceScheduler();
    mockRM.start();
    cs.start();

    // Register a single node.
    mockRM.registerNode("127.0.0.1:1234", 250 * GB, 40);

    // Map users onto queueD and reload the scheduler configuration.
    setupGroupQueueMappings(QUEUED, cs.getConfiguration(), "%user");
    cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());

    // First user: a leaf queue named after the user must exist under queueD.
    submitApp(mockRM, cs.getQueue(QUEUED), TEST_GROUPUSER, TEST_GROUPUSER, 1,
        1);
    AutoCreatedLeafQueue autoCreatedLeafQueue =
        (AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
    ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(QUEUED);
    assertEquals(parentQueue, autoCreatedLeafQueue.getParent());

    // NOTE(review): 0.4f and 0.04f presumably correspond to the 10 * GB
    // template capacity over queueD's 25 * GB minimum and over the 250 * GB
    // node respectively - confirm against validateCapacities.
    validateCapacities(autoCreatedLeafQueue, 0.4f, 0.04f, 1f, 0.6f);
    validateCapacitiesByLabel(parentQueue, autoCreatedLeafQueue, NO_LABEL);

    Map<String, Float> expectedChildQueueAbsCapacity = new HashMap<>();
    expectedChildQueueAbsCapacity.put(NO_LABEL, 0.04f);
    // A separate set per validation call, so no call can observe changes
    // made to the set by another one.
    Set<String> expectedNodeLabels = new HashSet<>();
    expectedNodeLabels.add(NO_LABEL);
    validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER,
        expectedChildQueueAbsCapacity, expectedNodeLabels);

    validateUserAndAppLimits(autoCreatedLeafQueue, 400, 400);
    assertTrue(autoCreatedLeafQueue
        .getOrderingPolicy() instanceof FifoOrderingPolicy);

    // Second user: same per-queue capacities, expected total is now 0.08.
    ApplicationId user1AppId = submitApp(mockRM, cs.getQueue(QUEUED),
        TEST_GROUPUSER1, TEST_GROUPUSER1, 2, 1);
    AutoCreatedLeafQueue autoCreatedLeafQueue1 =
        (AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER1);
    validateCapacities(autoCreatedLeafQueue1, 0.4f, 0.04f, 1f, 0.6f);
    validateCapacitiesByLabel(parentQueue, autoCreatedLeafQueue1, NO_LABEL);
    assertEquals(parentQueue, autoCreatedLeafQueue1.getParent());

    Map<String, Float> expectedChildQueueAbsCapacity1 = new HashMap<>();
    expectedChildQueueAbsCapacity1.put(NO_LABEL, 0.08f);
    Set<String> expectedNodeLabels1 = new HashSet<>();
    expectedNodeLabels1.add(NO_LABEL);
    validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER1,
        expectedChildQueueAbsCapacity1, expectedNodeLabels1);

    // Third user: the new queue is expected with zero capacity while the
    // first two queues keep theirs.
    submitApp(mockRM, cs.getQueue(QUEUED), TEST_GROUPUSER2, TEST_GROUPUSER2,
        3, 1);
    final AutoCreatedLeafQueue autoCreatedLeafQueue2 =
        (AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER2);
    validateCapacities(autoCreatedLeafQueue2, 0.0f, 0.0f, 1f, 0.6f);
    validateCapacities(autoCreatedLeafQueue1, 0.4f, 0.04f, 1f, 0.6f);
    validateCapacities(autoCreatedLeafQueue, 0.4f, 0.04f, 1f, 0.6f);

    GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
        (GuaranteedOrZeroCapacityOverTimePolicy) parentQueue
            .getAutoCreatedQueueManagementPolicy();
    assertEquals(0.08f, autoCreatedQueueManagementPolicy
        .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);

    // Kill the second user's applications, then compute and apply the
    // queue management changes.
    cs.killAllAppsInQueue(TEST_GROUPUSER1);
    mockRM.waitForState(user1AppId, RMAppState.KILLED);

    List<QueueManagementChange> queueManagementChanges =
        autoCreatedQueueManagementPolicy.computeQueueManagementChanges();
    parentQueue
        .validateAndApplyQueueManagementChanges(queueManagementChanges);

    validateDeactivatedQueueEntitlement(parentQueue, TEST_GROUPUSER1,
        expectedChildQueueAbsCapacity1, queueManagementChanges);

    Set<String> expectedNodeLabelsUpdated = new HashSet<>();
    expectedNodeLabelsUpdated.add(NO_LABEL);
    validateActivatedQueueEntitlement(parentQueue, TEST_GROUPUSER2,
        expectedChildQueueAbsCapacity1, queueManagementChanges,
        expectedNodeLabelsUpdated);
  } finally {
    cleanupQueue(TEST_GROUPUSER);
    cleanupQueue(TEST_GROUPUSER1);
    cleanupQueue(TEST_GROUPUSER2);
  }
}
/**
 * Expects construction of the MockRM to throw when the configuration comes
 * from {@code setupSimpleQueueConfiguration(true)}.
 *
 * <p>If construction succeeds, {@code fail} raises an
 * {@code AssertionError}; that is not an {@code Exception} subclass, so it
 * is not mistaken for the expected failure.
 *
 * <p>NOTE(review): {@code expected = Exception.class} accepts any exception.
 * The capacities set by {@code setupSimpleQueueConfiguration(true)} add up
 * to 125, so confirm that the exception really comes from the leaf queue
 * template mismatch and not from the capacity sum.
 */
@Test(expected = Exception.class)
public void testValidateLeafQueueTemplateConfigurations() {
  final CapacitySchedulerConfiguration conf =
      setupSimpleQueueConfiguration(true);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);

  // Expected to throw before the assignment completes.
  mockRM = new MockRM(conf);

  final String notRejected =
      "Exception should be thrown as leaf queue template configuration is "
          + "not same as Parent configuration";
  fail(notRejected);
}
/**
 * Runs one application for {@code TEST_GROUPUSER} with
 * {@link DominantResourceCalculator} configured as the resource comparator
 * and checks that a leaf queue named after the user was auto-created under
 * queueD.
 *
 * @throws Exception if any resource manager or scheduler call fails
 */
@Test(timeout = 20000)
public void testApplicationRunningWithDRF() throws Exception {
  final CapacitySchedulerConfiguration conf =
      setupSimpleQueueConfiguration(false);
  setupMinMaxResourceConfiguration(conf);
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  // Validate Leaf Queue Template in Absolute Resource with DRF
  conf.setResourceComparator(DominantResourceCalculator.class);
  setupGroupQueueMappings(QUEUED, conf, "%user");

  mockRM = new MockRM(conf);
  mockRM.start();
  final MockNM node = mockRM.registerNode("127.0.0.1:1234", 250 * GB, 40);

  // Submit an application, then launch and register its AM on the node.
  final MockRMAppSubmissionData submission =
      MockRMAppSubmissionData.Builder.createWithMemory(1024, mockRM)
          .withAppName("app1")
          .withUser(TEST_GROUPUSER)
          .withAcls(null)
          .build();
  final RMApp app = MockRMAppSubmitter.submit(mockRM, submission);
  // The returned AM handle is not used further by this test.
  final MockAM appMaster = MockRM.launchAndRegisterAM(app, mockRM, node);

  cs = (CapacityScheduler) mockRM.getResourceScheduler();
  final AutoCreatedLeafQueue userLeafQueue =
      (AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
  assertNotNull("Auto Creation of Queue failed", userLeafQueue);

  final ManagedParentQueue managedParent =
      (ManagedParentQueue) cs.getQueue(QUEUED);
  assertEquals(managedParent, userLeafQueue.getParent());
}
}