blob: 0837fd7205a7b7312d73e1c3a4847e378520c703 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assume;
import org.junit.Test;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import static org.apache.hadoop.yarn.util.resource.TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Throughput measurements for {@link CapacityScheduler} node-update handling
 * when every application in the queue is over its user limit.
 *
 * <p>The tests are skipped unless the JVM is started with
 * {@code -DRunCapacitySchedulerPerfTests=true}. Each test runs the same
 * scenario with a different number of configured resource types.
 */
public class TestCapacitySchedulerPerf {
  private static final int GB = 1024;

  /**
   * Builds the name of the additional (non memory/vcores) resource type
   * with the given index.
   *
   * @param idx index of the resource type
   * @return the resource name, {@code "resource-" + idx}
   */
  private String getResourceName(int idx) {
    return "resource-" + idx;
  }

  /**
   * Registers memory, vcores and {@code numOfResourceTypes - 2} additional
   * countable resource types with {@link ResourceUtils}. Nothing is
   * registered when {@code numOfResourceTypes} is 2 or less.
   *
   * @param numOfResourceTypes total number of resource types wanted
   */
  private void initializeResourceTypes(int numOfResourceTypes) {
    if (numOfResourceTypes <= 2) {
      return;
    }
    // Initialize resource map
    Map<String, ResourceInformation> riMap = new HashMap<>();
    // Initialize mandatory resources
    riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
    riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
    for (int i = 2; i < numOfResourceTypes; i++) {
      String resourceName = getResourceName(i);
      riMap.put(resourceName, ResourceInformation
          .newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
              Integer.MAX_VALUE));
    }
    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
  }

  /**
   * Sets every additional resource type (index 2 and up) on the given
   * resource to 10.
   *
   * @param resource the resource to modify in place
   * @param numOfResourceTypes total number of resource types in use
   */
  private void setAdditionalResourceValues(Resource resource,
      int numOfResourceTypes) {
    for (int i = 2; i < numOfResourceTypes; i++) {
      resource.setResourceValue(getResourceName(i), 10);
    }
  }

  /**
   * Creates the configuration used by the measured ResourceManager: AM
   * resource percentages of 100 on root and root.default, the
   * {@link DominantResourceCalculator}, and the {@link CapacityScheduler}.
   *
   * @return the configuration to pass to {@link MockRM}
   */
  private YarnConfiguration createConfiguration() {
    CapacitySchedulerConfiguration csconf =
        new CapacitySchedulerConfiguration();
    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
        100.0f);
    csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
    csconf.setResourceComparator(DominantResourceCalculator.class);

    YarnConfiguration conf = new YarnConfiguration(csconf);
    // Don't reset resource types since we have already configured resource
    // types
    conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    return conf;
  }

  /**
   * Raises the level of every currently known log4j logger to WARN so that
   * logging does not dominate the measurement.
   */
  private void quietLoggers() {
    for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
         loggers.hasMoreElements();) {
      Logger logger = (Logger) loggers.nextElement();
      logger.setLevel(Level.WARN);
    }
  }

  /**
   * Submits 100 applications for one user to the default queue, gives each
   * of them one pending request, drops the user limit factor to 0 and then
   * measures how fast the scheduler processes node updates.
   *
   * <p>Throughput is printed every 20000 node updates, followed by the
   * average over the fastest intervals.
   *
   * @param numOfResourceTypes number of resource types to configure;
   *                           values above 2 add extra countable types
   * @throws Exception if setting up or driving the ResourceManager fails
   */
  private void testUserLimitThroughputWithNumberOfResourceTypes(
      int numOfResourceTypes)
      throws Exception {
    // Since this is more of a performance unit test, only run if
    // RunCapacitySchedulerPerfTests is set
    // (-DRunCapacitySchedulerPerfTests=true). This is checked first so that
    // a skipped test does not register resource types as a side effect.
    Assume.assumeTrue(Boolean.valueOf(
        System.getProperty("RunCapacitySchedulerPerfTests")));

    initializeResourceTypes(numOfResourceTypes);

    MockRM rm = new MockRM(createConfiguration());
    try {
      rm.start();

      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
      LeafQueue qb = (LeafQueue) cs.getQueue("default");

      // For now make user limit large so we can activate all applications
      qb.setUserLimitFactor((float) 100.0);
      qb.setupConfigurableCapacities();

      Container container = mock(Container.class);
      ApplicationSubmissionContext submissionContext =
          mock(ApplicationSubmissionContext.class);

      final int appCount = 100;
      ApplicationId[] appids = new ApplicationId[appCount];
      for (int i = 0; i < appCount; i++) {
        appids[i] = BuilderUtils.newApplicationId(100, i);
        ApplicationAttemptId attemptId =
            BuilderUtils.newApplicationAttemptId(appids[i], 1);
        RMAppAttemptMetrics attemptMetrics =
            new RMAppAttemptMetrics(attemptId, rm.getRMContext());

        RMAppImpl app = mock(RMAppImpl.class);
        when(app.getApplicationId()).thenReturn(appids[i]);
        RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
        when(attempt.getMasterContainer()).thenReturn(container);
        when(attempt.getSubmissionContext()).thenReturn(submissionContext);
        when(attempt.getAppAttemptId()).thenReturn(attemptId);
        when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetrics);
        when(app.getCurrentAppAttempt()).thenReturn(attempt);

        rm.getRMContext().getRMApps().put(appids[i], app);
        SchedulerEvent addAppEvent =
            new AppAddedSchedulerEvent(appids[i], "default", "user1");
        cs.handle(addAppEvent);
        SchedulerEvent addAttemptEvent =
            new AppAttemptAddedSchedulerEvent(attemptId, false);
        cs.handle(addAttemptEvent);
      }

      // add nodes to cluster, so cluster has 20GB and 20 vcores
      Resource nodeResource = Resource.newInstance(10 * GB, 10);
      setAdditionalResourceValues(nodeResource, numOfResourceTypes);

      RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
      cs.handle(new NodeAddedSchedulerEvent(node));

      RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
      cs.handle(new NodeAddedSchedulerEvent(node2));

      Priority u0Priority = TestUtils.createMockPriority(1);
      RecordFactory recordFactory =
          RecordFactoryProvider.getRecordFactory(null);

      for (int i = 0; i < appCount; i++) {
        FiCaSchedulerApp fiCaApp =
            cs.getSchedulerApplications().get(appids[i])
                .getCurrentAppAttempt();

        // Every application asks for one container with 1GB memory,
        // 1 vcore and 10 of each additional resource type.
        ResourceRequest resourceRequest = TestUtils.createResourceRequest(
            ResourceRequest.ANY, 1 * GB, 1, true, u0Priority, recordFactory);
        setAdditionalResourceValues(resourceRequest.getCapability(),
            numOfResourceTypes);

        fiCaApp.updateResourceRequests(
            Collections.singletonList(resourceRequest));
      }

      // Now force everything to be over user limit
      qb.setUserLimitFactor((float) 0.0);

      // Quiet the loggers while measuring throughput
      quietLoggers();

      final int topn = 20;
      final int iterations = 2000000;
      final int printInterval = 20000;
      // node updates per interval, scaled so dividing by milliseconds
      // yields updates per second
      final float numerator = 1000.0f * printInterval;

      // Max-heap of interval durations: the head is the slowest of the
      // fastest topn intervals seen so far.
      PriorityQueue<Long> queue = new PriorityQueue<>(topn,
          Collections.reverseOrder());

      long intervalStart = Time.monotonicNow();
      // Each pass sends two node updates, hence the step of 2.
      for (int i = 0; i < iterations; i += 2) {
        if (i > 0 && i % printInterval == 0) {
          long ts = (Time.monotonicNow() - intervalStart);
          if (queue.size() < topn) {
            queue.offer(ts);
          } else {
            Long last = queue.peek();
            if (last > ts) {
              queue.poll();
              queue.offer(ts);
            }
          }
          System.out.println(i + " " + (numerator / ts));
          intervalStart = Time.monotonicNow();
        }
        cs.handle(new NodeUpdateSchedulerEvent(node));
        cs.handle(new NodeUpdateSchedulerEvent(node2));
      }

      int entries = queue.size();
      long timespent = 0;
      for (long ts : queue) {
        timespent += ts;
      }
      // Average in floating point: long division would truncate the mean
      // and would throw if no interval had been recorded.
      float avgMillis = (float) timespent / entries;
      System.out.println(
          "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest "
              + entries + ": " + numerator / avgMillis);
    } finally {
      // Always shut the ResourceManager down, even if the run failed.
      rm.stop();
    }
  }

  @Test(timeout = 300000)
  public void testUserLimitThroughputForTwoResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(2);
  }

  @Test(timeout = 300000)
  public void testUserLimitThroughputForThreeResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(3);
  }

  @Test(timeout = 300000)
  public void testUserLimitThroughputForFourResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(4);
  }

  @Test(timeout = 300000)
  public void testUserLimitThroughputForFiveResources() throws Exception {
    testUserLimitThroughputWithNumberOfResourceTypes(5);
  }
}