blob: bde33cc189fd44ab46cf3599635fb7d19be3c79d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.resourcemgr.config;
import com.typesafe.config.Config;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.resourcemgr.NodeResources;
import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy;
import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy;
import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicyFactory;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT;
import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY;
/**
* Parses and initializes configuration for ResourceManagement in Drill. It takes care of creating all the ResourcePools
* recursively maintaining the n-ary tree hierarchy defined in the configuration.
*/
public class ResourcePoolTreeImpl implements ResourcePoolTree {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolTreeImpl.class);
public static final String ROOT_POOL_QUEUE_SELECTION_POLICY_KEY = "queue_selection_policy";
public static final String ROOT_POOL_CONFIG_KEY = "drill.exec.rm";
private static final String ROOT_POOL_MEMORY_SHARE_KEY = "memory";
private final ResourcePool rootPool;
private final Config rmConfig;
private final NodeResources totalNodeResources;
private final double resourceShare;
private final Map<String, QueryQueueConfig> leafQueues = Maps.newHashMap();
private final QueueSelectionPolicy selectionPolicy;
public ResourcePoolTreeImpl(Config rmConfig, long totalNodeMemory,
int totalNodePhysicalCpu, int vFactor) throws RMConfigException {
this(rmConfig, new NodeResources(totalNodeMemory, totalNodePhysicalCpu, vFactor));
}
private ResourcePoolTreeImpl(Config rmConfig, NodeResources totalNodeResources) throws RMConfigException {
try {
this.rmConfig = rmConfig;
final Config rootConfig = this.rmConfig.getConfig(ROOT_POOL_CONFIG_KEY);
this.totalNodeResources = totalNodeResources;
this.resourceShare = rootConfig.hasPath(ROOT_POOL_MEMORY_SHARE_KEY) ?
rootConfig.getDouble(ROOT_POOL_MEMORY_SHARE_KEY) : ROOT_POOL_DEFAULT_MEMORY_PERCENT;
this.selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy(
rootConfig.hasPath(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) ?
SelectionPolicy.valueOf(rootConfig.getString(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY).trim().toUpperCase()) :
ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY);
rootPool = new ResourcePoolImpl(rootConfig, resourceShare, 1.0,
totalNodeResources, null, leafQueues);
logger.debug("Dumping RM configuration {}", toString());
} catch (RMConfigException ex) {
throw ex;
} catch (Exception ex) {
throw new RMConfigException(String.format("Failure while parsing root pool configuration. " +
"[Details: Config: %s]", rmConfig), ex);
}
}
/**
* @return root {@link ResourcePool}
*/
@Override
public ResourcePool getRootPool() {
return rootPool;
}
/**
* @return Map containing all the configured leaf queues
*/
@Override
public Map<String, QueryQueueConfig> getAllLeafQueues() {
return leafQueues;
}
@Override
public double getResourceShare() {
return resourceShare;
}
/**
* Creates {@link QueueAssignmentResult} which contains list of all selected leaf ResourcePools and all the rejected
* ResourcePools for the provided query. Performs DFS of the ResourcePoolTree to traverse and find
* selected/rejected ResourcePools.
* @param queryContext {@link QueryContext} which contains metadata required for the given query
* @return {@link QueueAssignmentResult} populated with selected/rejected ResourcePools
*/
@Override
public QueueAssignmentResult selectAllQueues(QueryContext queryContext) {
QueueAssignmentResult assignmentResult = new QueueAssignmentResult();
rootPool.visitAndSelectPool(assignmentResult, queryContext);
return assignmentResult;
}
/**
* Selects a leaf queue out of all the possibles leaf queues returned by
* {@link ResourcePoolTree#selectAllQueues(QueryContext)} for a given query based on the configured
* {@link QueueSelectionPolicy}. If none of the queue qualifies then it throws {@link QueueSelectionException}
* @param queryContext {@link QueryContext} which contains metadata for given query
* @param queryMaxNodeResource Max resources on a node required for given query
* @return {@link QueryQueueConfig} for the selected leaf queue
* @throws QueueSelectionException If no leaf queue is selected
*/
@Override
public QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource)
throws QueueSelectionException {
final QueueAssignmentResult assignmentResult = selectAllQueues(queryContext);
final List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools();
if (selectedPools.size() == 0) {
throw new QueueSelectionException(String.format("No resource pools to choose from for the query: %s",
queryContext.getQueryId()));
} else if (selectedPools.size() == 1) {
return selectedPools.get(0).getQueryQueue();
}
return selectionPolicy.selectQueue(selectedPools, queryContext, queryMaxNodeResource).getQueryQueue();
}
/**
* @return {@link QueueSelectionPolicy} which is used to chose one queue out of all the available options for a query
*/
@Override
public QueueSelectionPolicy getSelectionPolicyInUse() {
return selectionPolicy;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{ NodeResources: ").append(totalNodeResources.toString());
sb.append(", ResourcePercent: ").append(resourceShare);
sb.append(", SelectionPolicy: ").append(selectionPolicy.getSelectionPolicy());
sb.append(", RootPool: ").append(rootPool.toString());
sb.append("}");
return sb.toString();
}
}