/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.yarn.server.federation.policies;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Simple test of {@link RouterPolicyFacade}.
 */
public class TestRouterPolicyFacade {

  private RouterPolicyFacade routerFacade;
  private List<SubClusterId> subClusterIds;
  private FederationStateStore store;
  private String queue1 = "queue1";
  private String defQueueKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
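
  /**
   * Sets up an in-memory {@link FederationStateStore} with ten registered
   * subclusters and a uniform policy for queue1, and instantiates the
   * {@link RouterPolicyFacade} under test with caching disabled.
   */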
  @Before
  public void setup() throws YarnException {

    // setting up a store and its facade (with caching off)
    FederationStateStoreFacade fedFacade =
        FederationStateStoreFacade.getInstance();
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0");
    store = new MemoryFederationStateStore();
    store.init(conf);
    fedFacade.reinitialize(store, conf);

    FederationStateStoreTestUtil storeTestUtil =
        new FederationStateStoreTestUtil(store);
    storeTestUtil.registerSubClusters(10);
    subClusterIds = storeTestUtil.getAllSubClusterIds(true);

    store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
        .newInstance(getUniformPolicy(queue1)));

    SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
    routerFacade = new RouterPolicyFacade(new YarnConfiguration(), fedFacade,
        resolver, subClusterIds.get(0));
  }
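
  /**
   * Tests that the facade picks up a policy change published to the
   * {@link FederationStateStore} between two calls: the first call for
   * queue1 is routed by a {@link UniformRandomRouterPolicy}, while after the
   * stored configuration is updated the second call is routed by a
   * {@link PriorityRouterPolicy} that favors the first subcluster.
   */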
  @Test
  public void testConfigurationUpdate() throws YarnException {

    // in this test we see what happens when the configuration is changed
    // between calls; we achieve this by changing what is stored for queue1
    ApplicationSubmissionContext applicationSubmissionContext =
        mock(ApplicationSubmissionContext.class);
    when(applicationSubmissionContext.getQueue()).thenReturn(queue1);

    // the first call runs using the standard UniformRandomRouterPolicy
    SubClusterId chosen =
        routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
    Assert.assertTrue(subClusterIds.contains(chosen));
    Assert.assertTrue(routerFacade.globalPolicyMap
        .get(queue1) instanceof UniformRandomRouterPolicy);

    // then the operator changes how queue1 is routed, setting it to a
    // PriorityRouterPolicy whose weights favor the first subcluster in
    // subClusterIds
    store.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
        .newInstance(getPriorityPolicy(queue1)));

    // the second call is routed by the new PriorityRouterPolicy
    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
    Assert.assertEquals(subClusterIds.get(0), chosen);
    Assert.assertTrue(routerFacade.globalPolicyMap
        .get(queue1) instanceof PriorityRouterPolicy);
  }
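
  /**
   * Tests the basic behavior of getHomeSubcluster(): a registered subcluster
   * is returned, a policy instance is cached for the requested queue, and
   * reset() brings the cache back to only the fallback entry.
   */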
  @Test
  public void testGetHomeSubcluster() throws YarnException {

    ApplicationSubmissionContext applicationSubmissionContext =
        mock(ApplicationSubmissionContext.class);
    when(applicationSubmissionContext.getQueue()).thenReturn(queue1);

    // the facade only contains the fallback behavior
    Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey));
    Assert.assertEquals(1, routerFacade.globalPolicyMap.size());

    // when invoked it returns one of the registered subclusters
    SubClusterId chosen =
        routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
    Assert.assertTrue(subClusterIds.contains(chosen));

    // the policy cache must now contain an entry for this queue
    Assert.assertEquals(2, routerFacade.globalPolicyMap.size());

    // after the facade is used, the policy map contains the expected
    // policy type
    Assert.assertTrue(routerFacade.globalPolicyMap
        .get(queue1) instanceof UniformRandomRouterPolicy);

    // after a reset the facade again only contains the fallback behavior
    routerFacade.reset();
    Assert.assertTrue(routerFacade.globalPolicyMap.containsKey(defQueueKey));
    Assert.assertEquals(1, routerFacade.globalPolicyMap.size());
  }
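
  /**
   * Tests the fallback behavior when the requested queue has no configured
   * policy, or is empty or null: the request is still routed via the default
   * policy and no entry is cached for the unknown queue.
   */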
  @Test
  public void testFallbacks() throws YarnException {

    // this tests the behavior of the system when the requested queue is not
    // configured (or is null/empty) and there is no default policy configured
    // for DEFAULT_FEDERATION_POLICY_KEY (*). This is our second line of
    // defense.
    ApplicationSubmissionContext applicationSubmissionContext =
        mock(ApplicationSubmissionContext.class);

    // the facade answers even for queues with no configured policy (by
    // falling back to the defaultPolicy)
    String uninitQueue = "non-initialized-queue";
    when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
    SubClusterId chosen =
        routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
    Assert.assertTrue(subClusterIds.contains(chosen));
    Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));

    // an empty queue name also falls back to the default policy
    when(applicationSubmissionContext.getQueue()).thenReturn("");
    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
    Assert.assertTrue(subClusterIds.contains(chosen));
    Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));

    // a null queue also falls back to the default policy
    when(applicationSubmissionContext.getQueue()).thenReturn(null);
    chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
    Assert.assertTrue(subClusterIds.contains(chosen));
    Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
  }
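
  /**
   * Builds a {@link SubClusterPolicyConfiguration} for the given queue using
   * a {@link UniformBroadcastPolicyManager}.
   */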
  public static SubClusterPolicyConfiguration getUniformPolicy(String queue)
      throws FederationPolicyInitializationException {

    // we go through the standard lifecycle: instantiate a policy manager,
    // configure it, and serialize it to a configuration
    UniformBroadcastPolicyManager wfp = new UniformBroadcastPolicyManager();
    wfp.setQueue(queue);

    SubClusterPolicyConfiguration fpc = wfp.serializeConf();
    return fpc;
  }
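
  /**
   * Builds a {@link SubClusterPolicyConfiguration} for the given queue using
   * a {@link PriorityBroadcastPolicyManager} whose router weights favor the
   * first subcluster in subClusterIds.
   */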
  public SubClusterPolicyConfiguration getPriorityPolicy(String queue)
      throws FederationPolicyInitializationException {

    // we go through the standard lifecycle: instantiate a policy manager,
    // configure it, and serialize it to a configuration
    PriorityBroadcastPolicyManager wfp = new PriorityBroadcastPolicyManager();

    // assign equal weight to all subclusters...
    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
    for (SubClusterId s : subClusterIds) {
      routerWeights.put(new SubClusterIdInfo(s), 0.9f / subClusterIds.size());
    }

    // ...except the first one, which gets an extra 0.1 (weights sum to 1.0)
    SubClusterIdInfo favorite = new SubClusterIdInfo(subClusterIds.get(0));
    routerWeights.put(favorite, (0.1f + 0.9f / subClusterIds.size()));

    WeightedPolicyInfo policyInfo = new WeightedPolicyInfo();
    policyInfo.setRouterPolicyWeights(routerWeights);
    wfp.setWeightedPolicyInfo(policyInfo);
    wfp.setQueue(queue);

    // serialize it into a SubClusterPolicyConfiguration
    SubClusterPolicyConfiguration fpc = wfp.serializeConf();
    return fpc;
  }

}