blob: 5af9bc3acdfef9057cbee4f3346ac2f7f7719cbb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
public class TestFSLeafQueue extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
private Resource maxResource = Resources.createResource(1024 * 8);
private static final float MAX_AM_SHARE = 0.5f;
private static final String CUSTOM_RESOURCE = "test1";
@Before
public void setup() throws IOException {
conf = createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
}
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
conf = null;
}
@Test
public void testUpdateDemand() {
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
String queueName = "root.queue1";
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
schedulable.setMaxShare(new ConfigurableResource(maxResource));
assertThat(schedulable.getMetrics().getMaxApps()).
isEqualTo(Integer.MAX_VALUE);
assertThat(schedulable.getMetrics().getSchedulingPolicy()).isEqualTo(
SchedulingPolicy.DEFAULT_POLICY.getName());
FSAppAttempt app = mock(FSAppAttempt.class);
Mockito.when(app.getDemand()).thenReturn(maxResource);
Mockito.when(app.getResourceUsage()).thenReturn(Resources.none());
schedulable.addApp(app, true);
schedulable.addApp(app, true);
schedulable.updateDemand();
assertTrue("Demand is greater than max allowed ",
Resources.equals(schedulable.getDemand(), maxResource));
}
@Test (timeout = 5000)
public void test() {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileWriter.create()
.queueMaxAMShareDefault(MAX_AM_SHARE)
.addQueue(new AllocationFileQueue.Builder("queueA").build())
.addQueue(new AllocationFileQueue.Builder("queueB").build())
.writeToFile(ALLOC_FILE);
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
for(FSQueue queue: scheduler.getQueueManager().getQueues()) {
assertThat(queue.getMetrics().getMaxApps()).isEqualTo(Integer.MAX_VALUE);
assertThat(queue.getMetrics().getSchedulingPolicy()).isEqualTo(
SchedulingPolicy.DEFAULT_POLICY.getName());
}
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
scheduler.update();
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
}
@Test
public void testConcurrentAccess() {
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
String queueName = "root.queue1";
final FSLeafQueue schedulable = scheduler.getQueueManager().
getLeafQueue(queueName, true);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = resourceManager.getRMContext();
final FSAppAttempt app =
new FSAppAttempt(scheduler, applicationAttemptId, "user1",
schedulable, null, rmContext);
// this needs to be in sync with the number of runnables declared below
int testThreads = 2;
List<Runnable> runnables = new ArrayList<Runnable>();
// add applications to modify the list
runnables.add(new Runnable() {
@Override
public void run() {
for (int i=0; i < 500; i++) {
schedulable.addApp(app, true);
}
}
});
// iterate over the list a couple of times in a different thread
runnables.add(new Runnable() {
@Override
public void run() {
for (int i=0; i < 500; i++) {
schedulable.getResourceUsage();
}
}
});
final List<Throwable> exceptions = Collections.synchronizedList(
new ArrayList<Throwable>());
final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
testThreads);
try {
final CountDownLatch allExecutorThreadsReady =
new CountDownLatch(testThreads);
final CountDownLatch startBlocker = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(testThreads);
for (final Runnable submittedTestRunnable : runnables) {
threadPool.submit(new Runnable() {
public void run() {
allExecutorThreadsReady.countDown();
try {
startBlocker.await();
submittedTestRunnable.run();
} catch (final Throwable e) {
exceptions.add(e);
} finally {
allDone.countDown();
}
}
});
}
// wait until all threads are ready
allExecutorThreadsReady.await();
// start all test runners
startBlocker.countDown();
int testTimeout = 2;
assertTrue("Timeout waiting for more than " + testTimeout + " seconds",
allDone.await(testTimeout, TimeUnit.SECONDS));
} catch (InterruptedException ie) {
exceptions.add(ie);
} finally {
threadPool.shutdownNow();
}
assertTrue("Test failed with exception(s)" + exceptions,
exceptions.isEmpty());
}
@Test
public void testCanRunAppAMReturnsTrue() {
conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
ResourceUtils.resetResourceTypes(conf);
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
Resource maxShare = Resource.newInstance(1024 * 8, 4,
ImmutableMap.of(CUSTOM_RESOURCE, 10L));
// Add a node to increase available memory and vcores in scheduler's
// root queue metrics
addNodeToScheduler(Resource.newInstance(4096, 10,
ImmutableMap.of(CUSTOM_RESOURCE, 25L)));
FSLeafQueue queue = setupQueue(maxShare);
//Min(availableMemory, maxShareMemory (maxResourceOverridden))
// --> Min(4096, 8192) = 4096
//Min(availableVCores, maxShareVCores (maxResourceOverridden))
// --> Min(10, 4) = 4
//Min(available test1, maxShare test1 (maxResourceOverridden))
// --> Min(25, 10) = 10
//MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE
// --> 2048 MB memory, 2 vcores, 5 test1
Resource expectedAMShare = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 5L));
Resource appAMResource = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 3L));
Map<String, Long> customResourceValues =
verifyQueueMetricsForCustomResources(queue);
boolean result = queue.canRunAppAM(appAMResource);
assertTrue("AM should have been allocated!", result);
verifyAMShare(queue, expectedAMShare, customResourceValues);
}
private FSLeafQueue setupQueue(Resource maxShare) {
String queueName = "root.queue1";
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
schedulable.setMaxShare(new ConfigurableResource(maxShare));
schedulable.setMaxAMShare(MAX_AM_SHARE);
return schedulable;
}
@Test
public void testCanRunAppAMReturnsFalse() {
conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
ResourceUtils.resetResourceTypes(conf);
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
Resource maxShare = Resource.newInstance(1024 * 8, 4,
ImmutableMap.of(CUSTOM_RESOURCE, 10L));
// Add a node to increase available memory and vcores in scheduler's
// root queue metrics
addNodeToScheduler(Resource.newInstance(4096, 10,
ImmutableMap.of(CUSTOM_RESOURCE, 25L)));
FSLeafQueue queue = setupQueue(maxShare);
//Min(availableMemory, maxShareMemory (maxResourceOverridden))
// --> Min(4096, 8192) = 4096
//Min(availableVCores, maxShareVCores (maxResourceOverridden))
// --> Min(10, 4) = 4
//Min(available test1, maxShare test1 (maxResourceOverridden))
// --> Min(25, 10) = 10
//MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE
// --> 2048 MB memory, 2 vcores, 5 test1
Resource expectedAMShare = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 5L));
Resource appAMResource = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 6L));
Map<String, Long> customResourceValues =
verifyQueueMetricsForCustomResources(queue);
boolean result = queue.canRunAppAM(appAMResource);
assertFalse("AM should not have been allocated!", result);
verifyAMShare(queue, expectedAMShare, customResourceValues);
}
private void addNodeToScheduler(Resource node1Resource) {
RMNode node1 = MockNodes.newNodeInfo(0, node1Resource, 1, "127.0.0.2");
scheduler.handle(new NodeAddedSchedulerEvent(node1));
}
private void verifyAMShare(FSLeafQueue schedulable,
Resource expectedAMShare, Map<String, Long> customResourceValues) {
Resource actualAMShare = Resource.newInstance(
schedulable.getMetrics().getMaxAMShareMB(),
schedulable.getMetrics().getMaxAMShareVCores(), customResourceValues);
long customResourceValue =
actualAMShare.getResourceValue(CUSTOM_RESOURCE);
//make sure to verify custom resource value explicitly!
assertEquals(5L, customResourceValue);
assertEquals("AM share is not the expected!", expectedAMShare,
actualAMShare);
}
private Map<String, Long> verifyQueueMetricsForCustomResources(
FSLeafQueue schedulable) {
CustomResourceMetricValue maxAMShareCustomResources =
schedulable.getMetrics().getCustomResources().getMaxAMShare();
Map<String, Long> customResourceValues = maxAMShareCustomResources
.getValues();
assertNotNull("Queue metrics for custom resources should not be null!",
maxAMShareCustomResources);
assertNotNull("Queue metrics for custom resources resource values " +
"should not be null!", customResourceValues);
return customResourceValues;
}
}