blob: 68eceb5a325a683106f7542776154940039b32dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
/**
* This class provides a facade to the policy subsystem, and handles the
* lifecycle of policies (e.g., refresh from remote, default behaviors etc.).
*/
public class RouterPolicyFacade {

  private static final Logger LOG =
      LoggerFactory.getLogger(RouterPolicyFacade.class);

  private final SubClusterResolver subClusterResolver;
  private final FederationStateStoreFacade federationFacade;

  // Per-queue caches of the last-loaded configuration and of the policy built
  // from it. reset() replaces BOTH references together while holding this
  // object's monitor, so request paths snapshot them as a pair under the same
  // monitor (see getFederationRouterPolicy(String)).
  private Map<String, SubClusterPolicyConfiguration> globalConfMap;

  @VisibleForTesting
  Map<String, FederationRouterPolicy> globalPolicyMap;

  /**
   * Creates the facade and eagerly builds the fallback policy, cached under
   * {@link YarnConfiguration#DEFAULT_FEDERATION_POLICY_KEY}.
   *
   * The fallback configuration is read from the state store when available;
   * if the store has no entry (or the lookup fails) it is assembled from
   * {@link YarnConfiguration#FEDERATION_POLICY_MANAGER} and
   * {@link YarnConfiguration#FEDERATION_POLICY_MANAGER_PARAMS} in {@code conf}.
   *
   * @param conf configuration used to build the XML-based fallback policy.
   * @param facade state store facade used to look up policy configurations.
   * @param resolver resolver handed to every policy initialization context.
   * @param homeSubcluster sub-cluster handed to the fallback policy's
   *          initialization context.
   * @throws FederationPolicyInitializationException if the fallback policy
   *           cannot be initialized.
   */
  public RouterPolicyFacade(Configuration conf,
      FederationStateStoreFacade facade, SubClusterResolver resolver,
      SubClusterId homeSubcluster)
      throws FederationPolicyInitializationException {

    this.federationFacade = facade;
    this.subClusterResolver = resolver;
    this.globalConfMap = new ConcurrentHashMap<>();
    this.globalPolicyMap = new ConcurrentHashMap<>();

    // load default behavior from store if possible
    String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
    SubClusterPolicyConfiguration configuration = null;
    try {
      configuration = federationFacade.getPolicyConfiguration(defaultKey);
    } catch (YarnException e) {
      // Keep the cause so a store failure can be told apart from a missing
      // entry when diagnosing why the XML fallback was used.
      LOG.warn("No fallback behavior defined in store, defaulting to XML "
          + "configuration fallback behavior.", e);
    }

    // or from XML conf otherwise.
    if (configuration == null) {
      String defaultFederationPolicyManager =
          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
      String defaultPolicyParamString =
          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
      ByteBuffer defaultPolicyParam = ByteBuffer
          .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));

