blob: 3f6c9d9577854f939122be3b59524bce2a4d7a7a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
* is properly considered for allocation.
*/
public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
@Before
public void setUp() throws Exception {
setPolicy(new LoadBasedRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
long now = Time.now();
// simulate 20 active subclusters
for (int i = 0; i < 20; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
float weight = getRand().nextInt(2);
if (i == 5) {
weight = 1.0f;
}
// 5% chance we omit one of the weights
if (i <= 5 || getRand().nextFloat() > 0.05f) {
routerWeights.put(sc, weight);
amrmWeights.put(sc, weight);
}
}
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
// initialize policy with context
setupContext();
}
public String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
// plant a best cluster
if (id == 5) {
mem = 1024 * 277 * 100;
}
String clusterMetrics =
"{\"clusterMetrics\":{\"appsSubmitted\":65," + "\"appsCompleted\":64,"
+ "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0,"
+ "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + mem + ","
+ "\"allocatedMB\":0,\"reservedVirtualCores\":0,"
+ "\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0,"
+ "\"containersAllocated\":0,\"containersReserved\":0,"
+ "\"containersPending\":0,\"totalMB\":28364800,"
+ "\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1,"
+ "\"unhealthyNodes\":0,\"decommissionedNodes\":0,"
+ "\"rebootedNodes\":0,\"activeNodes\":277}}\n";
return clusterMetrics;
}
@Test
public void testLoadIsRespected() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
.getHomeSubcluster(getApplicationSubmissionContext(), null);
// check the "planted" best cluster is chosen
Assert.assertEquals("sc05", chosen.getId());
}
@Test
public void testIfNoSubclustersWithWeightOne() throws Exception {
setPolicy(new LoadBasedRouterPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// update subcluster with weight 0
SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", 0));
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), null, null, null, null, -1, SubClusterState.SC_RUNNING, -1,
generateClusterMetricsInfo(0));
getActiveSubclusters().clear();
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
routerWeights.put(sc, 0.0f);
amrmWeights.put(sc, 0.0f);
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
ConfigurableFederationPolicy policy = getPolicy();
FederationPoliciesTestUtil.initializePolicyContext(policy,
getPolicyInfo(), getActiveSubclusters());
LambdaTestUtils.intercept(YarnException.class, "Zero Active Subcluster with weight 1.",
() -> ((FederationRouterPolicy) policy).
getHomeSubcluster(getApplicationSubmissionContext(), null));
}
}