blob: 948d1b943b5f421a03c5d0397284dfb24d705110 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.resourcemgr.config;
import com.typesafe.config.Config;
import org.apache.drill.exec.resourcemgr.NodeResources;
import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
import org.apache.drill.exec.resourcemgr.util.MemoryConfigParser;
import java.util.UUID;
import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT;
import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_WAITING_QUERY_COUNT;
/**
* Parses and initializes the queue configuration for a {@link ResourcePool}. It also generates a unique UUID for
* each queue, which is later stored by the Drillbit in Zookeeper along with its
* {@link org.apache.drill.exec.proto.beans.DrillbitEndpoint}. This UUID is used in the leader election mechanism in
* which the Drillbit participates for each of the configured RM queues.
*/
public class QueryQueueConfigImpl implements QueryQueueConfig {

  // Optional queue configurations; defaults are applied when a key is absent.
  private static final String MAX_ADMISSIBLE_KEY = "max_admissible";
  private static final String MAX_WAITING_KEY = "max_waiting";
  private static final String MAX_WAIT_TIMEOUT_KEY = "max_wait_timeout";
  private static final String WAIT_FOR_PREFERRED_NODES_KEY = "wait_for_preferred_nodes";

  // Required queue configuration; its value must match MAX_QUERY_MEMORY_PER_NODE_FORMAT.
  private static final String MAX_QUERY_MEMORY_PER_NODE_KEY = "max_query_memory_per_node";

  // Pattern for the memory value: digits followed by an optional k/K/m/M/g/G unit suffix.
  private static final String MAX_QUERY_MEMORY_PER_NODE_FORMAT = "([0-9]+)\\s*([kKmMgG]?)\\s*$";

  // Randomly generated identifier of this queue instance.
  private final String queueUUID;

  // Name of the queue; taken from the name of the owning pool.
  private final String queueName;

  private int maxAdmissibleQuery;

  private int maxWaitingQuery;

  private int maxWaitingTimeout;

  private boolean waitForPreferredNodes;

  // Per node resource share assigned to the whole queue.
  private final NodeResources queueResourceShare;

  // Per node resource share that a single query of this queue may use.
  private NodeResources queryPerNodeResourceShare;

  /**
   * Creates the queue configuration, assigning it a freshly generated random UUID and parsing all the queue
   * settings from the supplied config.
   * @param queueConfig Config object for ResourcePool queue
   * @param poolName name of the pool owning this queue; also used as the queue name
   * @param queueNodeResource per node resource share of this queue
   * @throws RMConfigException in case of error while parsing config
   */
  public QueryQueueConfigImpl(Config queueConfig, String poolName,
                              NodeResources queueNodeResource) throws RMConfigException {
    this.queueUUID = UUID.randomUUID().toString();
    this.queueName = poolName;
    this.queueResourceShare = queueNodeResource;
    parseQueueConfig(queueConfig);
  }

  /**
   * Assigns either supplied or default values for the optional queue configuration. For required configuration uses
   * the supplied value in queueConfig object or throws proper exception if not present
   * @param queueConfig Config object for ResourcePool queue
   * @throws RMConfigException in case of error while parsing config
   */
  private void parseQueueConfig(Config queueConfig) throws RMConfigException {
    this.maxAdmissibleQuery = queueConfig.hasPath(MAX_ADMISSIBLE_KEY) ?
      queueConfig.getInt(MAX_ADMISSIBLE_KEY) : MAX_ADMISSIBLE_QUERY_COUNT;
    this.maxWaitingQuery = queueConfig.hasPath(MAX_WAITING_KEY) ?
      queueConfig.getInt(MAX_WAITING_KEY) : MAX_WAITING_QUERY_COUNT;
    this.maxWaitingTimeout = queueConfig.hasPath(MAX_WAIT_TIMEOUT_KEY) ?
      queueConfig.getInt(MAX_WAIT_TIMEOUT_KEY) : RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS;
    this.waitForPreferredNodes = queueConfig.hasPath(WAIT_FOR_PREFERRED_NODES_KEY) ?
      queueConfig.getBoolean(WAIT_FOR_PREFERRED_NODES_KEY) : RMCommonDefaults.WAIT_FOR_PREFERRED_NODES;
    // Required setting: parsed last, fails with RMConfigException when missing or malformed.
    this.queryPerNodeResourceShare = parseAndGetNodeShare(queueConfig);
  }