      configuration = SubClusterPolicyConfiguration.newInstance(defaultKey,
          defaultFederationPolicyManager, defaultPolicyParam);
    }

    // construct the required policy manager
    FederationPolicyInitializationContext fallbackContext =
        new FederationPolicyInitializationContext(configuration,
            subClusterResolver, federationFacade, homeSubcluster);
    FederationPolicyManager fallbackPolicyManager =
        FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
    fallbackPolicyManager.setQueue(defaultKey);

    // add to the cache the fallback behavior
    globalConfMap.put(defaultKey,
        fallbackContext.getSubClusterPolicyConfiguration());
    globalPolicyMap.put(defaultKey,
        fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
  }

  /**
   * This method provides a wrapper of all policy functionalities for routing.
   * Internally it manages configuration changes, and policy init/reinit.
   *
   * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
   *          has to be routed to an appropriate subCluster for execution.
   *
   * @param blackListSubClusters the list of subClusters as identified by
   *          {@link SubClusterId} to blackList from the selection of the home
   *          subCluster.
   *
   * @return the {@link SubClusterId} that will be the "home" for this
   *         application.
   *
   * @throws YarnException if there are issues initializing policies, or no
   *           valid sub-cluster id could be found for this app.
   */
  public SubClusterId getHomeSubcluster(
      ApplicationSubmissionContext appSubmissionContext,
      List<SubClusterId> blackListSubClusters) throws YarnException {

    if (appSubmissionContext == null) {
      throw new FederationPolicyException(
          "The ApplicationSubmissionContext cannot be null.");
    }

    String queue = appSubmissionContext.getQueue();

    // respecting YARN behavior we assume default queue if the queue is not
    // specified. This also ensures that "null" can be used as a key to get the
    // default behavior.
    if (queue == null) {
      queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
    }

    FederationRouterPolicy policy = getFederationRouterPolicy(queue);
    if (policy == null) {
      // this should never happen, as the two maps are updated together
      throw new FederationPolicyException("No FederationRouterPolicy found "
          + "for queue: " + appSubmissionContext.getQueue() + " (for "
          + "application: " + appSubmissionContext.getApplicationId() + ") "
          + "and no default specified.");
    }

    return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
  }

  /**
   * This method reinitializes a policy and loads it in the policyMap.
   *
   * @param policyMap the policy cache to update.
   * @param cachedConfs the configuration cache to update alongside
   *          {@code policyMap}.
   * @param queue the queue (cache key) to initialize a policy for.
   * @param conf the configuration to use for initialization.
   *
   * @throws FederationPolicyInitializationException if initialization fails.
   */
  private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap,
      Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
      SubClusterPolicyConfiguration conf)
      throws FederationPolicyInitializationException {

    FederationPolicyInitializationContext context =
        new FederationPolicyInitializationContext(conf, subClusterResolver,
            federationFacade, null);
    String newType = context.getSubClusterPolicyConfiguration().getType();
    FederationRouterPolicy routerPolicy = policyMap.get(queue);

    FederationPolicyManager federationPolicyManager =
        FederationPolicyUtils.instantiatePolicyManager(newType);
    // set queue, reinit policy if required (implementation lazily check
    // content of conf), and cache it
    federationPolicyManager.setQueue(queue);
    routerPolicy =
        federationPolicyManager.getRouterPolicy(context, routerPolicy);

    // we need the two put to be atomic (across multiple threads invoking
    // this and reset operations)
    synchronized (this) {
      policyMap.put(queue, routerPolicy);
      cachedConfs.put(queue, conf);
    }
  }

  /**
   * This method flushes all cached configurations and policies, keeping only
   * the fallback entry. This should be invoked if the facade remains active
   * after very large churn of queues in the system.
   */
  public synchronized void reset() {

    // remember the fallBack
    SubClusterPolicyConfiguration conf =
        globalConfMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
    FederationRouterPolicy policy =
        globalPolicyMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);

    globalConfMap = new ConcurrentHashMap<>();
    globalPolicyMap = new ConcurrentHashMap<>();

    // re-seed the fresh caches with the fallback entry
    globalConfMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, conf);
    globalPolicyMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY,
        policy);
  }

  /**
   * This method provides a wrapper of all policy functionalities for routing a
   * reservation. Internally it manages configuration changes, and policy
   * init/reinit.
   *
   * @param request the reservation to route.
   *
   * @return the id of the subcluster that will be the "home" for this
   *         reservation.
   *
   * @throws YarnException if there are issues initializing policies, or no
   *           valid sub-cluster id could be found for this reservation.
   */
  public SubClusterId getReservationHomeSubCluster(
      ReservationSubmissionRequest request) throws YarnException {

    if (request == null) {
      throw new FederationPolicyException(
          "The ReservationSubmissionRequest cannot be null.");
    }

    // NOTE(review): unlike getHomeSubcluster, a null queue is not mapped to
    // the default queue here; it is passed through to the store lookup.
    String queue = request.getQueue();
    FederationRouterPolicy policy = getFederationRouterPolicy(queue);

    if (policy == null) {
      // this should never happen, as the two maps are updated together
      throw new FederationPolicyException("No FederationRouterPolicy found "
          + "for queue: " + request.getQueue() + " (while routing "
          + "reservation: " + request.getReservationId() + ") "
          + "and no default specified.");
    }

    return policy.getReservationHomeSubcluster(request);
  }

  /**
   * Resolves the policy for {@code queue} against a consistent snapshot of
   * the two caches.
   *
   * The maps themselves are concurrent, but reset() may swap both references
   * at any time. Reading the two fields separately could pair the old
   * configuration map (which still caches the queue) with the freshly reset
   * policy map (which does not), so no reinit would be triggered and the
   * lookup would return null. Reading both under the monitor reset() holds
   * rules that out and also guarantees visibility of the latest references.
   *
   * @param queue the queue to resolve a policy for.
   * @return the policy for the queue, or for the fallback key.
   * @throws FederationPolicyInitializationException if a policy needs to be
   *           (re)initialized and that fails.
   */
  private FederationRouterPolicy getFederationRouterPolicy(String queue)
      throws FederationPolicyInitializationException {
    final Map<String, SubClusterPolicyConfiguration> cachedConfs;
    final Map<String, FederationRouterPolicy> policyMap;
    synchronized (this) {
      cachedConfs = globalConfMap;
      policyMap = globalPolicyMap;
    }
    return getFederationRouterPolicy(cachedConfs, policyMap, queue);
  }

  /**
   * Looks up the current configuration for {@code queue} (falling back to the
   * default key in the store, then to the cached fallback), reinitializes the
   * cached policy if that configuration differs from the cached one, and
   * returns the cached policy.
   *
   * @param cachedConfiguration configuration cache snapshot.
   * @param policyMap policy cache snapshot paired with
   *          {@code cachedConfiguration}.
   * @param queue the queue to resolve a policy for.
   * @return the policy cached under the queue, or under
   *         {@link YarnConfiguration#DEFAULT_FEDERATION_POLICY_KEY} when the
   *         queue has no configuration of its own.
   * @throws FederationPolicyInitializationException if reinitialization fails.
   */
  private FederationRouterPolicy getFederationRouterPolicy(
      Map<String, SubClusterPolicyConfiguration> cachedConfiguration,
      Map<String, FederationRouterPolicy> policyMap, String queue)
      throws FederationPolicyInitializationException {

    final String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;

    // Cache key the policy is stored under: the queue itself when it has its
    // own configuration, otherwise the fallback key.
    String cacheKey = queue;

    // the facade might cache this request, based on its parameterization
    SubClusterPolicyConfiguration configuration = null;
    try {
      configuration = federationFacade.getPolicyConfiguration(cacheKey);
    } catch (YarnException e) {
      LOG.warn("There is no policy configured for the queue: {}, falling back to defaults.",
          cacheKey, e);
    }

    // If there is no policy configured for this queue, fallback to the baseline
    // policy that is configured either in the store or via XML config (and cached)
    if (configuration == null) {
      LOG.warn("There is no policies configured for queue: {} " +
          "we fallback to default policy for: {}. ", queue, defaultKey);
      cacheKey = defaultKey;
      try {
        configuration = federationFacade.getPolicyConfiguration(cacheKey);
      } catch (YarnException e) {
        LOG.warn("Cannot retrieve policy configured for the queue: {}, falling back to defaults.",
            cacheKey, e);
      }
    }

    // the fallback is not configured via store, but via XML, using
    // previously loaded configuration.
    if (configuration == null) {
      configuration = cachedConfiguration.get(defaultKey);
    }

    // if the configuration has changed since last loaded, reinit the policy
    // based on current configuration
    SubClusterPolicyConfiguration cachedConf = cachedConfiguration.get(cacheKey);
    if (cachedConf == null || !cachedConf.equals(configuration)) {
      singlePolicyReinit(policyMap, cachedConfiguration, cacheKey, configuration);
    }

    return policyMap.get(cacheKey);
  }
}