| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.federation.policies.router; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; |
| import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; |
| import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; |
| import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; |
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; |
| import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Test class to validate the correctness of LocalityRouterPolicy. |
| */ |
| public class TestLocalityRouterPolicy extends TestWeightedRandomRouterPolicy { |
| |
| /* |
| * The MachineList for the default Resolver has the following nodes: |
| * |
| * node1<=>subcluster1 |
| * |
| * node2<=>subcluster2 |
| * |
| * noDE3<=>subcluster3 |
| * |
| * node4<=>subcluster3 |
| * |
| * subcluster0-rack0-host0<=>subcluster0 |
| * |
| * Subcluster1-RACK1-HOST1<=>subcluster1 |
| * |
| * SUBCLUSTER1-RACK1-HOST2<=>subcluster1 |
| * |
| * SubCluster2-RACK3-HOST3<=>subcluster2 |
| */ |
| |
| @Before |
| public void setUp() throws Exception { |
| setPolicy(new LocalityRouterPolicy()); |
| setPolicyInfo(new WeightedPolicyInfo()); |
| |
| configureWeights(4); |
| |
| // initialize policy with context |
| initializePolicy(new YarnConfiguration()); |
| } |
| |
| private void initializePolicy(Configuration conf) throws YarnException { |
| setFederationPolicyContext(new FederationPolicyInitializationContext()); |
| SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver(); |
| getFederationPolicyContext().setFederationSubclusterResolver(resolver); |
| ByteBuffer buf = getPolicyInfo().toByteBuffer(); |
| getFederationPolicyContext().setSubClusterPolicyConfiguration( |
| SubClusterPolicyConfiguration |
| .newInstance("queue1", getPolicy().getClass().getCanonicalName(), |
| buf)); |
| getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster()); |
| setupContext(); |
| } |
| |
| /** |
| * This test validates the correctness in case of the request has 1 node and |
| * the node belongs to an active subcluster. |
| */ |
| @Test |
| public void testNodeInActiveSubCluster() throws YarnException { |
| List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1), |
| 1)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), |
| 1)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, |
| Resource.newInstance(10, 1), 1)); |
| ApplicationSubmissionContext asc = ApplicationSubmissionContext |
| .newInstance(null, null, null, null, null, false, false, 0, |
| Resources.none(), null, false, null, null); |
| asc.setAMContainerResourceRequests(requests); |
| |
| SubClusterId chosen = |
| ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); |
| // If node1 is active, we should choose the sub cluster with node1 |
| if (getActiveSubclusters().containsKey( |
| getFederationPolicyContext().getFederationSubclusterResolver() |
| .getSubClusterForNode("node1").getId())) { |
| Assert.assertEquals( |
| getFederationPolicyContext().getFederationSubclusterResolver() |
| .getSubClusterForNode("node1"), chosen); |
| } |
| // Regardless, we should choose an active SubCluster |
| Assert.assertTrue(getActiveSubclusters().containsKey(chosen)); |
| } |
| |
| /** |
| * This test validates the correctness in case of the request has multiple |
| * ResourceRequests. The tests without ResourceRequests are done in |
| * TestWeightedRandomRouterPolicy. |
| */ |
| @Test |
| public void testMultipleResourceRequests() throws YarnException { |
| List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1), |
| 1)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "node2", Resource.newInstance(10, 1), |
| 1)); |
| ApplicationSubmissionContext asc = ApplicationSubmissionContext |
| .newInstance(null, null, null, null, null, false, false, 0, |
| Resources.none(), null, false, null, null); |
| asc.setAMContainerResourceRequests(requests); |
| try { |
| ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); |
| Assert.fail(); |
| } catch (FederationPolicyException e) { |
| Assert.assertTrue( |
| e.getMessage().startsWith("Invalid number of resource requests: ")); |
| } |
| } |
| |
| /** |
| * This test validates the correctness in case of the request has 1 node and |
| * the node does not exist in the Resolver MachineList file. |
| */ |
| @Test |
| public void testNodeNotExists() throws YarnException { |
| List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); |
| boolean relaxLocality = true; |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "node5", Resource.newInstance(10, 1), |
| 1, relaxLocality)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), |
| 1)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, |
| Resource.newInstance(10, 1), 1)); |
| ApplicationSubmissionContext asc = ApplicationSubmissionContext |
| .newInstance(null, null, null, null, null, false, false, 0, |
| Resources.none(), null, false, null, null); |
| asc.setAMContainerResourceRequests(requests); |
| |
| try { |
| ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); |
| } catch (FederationPolicyException e) { |
| Assert.fail(); |
| } |
| } |
| |
| /** |
| * This test validates the correctness in case of the request has 1 node and |
| * the node is in a blacklist subclusters. |
| */ |
| @Test |
| public void testNodeInABlacklistSubCluster() throws YarnException { |
| // Blacklist SubCluster3 |
| String subClusterToBlacklist = "subcluster3"; |
| // Remember the current value of subcluster3 |
| Float value = |
| getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist); |
| getPolicyInfo().getRouterPolicyWeights() |
| .put(new SubClusterIdInfo(subClusterToBlacklist), 0.0f); |
| initializePolicy(new YarnConfiguration()); |
| |
| FederationPoliciesTestUtil |
| .initializePolicyContext(getFederationPolicyContext(), getPolicy(), |
| getPolicyInfo(), getActiveSubclusters(), new Configuration()); |
| |
| List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); |
| boolean relaxLocality = true; |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1), |
| 1, relaxLocality)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), |
| 1)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, |
| Resource.newInstance(10, 1), 1)); |
| ApplicationSubmissionContext asc = ApplicationSubmissionContext |
| .newInstance(null, null, null, null, null, false, false, 0, |
| Resources.none(), null, false, null, null); |
| asc.setAMContainerResourceRequests(requests); |
| |
| try { |
| SubClusterId targetId = |
| ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); |
| // The selected subcluster HAS no to be the same as the one blacklisted. |
| Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist); |
| } catch (FederationPolicyException e) { |
| Assert.fail(); |
| } |
| |
| // Set again the previous value for the other tests |
| getPolicyInfo().getRouterPolicyWeights() |
| .put(new SubClusterIdInfo(subClusterToBlacklist), value); |
| } |
| |
| /** |
| * This test validates the correctness in case of the request has 1 node and |
| * the node is not in the policy weights. |
| */ |
| @Test |
| public void testNodeNotInPolicy() throws YarnException { |
| // Blacklist SubCluster3 |
| String subClusterToBlacklist = "subcluster3"; |
| // Remember the current value of subcluster3 |
| Float value = |
| getPolicyInfo().getRouterPolicyWeights().get(subClusterToBlacklist); |
| getPolicyInfo().getRouterPolicyWeights().remove(subClusterToBlacklist); |
| initializePolicy(new YarnConfiguration()); |
| |
| FederationPoliciesTestUtil |
| .initializePolicyContext(getFederationPolicyContext(), getPolicy(), |
| getPolicyInfo(), getActiveSubclusters(), new Configuration()); |
| |
| List<ResourceRequest> requests = new ArrayList<ResourceRequest>(); |
| boolean relaxLocality = true; |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "node4", Resource.newInstance(10, 1), |
| 1, relaxLocality)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, "rack1", Resource.newInstance(10, 1), |
| 1)); |
| requests.add(ResourceRequest |
| .newInstance(Priority.UNDEFINED, ResourceRequest.ANY, |
| Resource.newInstance(10, 1), 1)); |
| ApplicationSubmissionContext asc = ApplicationSubmissionContext |
| .newInstance(null, null, null, null, null, false, false, 0, |
| Resources.none(), null, false, null, null); |
| asc.setAMContainerResourceRequests(requests); |
| |
| try { |
| SubClusterId targetId = |
| ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(asc, null); |
| // The selected subcluster HAS no to be the same as the one blacklisted. |
| Assert.assertNotEquals(targetId.getId(), subClusterToBlacklist); |
| } catch (FederationPolicyException e) { |
| Assert.fail(); |
| } |
| |
| // Set again the previous value for the other tests |
| getPolicyInfo().getRouterPolicyWeights() |
| .put(new SubClusterIdInfo(subClusterToBlacklist), value); |
| } |
| } |
| |