/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple test class for the {@link LocalityMulticastAMRMProxyPolicy}.
*/
public class TestLocalityMulticastAMRMProxyPolicy
extends BaseFederationPoliciesTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class);
@Before
public void setUp() throws Exception {
setPolicy(new TestableLocalityMulticastAMRMProxyPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// Six sub-clusters with one inactive and one disabled
for (int i = 0; i < 6; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
// sub-cluster 3 is not active
if (i != 3) {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
float weight = 0.1f;
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
// sub-cluster 4 is "disabled" in the weights
if (i == 4) {
routerWeights.put(sc, 0f);
amrmWeights.put(sc, 0f);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
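// headroomAlpha balances the two splitting criteria: an alpha fraction of
// the ANY asks is split according to the advertised headroom, and the
// remaining (1 - alpha) fraction according to the weights above (the tests
// below use 1.0f for pure headroom-based splits and 0.5f for an even mix)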
getPolicyInfo().setHeadroomAlpha(0.5f);
setHomeSubCluster(SubClusterId.newInstance("homesubcluster"));
}
@Test
public void testReinitialize() throws YarnException {
initializePolicy();
}
private void initializePolicy() throws YarnException {
initializePolicy(new YarnConfiguration());
}
private void initializePolicy(Configuration conf) throws YarnException {
setFederationPolicyContext(new FederationPolicyInitializationContext());
SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
getFederationPolicyContext().setFederationSubclusterResolver(resolver);
ByteBuffer buf = getPolicyInfo().toByteBuffer();
getFederationPolicyContext().setSubClusterPolicyConfiguration(
SubClusterPolicyConfiguration.newInstance("queue1",
getPolicy().getClass().getCanonicalName(), buf));
getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
getActiveSubclusters(), conf);
}
@Test(expected = FederationPolicyInitializationException.class)
public void testNullWeights() throws Exception {
getPolicyInfo().setAMRMPolicyWeights(null);
initializePolicy();
fail();
}
@Test(expected = FederationPolicyInitializationException.class)
public void testEmptyWeights() throws Exception {
getPolicyInfo()
.setAMRMPolicyWeights(new HashMap<SubClusterIdInfo, Float>());
initializePolicy();
fail();
}
@Test
public void testSplitBasedOnHeadroom() throws Exception {
// Tests how the headroom info is used to split based on the capacity
// each RM claims to give us.
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
List<ResourceRequest> resourceRequests = createSimpleRequest();
prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
// pretty print requests
LOG.info("Initial headroom");
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
/*
* Based on headroom, we expect 75 containers to go to subcluster0 (60) and
* subcluster2 (15) according to the advertised headroom (40 and 10), no
* containers for subcluster1 as it advertises zero headroom, and 25 to
* subcluster5 which has unknown headroom and so gets 1/4 of the load.
*/
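// A sketch of the arithmetic (derived from the comment above): subcluster5
// never reported headroom and receives 1/4 of the 100 containers (25);
// the remaining 75 are split in proportion to the advertised headroom
// 40:0:10, i.e. 75 * 40/50 = 60 for subcluster0, 0 for subcluster1, and
// 75 * 10/50 = 15 for subcluster2.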
checkExpectedAllocation(response, "subcluster0", 1, 60);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 15);
checkExpectedAllocation(response, "subcluster5", 1, 25);
checkTotalContainerAllocation(response, 100);
// notify a change in headroom and try again
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
LOG.info("After headroom update");
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
/*
* We simulated a change in headroom for subcluster2, which now has the
* same headroom as subcluster0, so each gets 37.5; the odd container
* will be assigned to either one of the two sub-clusters.
*/
checkExpectedAllocation(response, "subcluster0", 1, 37);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 37);
checkExpectedAllocation(response, "subcluster5", 1, 25);
checkTotalContainerAllocation(response, 100);
}
@Test(timeout = 5000)
public void testStressPolicy() throws Exception {
// Stress test: repeatedly splits a large random list of resource
// requests and validates each split.
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
addHomeSubClusterAsActive();
int numRR = 1000;
List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
prepPolicyWithHeadroom(true);
int numIterations = 1000;
long tstart = System.currentTimeMillis();
for (int i = 0; i < numIterations; i++) {
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
validateSplit(response, resourceRequests);
}
long tend = System.currentTimeMillis();
LOG.info("Performed " + numIterations + " policy invocations (and "
+ "validations) in " + (tend - tstart) + "ms");
}
@Test
public void testFWDAllZeroANY() throws Exception {
// Tests that a zero-sized ANY request is forwarded to all sub-clusters
// from which we have received a response.
// Configure policy to be 50% headroom and 50% weight based
getPolicyInfo().setHeadroomAlpha(0.5f);
initializePolicy();
List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
// make the policy receive responses from sc0, sc1, and sc2
prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
// we expect all three to appear for a zero-sized ANY
// pretty print requests
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
// we expect the zero-sized request to be sent to the first 3 RMs (because
// we received responses only from these 3 sub-clusters)
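// (as testCancelWithLocalizedResource below shows, a zero-sized ask acts
// as a cancellation, so it must reach every RM that might still hold an
// outstanding request)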
checkExpectedAllocation(response, "subcluster0", 1, 0);
checkExpectedAllocation(response, "subcluster1", 1, 0);
checkExpectedAllocation(response, "subcluster2", 1, 0);
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
checkExpectedAllocation(response, "subcluster5", -1, -1);
checkTotalContainerAllocation(response, 0);
}
@Test
public void testSplitBasedOnHeadroomAndWeights() throws Exception {
// Tests how the headroom info is used to split based on the capacity
// each RM claims to give us.
// Configure policy to be 50% headroom based and 50% weight based
getPolicyInfo().setHeadroomAlpha(0.5f);
initializePolicy();
List<ResourceRequest> resourceRequests = createSimpleRequest();
prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
// pretty print requests
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
// in this case the headroom allocates 50 containers, while the weights
// allocate the rest: the weights give 12.5 containers to each eligible
// sub-cluster, and the headroom accounts for the remainder
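// A sketch of the arithmetic (derived from the per-line comments below):
// with headroomAlpha = 0.5, half of the 100 containers follow the weights
// (12.5 each for the four eligible sub-clusters) and half follow the
// headroom split of the previous test scaled down: 60, 0, 15, 25 become
// 30, 0, 7.5, 12.5.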
checkExpectedAllocation(response, "subcluster0", 1, 42); // 30 + 12.5
checkExpectedAllocation(response, "subcluster1", 1, 12); // 0 + 12.5
checkExpectedAllocation(response, "subcluster2", 1, 20); // 7.5 + 12.5
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
checkExpectedAllocation(response, "subcluster5", 1, 25); // 12.5 + 12.5
checkTotalContainerAllocation(response, 100);
}
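/**
* Feeds the policy AllocateResponses advertising a headroom of 40
* containers for subcluster0 (only when setSubCluster0 is true), zero
* for subcluster1, and 10 for subcluster2.
*/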
private void prepPolicyWithHeadroom(boolean setSubCluster0)
throws YarnException {
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40);
if (setSubCluster0) {
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
}
ar = getAllocateResponseWithTargetHeadroom(0);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar);
ar = getAllocateResponseWithTargetHeadroom(10);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
}
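/**
* Builds an AllocateResponse whose availableResources field advertises a
* headroom of numContainers containers (1GB each); the policy reads this
* field as the sub-cluster's remaining capacity.
*/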
private AllocateResponse getAllocateResponseWithTargetHeadroom(
int numContainers) {
return AllocateResponse.newInstance(0, null, null,
Collections.<NodeReport> emptyList(),
Resource.newInstance(numContainers * 1024, numContainers), null, 10,
null, Collections.<NMToken> emptyList());
}
/**
* Modify the default initialization to include a "homesubcluster", which we
* will use as the default when nodes or racks are unknown.
*/
private void addHomeSubClusterAsActive() {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
getActiveSubclusters().put(getHomeSubCluster(), sci);
SubClusterIdInfo sc = new SubClusterIdInfo(getHomeSubCluster().getId());
getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
}
@Test
public void testSplitAllocateRequest() throws Exception {
// Test a complex List<ResourceRequest> is split correctly
initializePolicy();
addHomeSubClusterAsActive();
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
getActiveSubclusters(), new Configuration());
List<ResourceRequest> resourceRequests = createComplexRequest();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
validateSplit(response, resourceRequests);
prettyPrintRequests(response);
// we expect 7 entries for the home subcluster (2 for request-id 4, 3 for
// request-id 5, and part of the broadcast of request-id 2)
checkExpectedAllocation(response, getHomeSubCluster().getId(), 7, 29);
// for subcluster0 we expect 10 entries: 3 from request-id 0, 3 from
// request-id 3, 3 from request-id 5, as well as part of the
// request-id 2 broadcast
checkExpectedAllocation(response, "subcluster0", 10, 32);
// we expect 5 entries for subcluster1 (4 from request-id 1, and part
// of the broadcast of request-id 2)
checkExpectedAllocation(response, "subcluster1", 5, 26);
// sub-cluster 2 should contain 3 entries from request-id 1 and 1 from the
// broadcast of request-id 2, and no request-id 0
checkExpectedAllocation(response, "subcluster2", 4, 23);
// sub-clusters 3 and 4 should not appear (inactive and zero-weight,
// respectively)
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
// subcluster5 should get only part of the request-id 2 broadcast
checkExpectedAllocation(response, "subcluster5", 1, 20);
// Check the total number of container asks in all RR
checkTotalContainerAllocation(response, 130);
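// (130 = 3 containers for request-id 0, 9 for request-id 1, 100 for
// request-id 2, 3 for request-id 3, 3 for request-id 4, and 12 for
// request-id 5, as built in createComplexRequest)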
// check that the allocations that show up are what expected
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
Assert.assertTrue(
rr.getAllocationRequestId() == 2L || rr.getAllocationRequestId() == 4L
|| rr.getAllocationRequestId() == 5L);
}
List<ResourceRequest> rrs =
response.get(SubClusterId.newInstance("subcluster0"));
for (ResourceRequest rr : rrs) {
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
Assert.assertTrue(rr.getAllocationRequestId() != 4L);
}
for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster1"))) {
Assert.assertTrue(rr.getAllocationRequestId() == 1L
|| rr.getAllocationRequestId() == 2L);
}
for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster2"))) {
Assert.assertTrue(rr.getAllocationRequestId() == 1L
|| rr.getAllocationRequestId() == 2L);
}
for (ResourceRequest rr : response
.get(SubClusterId.newInstance("subcluster5"))) {
Assert.assertTrue(rr.getAllocationRequestId() == 2);
Assert.assertTrue(rr.getRelaxLocality());
}
}
// check that the number of ResourceRequests and the minimum total number
// of containers in the response for this sub-cluster match expectations;
// a minimumTotalContainers of -1 indicates the response should be null
private void checkExpectedAllocation(
Map<SubClusterId, List<ResourceRequest>> response, String subCluster,
long totResourceRequests, long minimumTotalContainers) {
if (minimumTotalContainers == -1) {
Assert.assertNull(response.get(SubClusterId.newInstance(subCluster)));
} else {
SubClusterId sc = SubClusterId.newInstance(subCluster);
Assert.assertEquals(totResourceRequests, response.get(sc).size());
long actualContCount = 0;
for (ResourceRequest rr : response.get(sc)) {
actualContCount += rr.getNumContainers();
}
Assert.assertTrue(
"Actual count " + actualContCount + " should be at least "
+ minimumTotalContainers,
minimumTotalContainers <= actualContCount);
}
}
private void checkTotalContainerAllocation(
Map<SubClusterId, List<ResourceRequest>> response, long totalContainers) {
long actualContCount = 0;
for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
.entrySet()) {
for (ResourceRequest rr : entry.getValue()) {
actualContCount += rr.getNumContainers();
}
}
Assert.assertEquals(totalContainers, actualContCount);
}
private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
List<ResourceRequest> original) throws YarnException {
SubClusterResolver resolver =
getFederationPolicyContext().getFederationSubclusterResolver();
// Apply general validation rules
int numUsedSubclusters = split.size();
Set<Long> originalIds = new HashSet<>();
Set<Long> splitIds = new HashSet<>();
int originalContainers = 0;
for (ResourceRequest rr : original) {
originalContainers += rr.getNumContainers();
originalIds.add(rr.getAllocationRequestId());
}
int splitContainers = 0;
for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split
.entrySet()) {
for (ResourceRequest rr : rrs.getValue()) {
splitContainers += rr.getNumContainers();
splitIds.add(rr.getAllocationRequestId());
// check node-local asks are sent to the right RM (only)
SubClusterId fid = null;
try {
fid = resolver.getSubClusterForNode(rr.getResourceName());
} catch (YarnException e) {
// ignore; unresolvable names are handled by the check below
}
if (!rrs.getKey().equals(getHomeSubCluster()) && fid != null
&& !fid.equals(rrs.getKey())) {
Assert.fail("A node-local (or resolvable rack-local) RR should not "
+ "be send to an RM other than what it resolves to.");
}
}
}
// check we are not inventing Allocation Ids
Assert.assertEquals(originalIds, splitIds);
// check we are not excessively replicating the container asks among
// RMs (a little is allowed due to rounding of fractional splits)
Assert.assertTrue(
" Containers requested (" + splitContainers + ") should "
+ "not exceed the original count of containers ("
+ originalContainers + ") by more than the number of subclusters ("
+ numUsedSubclusters + ")",
originalContainers + numUsedSubclusters >= splitContainers);
// Test target Ids
for (SubClusterId targetId : split.keySet()) {
Assert.assertTrue(
"Target subcluster " + targetId + " should be in the active set",
getActiveSubclusters().containsKey(targetId));
Assert.assertTrue(
"Target subclusters (" + targetId + ") should have weight >0 in "
+ "the policy ",
getPolicyInfo().getRouterPolicyWeights()
.get(new SubClusterIdInfo(targetId)) > 0);
}
}
private void prettyPrintRequests(
Map<SubClusterId, List<ResourceRequest>> response) {
for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
.entrySet()) {
String str = "";
for (ResourceRequest rr : entry.getValue()) {
str += " [id:" + rr.getAllocationRequestId() + " loc:"
+ rr.getResourceName() + " numCont:" + rr.getNumContainers()
+ "], ";
}
LOG.info(entry.getKey() + " --> " + str);
}
}
private List<ResourceRequest> createLargeRandomList(int numRR)
throws Exception {
List<ResourceRequest> out = new ArrayList<>();
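// a fixed seed keeps the random request list deterministic across runs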
Random rand = new Random(1);
DefaultSubClusterResolverImpl resolver =
(DefaultSubClusterResolverImpl) getFederationPolicyContext()
.getFederationSubclusterResolver();
List<String> nodes =
new ArrayList<>(resolver.getNodeToSubCluster().keySet());
for (int i = 0; i < numRR; i++) {
String nodeName = nodes.get(rand.nextInt(nodes.size()));
long allocationId = (long) rand.nextInt(20);
// create a request for a random node, with random allocation id,
// container count, and relax-locality flag
out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId,
nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean()));
}
return out;
}
private List<ResourceRequest> createSimpleRequest() throws Exception {
List<ResourceRequest> out = new ArrayList<>();
// create a single ANY request for 100 containers
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
return out;
}
private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
List<ResourceRequest> out = new ArrayList<>();
// create a single zero-sized ANY request
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 0, null, true));
return out;
}
private List<ResourceRequest> createComplexRequest() throws Exception {
List<ResourceRequest> out = new ArrayList<>();
// create a node-local request in sc0 (node, rack, and ANY levels)
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
// create a request spread over 3 alternative hosts across sc1 and sc2,
// where we want 2 containers in sc1 and 1 in sc2
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster1-rack1-host1", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster1-rack1-host2", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster2-rack3-host3", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster1-rack1", 1024, 1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
"subcluster2-rack3", 1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
ResourceRequest.ANY, 1024, 1, 1, 3, null, false));
// create a non-local ANY request that can span anything
out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
// create a single container request in sc0 with relaxed locality
out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, true));
out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
"subcluster0-rack0", 1024, 1, 1, 1, null, true));
out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, true));
// create a request of an unknown node/rack and expect this to show up
// in homesubcluster
out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode",
1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack",
1024, 1, 1, 1, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
// create a request of two hosts, an unknown node and a known node, both in
// a known rack, and expect the unknown node to show up in homesubcluster
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
"subcluster0-rack0-host0", 1024, 1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
"subcluster0-rack0", 1024, 1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "node4", 1024,
1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "rack2", 1024,
1, 1, 2, null, false));
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
ResourceRequest.ANY, 1024, 1, 1, 4, null, false));
return out;
}
private String printList(ArrayList<Integer> list) {
StringBuilder sb = new StringBuilder();
for (Integer entry : list) {
sb.append(entry).append(", ");
}
return sb.toString();
}
@Test
public void testIntegerAssignment() throws YarnException {
float[] weights =
new float[] {0, 0.1f, 0.2f, 0.2f, -0.1f, 0.1f, 0.2f, 0.1f, 0.1f};
int[] expectedMin = new int[] {0, 1, 3, 3, 0, 1, 3, 1, 1};
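// expectedMin appears to be floor(weight * 19) for each entry, with the
// negative weight treated as zero (e.g. 0.2 * 19 = 3.8 -> 3)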
ArrayList<Float> weightsList = new ArrayList<>();
for (float weight : weights) {
weightsList.add(weight);
}
LocalityMulticastAMRMProxyPolicy policy =
(LocalityMulticastAMRMProxyPolicy) getPolicy();
for (int i = 0; i < 500000; i++) {
ArrayList<Integer> allocations =
policy.computeIntegerAssignment(19, weightsList);
int sum = 0;
for (int j = 0; j < weights.length; j++) {
sum += allocations.get(j);
if (allocations.get(j) < expectedMin[j]) {
Assert.fail(allocations.get(j) + " at index " + j
+ " should be at least " + expectedMin[j] + ". Allocation array: "
+ printList(allocations));
}
}
Assert.assertEquals(
"Expect sum to be 19 in array: " + printList(allocations), 19, sum);
}
}
@Test
public void testCancelWithLocalizedResource() throws YarnException {
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
List<ResourceRequest> resourceRequests = new ArrayList<>();
// Initialize the headroom map
prepPolicyWithHeadroom(true);
// Cancel at ANY level only
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0", 1024, 1, 1, 1, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 0, null, false));
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
checkExpectedAllocation(response, "subcluster0", 3, 1);
checkExpectedAllocation(response, "subcluster1", 1, 0);
checkExpectedAllocation(response, "subcluster2", 1, 0);
checkExpectedAllocation(response, "subcluster3", -1, -1);
checkExpectedAllocation(response, "subcluster4", -1, -1);
checkExpectedAllocation(response, "subcluster5", -1, -1);
resourceRequests.clear();
// Cancel at node level only
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0-host0", 1024, 1, 1, 0, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
"subcluster0-rack0", 1024, 1, 1, 0, null, false));
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
ResourceRequest.ANY, 1024, 1, 1, 100, null, false));
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
/*
* Since the node-level request is a cancel, it should not be treated as
* associated with localized requests. Based on headroom, we expect 75
* containers to go to subcluster0 (60) and subcluster2 (15) according to
* the advertised headroom (40 and 10), no containers for subcluster1 as
* it advertises zero headroom, and 25 to subcluster5 which has unknown
* headroom and so gets 1/4 of the load.
*/
checkExpectedAllocation(response, "subcluster0", 3, 60);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 15);
checkExpectedAllocation(response, "subcluster5", 1, 25);
checkTotalContainerAllocation(response, 100);
}
@Test
public void testSubClusterExpiry() throws Exception {
// Tests that sub-clusters flagged as expired are excluded when splitting
// based on the advertised headroom.
// Configure policy to be 100% headroom based
getPolicyInfo().setHeadroomAlpha(1.0f);
YarnConfiguration conf = new YarnConfiguration();
// Set expiry to 500ms
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500);
initializePolicy(conf);
List<ResourceRequest> resourceRequests = createSimpleRequest();
prepPolicyWithHeadroom(true);
// The first time around, no sub-cluster has expired
Set<SubClusterId> expiredSCList = new HashSet<>();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests, expiredSCList);
// pretty print requests
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
/*
* Based on headroom, we expect 75 containers to go to subcluster0 (60) and
* subcluster2 (15) according to the advertised headroom (40 and 10), no
* containers for subcluster1 as it advertises zero headroom, and 25 to
* subcluster5 which has unknown headroom and so gets 1/4 of the load.
*/
checkExpectedAllocation(response, "subcluster0", 1, 60);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 15);
checkExpectedAllocation(response, "subcluster5", 1, 25);
checkTotalContainerAllocation(response, 100);
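// sleep past the 500ms sub-cluster timeout configured above, so the
// earlier headroom reports are considered stale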
Thread.sleep(800);
// The second time around, sc0 and sc5 have expired
expiredSCList.add(SubClusterId.newInstance("subcluster0"));
expiredSCList.add(SubClusterId.newInstance("subcluster5"));
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests, expiredSCList);
// pretty print requests
prettyPrintRequests(response);
validateSplit(response, resourceRequests);
checkExpectedAllocation(response, "subcluster0", 1, -1);
checkExpectedAllocation(response, "subcluster1", 1, -1);
checkExpectedAllocation(response, "subcluster2", 1, 100);
checkExpectedAllocation(response, "subcluster5", 1, -1);
checkTotalContainerAllocation(response, 100);
}
/**
* A testable version of LocalityMulticastAMRMProxyPolicy that
* deterministically falls back to home sub-cluster for unresolved requests.
*/
private class TestableLocalityMulticastAMRMProxyPolicy
extends LocalityMulticastAMRMProxyPolicy {
@Override
protected SubClusterId getSubClusterForUnResolvedRequest(
AllocationBookkeeper bookkeeper, long allocationId) {
SubClusterId originalResult =
super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId);
Map<SubClusterId, SubClusterInfo> activeClusters = null;
try {
activeClusters = getActiveSubclusters();
} catch (YarnException e) {
throw new RuntimeException(e);
}
// The randomly selected sub-cluster should at least be active
Assert.assertTrue(activeClusters.containsKey(originalResult));
// Always use the home sub-cluster so that the unit test is deterministic
return getHomeSubCluster();
}
}
}