// blob: 4d7b0d3d42731952e72b05041d58e7800b1c7dbb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility class for Federation policy.
 *
 * <p>Groups the static helpers of this file: reflective creation of a
 * {@link FederationPolicyManager}, lookup of a queue's
 * {@link SubClusterPolicyConfiguration} with fallbacks, creation of the
 * {@link FederationAMRMProxyPolicy} for a queue, a sub-cluster availability
 * check, and weighted random selection. The class cannot be instantiated.
 */
@Private
public final class FederationPolicyUtils {

  private static final Logger LOG =
      LoggerFactory.getLogger(FederationPolicyUtils.class);

  /** Message of the exception thrown when no usable sub-cluster exists. */
  public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
      "No active SubCluster available to submit the request.";

  /**
   * Shared generator used by {@link #getWeightedRandom(ArrayList)}. It is
   * created with the no-arg constructor so that the JDK chooses the seed;
   * seeding from the wall clock would hand identical sequences to processes
   * that start within the same millisecond.
   */
  private static final Random RAND = new Random();

  /** Disable constructor. */
  private FederationPolicyUtils() {
  }

  /**
   * A utility method to instantiate a policy manager class given the type
   * (class name) from {@link SubClusterPolicyConfiguration}.
   *
   * <p>The class is loaded by name and created through its no-argument
   * constructor.
   *
   * @param newType class name of the policy manager to create
   * @return Policy manager
   * @throws FederationPolicyInitializationException if the class cannot be
   *           found, is not a {@link FederationPolicyManager}, has no
   *           accessible no-argument constructor, or its constructor throws
   */
  public static FederationPolicyManager instantiatePolicyManager(String newType)
      throws FederationPolicyInitializationException {
    try {
      // asSubclass verifies the type up front, so a wrong class name is
      // reported as an initialization failure rather than a raw cast error.
      Class<? extends FederationPolicyManager> managerClass =
          Class.forName(newType).asSubclass(FederationPolicyManager.class);
      // Constructor.newInstance wraps anything thrown by the constructor in
      // an InvocationTargetException instead of rethrowing it undeclared,
      // as the deprecated Class.newInstance would.
      return managerClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new FederationPolicyInitializationException(e);
    }
  }

  /**
   * Get Federation policy configuration from state store, using default queue
   * and configuration as fallback.
   *
   * <p>Lookup order: the given queue (skipped when {@code queue} is null),
   * then {@link YarnConfiguration#DEFAULT_FEDERATION_POLICY_KEY}, and finally
   * a configuration built from the policy manager class and parameters read
   * from {@code conf}. A {@link YarnException} raised by the facade is logged
   * and treated as "nothing found" for that step.
   *
   * @param queue the queue of the application
   * @param conf the Yarn configuration
   * @param federationFacade state store facade
   * @return SubClusterPolicyConfiguration recreated
   */
  public static SubClusterPolicyConfiguration loadPolicyConfiguration(
      String queue, Configuration conf,
      FederationStateStoreFacade federationFacade) {

    // The facade might cache this request, based on its parameterization
    SubClusterPolicyConfiguration configuration = null;
    if (queue != null) {
      try {
        configuration = federationFacade.getPolicyConfiguration(queue);
      } catch (YarnException e) {
        // Trailing throwable keeps the cause and stack trace in the log.
        LOG.warn("Failed to get policy from FederationFacade with queue "
            + "{}: {}", queue, e.getMessage(), e);
      }
    }

    // If there is no policy configured for this queue, fallback to the baseline
    // policy that is configured either in the store or via XML config
    if (configuration == null) {
      LOG.info("No policy configured for queue {} in StateStore,"
          + " fallback to default queue", queue);
      queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
      try {
        configuration = federationFacade.getPolicyConfiguration(queue);
      } catch (YarnException e) {
        LOG.warn("No fallback behavior defined in store, defaulting to XML "
            + "configuration fallback behavior.", e);
      }
    }

    // or from XML conf otherwise.
    if (configuration == null) {
      LOG.info("No policy configured for default queue {} in StateStore,"
          + " fallback to local config", queue);

      String defaultFederationPolicyManager =
          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
      String defaultPolicyParamString =
          conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
      ByteBuffer defaultPolicyParam = ByteBuffer
          .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));