  /**
   * @return the random UUID generated for this queue at construction time
   */
  @Override
  public String getQueueId() {
    return queueUUID;
  }

  /**
   * @return the name of this queue, i.e. the pool name passed to the constructor
   */
  @Override
  public String getQueueName() {
    return queueName;
  }

  /**
   * Total memory share of this queue in the cluster based on per node resource share. It assumes that the cluster is
   * made up of homogeneous nodes in terms of resources
   * @param numClusterNodes total number of available cluster nodes which can participate in this queue
   * @return queue memory share in MB
   */
  @Override
  public long getQueueTotalMemoryInMB(int numClusterNodes) {
    return queueResourceShare.getMemoryInMB() * numClusterNodes;
  }

  /**
   * @return maximum memory in MB which a single query of this queue can use on one node
   */
  @Override
  public long getMaxQueryMemoryInMBPerNode() {
    return queryPerNodeResourceShare.getMemoryInMB();
  }

  /**
   * Maximum memory a single query of this queue can use across the cluster, computed as the per node query memory
   * limit multiplied by the number of nodes.
   * @param numClusterNodes total number of available cluster nodes which can participate in this queue
   * @return maximum query memory in MB
   */
  @Override
  public long getMaxQueryTotalMemoryInMB(int numClusterNodes) {
    return queryPerNodeResourceShare.getMemoryInMB() * numClusterNodes;
  }

  /**
   * @return configured value of {@link QueryQueueConfigImpl#WAIT_FOR_PREFERRED_NODES_KEY} or its default
   */
  @Override
  public boolean waitForPreferredNodes() {
    return waitForPreferredNodes;
  }

  /**
   * @return configured value of {@link QueryQueueConfigImpl#MAX_ADMISSIBLE_KEY} or its default
   */
  @Override
  public int getMaxAdmissibleQueries() {
    return maxAdmissibleQuery;
  }

  /**
   * @return configured value of {@link QueryQueueConfigImpl#MAX_WAITING_KEY} or its default
   */
  @Override
  public int getMaxWaitingQueries() {
    return maxWaitingQuery;
  }

  /**
   * @return configured value of {@link QueryQueueConfigImpl#MAX_WAIT_TIMEOUT_KEY} or its default
   */
  @Override
  public int getWaitTimeoutInMs() {
    return maxWaitingTimeout;
  }

  /**
   * Parses queues {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} configuration using
   * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_FORMAT} pattern to get memory value in bytes using
   * {@link MemoryConfigParser#parseMemoryConfigString(String, String)} utility.
   * @param queueConfig Queue Configuration object
   * @return NodeResources created with value of {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} in bytes
   * @throws RMConfigException in case the config value is absent or doesn't follow the specified format
   */
  private NodeResources parseAndGetNodeShare(Config queueConfig) throws RMConfigException {
    try {
      long memoryPerNodeInBytes = MemoryConfigParser.parseMemoryConfigString(
        queueConfig.getString(MAX_QUERY_MEMORY_PER_NODE_KEY), MAX_QUERY_MEMORY_PER_NODE_FORMAT);
      // NOTE(review): the second argument is presumably the CPU share, left unbounded here - confirm
      // against the NodeResources constructor.
      return new NodeResources(memoryPerNodeInBytes, Integer.MAX_VALUE);
    } catch (Exception ex) {
      // Any failure (lookup of the key or parsing of its value) is reported uniformly, keeping the cause.
      throw new RMConfigException(String.format("Failed while parsing %s for queue %s", MAX_QUERY_MEMORY_PER_NODE_KEY,
        queueName), ex);
    }
  }

  /**
   * @return textual summary of this queue configuration listing the queue's per node resource share, the per query
   * per node resource share and all the parsed settings
   */
  @Override
  public String toString() {
    // QueuePerNodeResource reports the queue's own share; MaxQueryMemPerNode reports the per query share.
    return "{ QueueName: " + queueName + ", QueueId: " + queueUUID + ", QueuePerNodeResource(MB): " +
      queueResourceShare.toString() + ", MaxQueryMemPerNode(MB): " + queryPerNodeResourceShare.toString() +
      ", MaxAdmissible: " + maxAdmissibleQuery + ", MaxWaiting: " + maxWaitingQuery + ", MaxWaitTimeout: " +
      maxWaitingTimeout + ", WaitForPreferredNodes: " + waitForPreferredNodes + "}";
  }
}