      configuration = SubClusterPolicyConfiguration.newInstance(queue,
          defaultFederationPolicyManager, defaultPolicyParam);
    }
    return configuration;
  }

  /**
   * Get AMRMProxy policy from state store, using default queue and
   * configuration as fallback.
   *
   * <p>The configuration is resolved with
   * {@link #loadPolicyConfiguration(String, Configuration,
   * FederationStateStoreFacade)}, the policy manager named by its type is
   * created with {@link #instantiatePolicyManager(String)}, and the policy is
   * then requested from that manager.
   *
   * @param queue the queue of the application
   * @param oldPolicy the previous policy instance (can be null)
   * @param conf the Yarn configuration
   * @param federationFacade state store facade
   * @param homeSubClusterId home sub-cluster id
   * @return FederationAMRMProxyPolicy recreated
   * @throws FederationPolicyInitializationException if fails
   */
  public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
      FederationAMRMProxyPolicy oldPolicy, Configuration conf,
      FederationStateStoreFacade federationFacade,
      SubClusterId homeSubClusterId)
      throws FederationPolicyInitializationException {

    // Local policy and its configuration
    SubClusterPolicyConfiguration configuration =
        loadPolicyConfiguration(queue, conf, federationFacade);

    // Instantiate the policyManager and get policy
    FederationPolicyInitializationContext context =
        new FederationPolicyInitializationContext(configuration,
            federationFacade.getSubClusterResolver(), federationFacade,
            homeSubClusterId);

    LOG.info("Creating policy manager of type: {}", configuration.getType());
    FederationPolicyManager federationPolicyManager =
        instantiatePolicyManager(configuration.getType());

    // Set the queue, then let the manager hand back the policy; the previous
    // policy is passed along so the manager can decide whether to reuse it.
    federationPolicyManager.setQueue(configuration.getQueue());
    return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
  }

  /**
   * Validate if there is any active subcluster that is not blacklisted, it will
   * throw an exception if there are no usable subclusters.
   *
   * <p>A null blacklist means nothing is excluded, so any non-empty list of
   * active sub-clusters passes.
   *
   * @param activeSubClusters the list of subClusters as identified by
   *          {@link SubClusterId} currently active.
   * @param blackListSubClusters the list of subClusters as identified by
   *          {@link SubClusterId} to blackList from the selection of the home
   *          subCluster.
   * @throws FederationPolicyException if there are no usable subclusters.
   */
  public static void validateSubClusterAvailability(
      List<SubClusterId> activeSubClusters,
      List<SubClusterId> blackListSubClusters)
      throws FederationPolicyException {

    boolean hasActive =
        activeSubClusters != null && !activeSubClusters.isEmpty();
    if (hasActive) {
      if (blackListSubClusters == null) {
        return;
      }
      for (SubClusterId scId : activeSubClusters) {
        if (!blackListSubClusters.contains(scId)) {
          // There is at least one active subcluster that is not blacklisted
          return;
        }
      }
    }

    throw new FederationPolicyException(
        FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
  }

  /**
   * Select a random bin according to the weight array for the bins. Only bins
   * with positive weights will be considered. If no positive weight found,
   * return -1.
   *
   * <p>A null list, an empty list, and null or NaN entries are all treated as
   * carrying no positive weight.
   *
   * @param weights the weight array
   * @return the index of the sample in the array, or -1 if no bin has a
   *         positive weight
   */
  public static int getWeightedRandom(ArrayList<Float> weights) {
    if (weights == null || weights.isEmpty()) {
      return -1;
    }

    // Sum in double so that many (or very large) float weights neither
    // overflow nor accumulate rounding error in the total.
    double totalWeight = 0;
    for (Float weight : weights) {
      if (weight != null && weight > 0) {
        totalWeight += weight;
      }
    }
    if (totalWeight == 0) {
      return -1;
    }

    // Walk the positive bins, consuming the sample point until it falls
    // inside one of them.
    double samplePoint = RAND.nextDouble() * totalWeight;
    int lastIndex = 0;
    for (int i = 0; i < weights.size(); i++) {
      Float weight = weights.get(i);
      if (weight != null && weight > 0) {
        if (samplePoint <= weight) {
          return i;
        }
        lastIndex = i;
        samplePoint -= weight;
      }
    }

    // This can only happen if samplePoint is very close to totalWeight and
    // floating-point rounding kicks in during subtractions; fall back to the
    // last bin that had a positive weight.
    return lastIndex;
  }
}