| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.util; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.thrift.TException; |
| import org.apache.thrift.TSerializer; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.impala.authorization.User; |
| import org.apache.impala.common.ByteUnits; |
| import org.apache.impala.common.ImpalaException; |
| import org.apache.impala.common.InternalException; |
| import org.apache.impala.common.JniUtil; |
| import org.apache.impala.common.RuntimeEnv; |
| import org.apache.impala.thrift.TErrorCode; |
| import org.apache.impala.thrift.TPoolConfigParams; |
| import org.apache.impala.thrift.TPoolConfig; |
| import org.apache.impala.thrift.TResolveRequestPoolParams; |
| import org.apache.impala.thrift.TResolveRequestPoolResult; |
| import org.apache.impala.thrift.TStatus; |
| import org.apache.impala.util.FileWatchService.FileChangeListener; |
| import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; |
| import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; |
| import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * Admission control utility class that provides user to request pool mapping, ACL |
| * enforcement, and pool configuration values. Pools are configured via a fair scheduler |
| * allocation file (fair-scheduler.xml) and Llama configuration (llama-site.xml). This |
| * class wraps a number of Hadoop classes to provide the user to pool mapping, |
| * authorization, and accessing memory resource limits, all of which are specified in |
| * the fair scheduler allocation file. The other pool limits are specified in the |
| * Llama configuration, and those properties are accessed via the standard |
| * {@link Configuration} API. |
| * |
| * Both the allocation configuration and Llama configuration files are watched for |
| * changes and reloaded when necessary. The allocation file is watched/loaded using the |
| * Yarn {@link AllocationFileLoaderService} and the Llama configuration uses a subclass of |
| * the {@link FileWatchService}. There are two different mechanisms because there is |
| * different parsing/configuration code for the allocation file and the Llama |
| * configuration (which is a regular Hadoop conf file so it can use the |
| * {@link Configuration} class). start() and stop() will start/stop watching and reloading |
| * both of these files. |
| * |
| * A single instance is created by the backend and lasts the duration of the process. |
| */ |
| public class RequestPoolService { |
| final static Logger LOG = LoggerFactory.getLogger(RequestPoolService.class); |
| |
| private final static TBinaryProtocol.Factory protocolFactory_ = |
| new TBinaryProtocol.Factory(); |
| // Used to ensure start() has been called before any other methods can be used. |
| private final AtomicBoolean running_; |
| |
| // Key for the default maximum number of running queries ("placed reservations") |
| // property. The per-pool key name is this key with the pool name appended, e.g. |
| // "{key}.{pool}". |
| private final static String MAX_PLACED_RESERVATIONS_KEY = |
| "llama.am.throttling.maximum.placed.reservations"; |
| |
| // Default value for the maximum.placed.reservations property. Note that this value |
| // differs from the current Llama default of 10000. |
| private final static int MAX_PLACED_RESERVATIONS_DEFAULT = -1; |
| |
| // Key for the default maximum number of queued requests ("queued reservations") |
| // property. The per-pool key name is this key with the pool name appended, e.g. |
| // "{key}.{pool}". |
| private final static String MAX_QUEUED_RESERVATIONS_KEY = |
| "llama.am.throttling.maximum.queued.reservations"; |
| |
| // Default value for the maximum.queued.reservations property. Note that this value |
| // differs from the current Llama default of 0 which disables queuing. |
| private final static int MAX_QUEUED_RESERVATIONS_DEFAULT = 200; |
| |
| // Key for the pool queue timeout (milliseconds). |
| private final static String QUEUE_TIMEOUT_KEY = |
| "impala.admission-control.pool-queue-timeout-ms"; |
| |
| // Key for the pool default query options. Query options are specified as a |
| // comma delimited string of 'key=value' pairs, e.g. 'key1=val1,key2=val2'. |
| private final static String QUERY_OPTIONS_KEY = |
| "impala.admission-control.pool-default-query-options"; |
| |
| // Keys for the pool max and min query mem limits (in bytes) respectively. |
| private final static String MAX_QUERY_MEM_LIMIT_BYTES = |
| "impala.admission-control.max-query-mem-limit"; |
| private final static String MIN_QUERY_MEM_LIMIT_BYTES = |
| "impala.admission-control.min-query-mem-limit"; |
| |
| // Key for specifying if the mem_limit query option can override max/min mem limits |
| // of the pool. |
| private final static String CLAMP_MEM_LIMIT_QUERY_OPTION = |
| "impala.admission-control.clamp-mem-limit-query-option"; |
| |
| // Key for specifying the "Max Running Queries Multiple" configuration |
| // of the pool. |
| private final static String MAX_RUNNING_QUERIES_MULTIPLE = |
| "impala.admission-control.max-running-queries-multiple"; |
| |
| // Key for specifying the "Max Queued Queries Multiple" configuration |
| // of the pool. |
| private final static String MAX_QUEUED_QUERIES_MULTIPLE = |
| "impala.admission-control.max-queued-queries-multiple"; |
| |
| // Key for specifying the "Max Memory Multiple" configuration |
| // of the pool. |
| private final static String MAX_MEMORY_MULTIPLE = |
| "impala.admission-control.max-memory-multiple"; |
| |
| // String format for a per-pool configuration key. First parameter is the key for the |
| // default, e.g. MAX_PLACED_RESERVATIONS_KEY, and the second parameter is the |
| // pool name. |
| private final static String PER_POOL_CONFIG_KEY_FORMAT = "%s.%s"; |
| |
| // Watches for changes to the fair scheduler allocation file. |
| @VisibleForTesting |
| final AllocationFileLoaderService allocLoader_; |
| |
| // Provides access to the fair scheduler allocation file. An AtomicReference becaus it |
| // is reset when the allocation configuration file changes and other threads access it. |
| private final AtomicReference<AllocationConfiguration> allocationConf_; |
| |
| // Watches the configuration file for changes. |
| @VisibleForTesting |
| final FileWatchService confWatcher_; |
| |
| // Used by this class to access to the configs provided by the configuration. |
| // This is replaced when the configuration file changes. |
| private volatile Configuration conf_; |
| |
| // URL of the configuration file. |
| private final URL confUrl_; |
| |
| /** |
| * Updates the configuration when the file changes. The file is confUrl_ |
| * and it will exist when this is created (or RequestPoolService will not start). If |
| * the file is later removed, warnings will be written to the log but the previous |
| * configuration will still be accessible. |
| */ |
| private final class ConfWatcher implements FileChangeListener { |
| public void onFileChange() { |
| // If confUrl_ is null the watcher should not have been created. |
| Preconditions.checkNotNull(confUrl_); |
| LOG.info("Loading configuration: " + confUrl_.getFile()); |
| Configuration conf = new Configuration(); |
| conf.addResource(confUrl_); |
| conf_ = conf; |
| } |
| } |
| |
| /** |
| * Creates a RequestPoolService instance with a configuration containing the specified |
| * fair-scheduler.xml and llama-site.xml. |
| * |
| * @param fsAllocationPath path to the fair scheduler allocation file. |
| * @param sitePath path to the configuration file. |
| */ |
| RequestPoolService(final String fsAllocationPath, final String sitePath) { |
| Preconditions.checkNotNull(fsAllocationPath); |
| running_ = new AtomicBoolean(false); |
| allocationConf_ = new AtomicReference<>(); |
| URL fsAllocationURL = getURL(fsAllocationPath); |
| if (fsAllocationURL == null) { |
| throw new IllegalArgumentException( |
| "Unable to find allocation configuration file: " + fsAllocationPath); |
| } |
| // Load the default Hadoop configuration files for picking up overrides like custom |
| // group mapping plugins etc. |
| Configuration allocConf = new Configuration(); |
| allocConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocationURL.getPath()); |
| allocLoader_ = new AllocationFileLoaderService(); |
| allocLoader_.init(allocConf); |
| |
| if (!Strings.isNullOrEmpty(sitePath)) { |
| confUrl_ = getURL(sitePath); |
| if (confUrl_ == null) { |
| throw new IllegalArgumentException( |
| "Unable to find configuration file: " + sitePath); |
| } |
| conf_ = new Configuration(false); |
| conf_.addResource(confUrl_); |
| confWatcher_ = |
| new FileWatchService(new File(confUrl_.getPath()), new ConfWatcher()); |
| } else { |
| confWatcher_ = null; |
| confUrl_ = null; |
| } |
| } |
| |
| /** |
| * Returns a {@link URL} for the file if it exists, null otherwise. |
| */ |
| @VisibleForTesting |
| private static URL getURL(String path) { |
| Preconditions.checkNotNull(path); |
| File file = new File(path); |
| file = file.getAbsoluteFile(); |
| if (!file.exists()) { |
| LOG.error("Unable to find specified file: " + path); |
| return null; |
| } |
| try { |
| return file.toURI().toURL(); |
| } catch (MalformedURLException ex) { |
| LOG.error("Unable to construct URL for file: " + path, ex); |
| return null; |
| } |
| } |
| |
| /** |
| * Starts the RequestPoolService instance. It does the initial loading of the |
| * configuration and starts the automatic reloading. |
| */ |
| public void start() { |
| Preconditions.checkState(!running_.get()); |
| allocLoader_.setReloadListener(allocationConf_::set); |
| allocLoader_.start(); |
| try { |
| allocLoader_.reloadAllocations(); |
| } catch (Exception ex) { |
| try { |
| stopInternal(); |
| } catch (Exception stopEx) { |
| LOG.error("Unable to stop AllocationFileLoaderService after failed start.", |
| stopEx); |
| } |
| throw new RuntimeException(ex); |
| } |
| if (confWatcher_ != null) confWatcher_.start(); |
| running_.set(true); |
| } |
| |
| /** |
| * Stops the RequestPoolService instance. Only used by tests. |
| */ |
| public void stop() { |
| Preconditions.checkState(running_.get()); |
| stopInternal(); |
| } |
| |
| /** |
| * Stops the RequestPoolService instance without checking the running state. Only |
| * called by stop() (which is only used in tests) or by start() if a failure occurs. |
| * Should not be called more than once. |
| */ |
| private void stopInternal() { |
| running_.set(false); |
| if (confWatcher_ != null) confWatcher_.stop(); |
| allocLoader_.stop(); |
| } |
| |
| /** |
| * Resolves a user and pool to the pool specified by the allocation placement policy |
| * and checks if the user is authorized to submit requests. |
| * |
| * @param thriftResolvePoolParams Serialized {@link TResolveRequestPoolParams} |
| * @return serialized {@link TResolveRequestPoolResult} |
| */ |
| @SuppressWarnings("unused") // called from C++ |
| public byte[] resolveRequestPool(byte[] thriftResolvePoolParams) |
| throws ImpalaException { |
| TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams(); |
| JniUtil.deserializeThrift(protocolFactory_, resolvePoolParams, |
| thriftResolvePoolParams); |
| TResolveRequestPoolResult result = resolveRequestPool(resolvePoolParams); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}", |
| resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(), |
| result.resolved_pool, result.has_access); |
| } |
| try { |
| return new TSerializer(protocolFactory_).serialize(result); |
| } catch (TException e) { |
| throw new InternalException(e.getMessage()); |
| } |
| } |
| |
| @VisibleForTesting |
| TResolveRequestPoolResult resolveRequestPool( |
| TResolveRequestPoolParams resolvePoolParams) throws InternalException { |
| String requestedPool = resolvePoolParams.getRequested_pool(); |
| String user = resolvePoolParams.getUser(); |
| TResolveRequestPoolResult result = new TResolveRequestPoolResult(); |
| String errorMessage = null; |
| String pool = null; |
| try { |
| pool = assignToPool(requestedPool, user); |
| } catch (IOException ex) { |
| errorMessage = ex.getMessage(); |
| if (errorMessage.startsWith("No groups found for user")) { |
| // The error thrown when using the 'primaryGroup' or 'secondaryGroup' rules and |
| // the user does not exist are not helpful. |
| errorMessage = String.format( |
| "Failed to resolve user '%s' to a pool while evaluating the " + |
| "'primaryGroup' or 'secondaryGroup' queue placement rules because no " + |
| "groups were found for the user. This is likely because the user does not " + |
| "exist on the local operating system.", resolvePoolParams.getUser()); |
| } |
| LOG.warn(String.format("Error assigning to pool. requested='%s', user='%s', msg=%s", |
| requestedPool, user, errorMessage), ex); |
| } |
| if (pool == null) { |
| if (errorMessage == null) { |
| // This occurs when assignToPool returns null (not an error), i.e. if the pool |
| // cannot be resolved according to the policy. |
| result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList())); |
| } else { |
| // If Yarn throws an exception, return an error status. |
| result.setStatus( |
| new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(errorMessage))); |
| } |
| } else { |
| result.setResolved_pool(pool); |
| result.setHas_access(hasAccess(pool, user)); |
| result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList())); |
| } |
| return result; |
| } |
| |
| /** |
| * Gets the pool configuration values for the specified pool. |
| * |
| * @param thriftPoolConfigParams Serialized {@link TPoolConfigParams} |
| * @return serialized {@link TPoolConfig} |
| */ |
| @SuppressWarnings("unused") // called from C++ |
| public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException { |
| Preconditions.checkState(running_.get()); |
| TPoolConfigParams poolConfigParams = new TPoolConfigParams(); |
| JniUtil.deserializeThrift(protocolFactory_, poolConfigParams, |
| thriftPoolConfigParams); |
| TPoolConfig result = getPoolConfig(poolConfigParams.getPool()); |
| try { |
| return new TSerializer(protocolFactory_).serialize(result); |
| } catch (TException e) { |
| throw new InternalException(e.getMessage()); |
| } |
| } |
| |
| @VisibleForTesting |
| TPoolConfig getPoolConfig(String pool) { |
| TPoolConfig result = new TPoolConfig(); |
| long maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory(); |
| result.setMax_mem_resources( |
| maxMemoryMb == Integer.MAX_VALUE ? -1 : maxMemoryMb * ByteUnits.MEGABYTE); |
| if (conf_ == null) { |
| result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT); |
| result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT); |
| result.setDefault_query_options(""); |
| } else { |
| // Capture the current conf_ in case it changes while we're using it. |
| Configuration currentConf = conf_; |
| result.setMax_requests(getPoolConfigValue(currentConf, pool, |
| MAX_PLACED_RESERVATIONS_KEY, MAX_PLACED_RESERVATIONS_DEFAULT)); |
| result.setMax_queued(getPoolConfigValue(currentConf, pool, |
| MAX_QUEUED_RESERVATIONS_KEY, MAX_QUEUED_RESERVATIONS_DEFAULT)); |
| |
| // Only return positive values. Admission control has a default from gflags. |
| long queueTimeoutMs = getPoolConfigValue(currentConf, pool, QUEUE_TIMEOUT_KEY, -1L); |
| if (queueTimeoutMs > 0) result.setQueue_timeout_ms(queueTimeoutMs); |
| result.setDefault_query_options( |
| getPoolConfigValue(currentConf, pool, QUERY_OPTIONS_KEY, "")); |
| result.setMax_query_mem_limit( |
| getPoolConfigValue(currentConf, pool, MAX_QUERY_MEM_LIMIT_BYTES, 0L)); |
| result.setMin_query_mem_limit( |
| getPoolConfigValue(currentConf, pool, MIN_QUERY_MEM_LIMIT_BYTES, 0L)); |
| result.setClamp_mem_limit_query_option( |
| getPoolConfigValue(currentConf, pool, CLAMP_MEM_LIMIT_QUERY_OPTION, true)); |
| result.setMax_running_queries_multiple( |
| getPoolConfigDoubleValue(currentConf, pool, MAX_RUNNING_QUERIES_MULTIPLE, 0.0)); |
| result.setMax_queued_queries_multiple( |
| getPoolConfigDoubleValue(currentConf, pool, MAX_QUEUED_QUERIES_MULTIPLE, 0.0)); |
| result.setMax_memory_multiple( |
| getPoolConfigValue(currentConf, pool, MAX_MEMORY_MULTIPLE, 0)); |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, max_requests={}," |
| + " max_queued={}, queue_timeout_ms={}, default_query_options={}," |
| + " max_query_mem_limit={}, min_query_mem_limit={}," |
| + " clamp_mem_limit_query_option={}, max_running_queries_multiple={}," |
| + " max_queued_queries_multiple={}, max_memory_multiple={}", |
| pool, result.max_mem_resources, result.max_requests, result.max_queued, |
| result.queue_timeout_ms, result.default_query_options, |
| result.max_query_mem_limit, result.min_query_mem_limit, |
| result.clamp_mem_limit_query_option, result.max_running_queries_multiple, |
| result.max_queued_queries_multiple, result.max_memory_multiple); |
| } |
| return result; |
| } |
| |
| /** |
| * Looks up the per-pool integer config from the Configuration. First checks for |
| * a per-pool value, then a default set in the config, and lastly to the specified |
| * 'defaultValue'. |
| * |
| * @param conf The Configuration to use, provided so the caller can ensure the same |
| * Configuration is used to look up multiple properties. |
| */ |
| private long getPoolConfigValue( |
| Configuration conf, String pool, String key, long defaultValue) { |
| return conf.getLong(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), |
| conf.getLong(key, defaultValue)); |
| } |
| |
| /** |
| * Looks up the per-pool String config from the Configuration. See above. |
| */ |
| private String getPoolConfigValue( |
| Configuration conf, String pool, String key, String defaultValue) { |
| return conf.get(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), |
| conf.get(key, defaultValue)); |
| } |
| |
| /** |
| * Looks up the per-pool Boolean config from the Configuration. See above. |
| */ |
| private boolean getPoolConfigValue( |
| Configuration conf, String pool, String key, boolean defaultValue) { |
| return conf.getBoolean(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), |
| conf.getBoolean(key, defaultValue)); |
| } |
| |
| /** |
| * Looks up the per-pool Double config from the Configuration. See above. |
| */ |
| private double getPoolConfigDoubleValue( |
| Configuration conf, String pool, String key, double defaultValue) { |
| return conf.getDouble(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool), |
| conf.getDouble(key, defaultValue)); |
| } |
| |
| /** |
| * Resolves the actual pool to use via the allocation placement policy. The policy may |
| * change the requested pool. |
| * |
| * @param requestedPool The requested pool. May not be null, an empty string indicates |
| * the policy should return the default pool for this user. |
| * @param user The user, must not be null or empty. |
| * @return the actual pool to use, null if a pool could not be resolved. |
| */ |
| @VisibleForTesting |
| String assignToPool(String requestedPool, String user) |
| throws InternalException, IOException { |
| Preconditions.checkState(running_.get()); |
| Preconditions.checkNotNull(requestedPool); |
| Preconditions.checkArgument(!Strings.isNullOrEmpty(user)); |
| // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because |
| // assignAppToQueue() will check group membership which should always be done on |
| // the short name of the principal. |
| String shortName = new User(user).getShortName(); |
| return allocationConf_.get().getPlacementPolicy().assignAppToQueue( |
| requestedPool.isEmpty() ? YarnConfiguration.DEFAULT_QUEUE_NAME : requestedPool, |
| shortName); |
| } |
| |
| /** |
| * Indicates if a user has access to the pool. |
| * |
| * @param pool the pool to check if the user has access to. NOTE: it should always be |
| * called with a pool returned by the {@link #assignToPool(String, String)} method. |
| * @param user the user to check if it has access to the pool. |
| * @return True if the user has access to the pool. |
| */ |
| @VisibleForTesting |
| boolean hasAccess(String pool, String user) throws InternalException { |
| Preconditions.checkState(running_.get()); |
| Preconditions.checkArgument(!Strings.isNullOrEmpty(pool)); |
| Preconditions.checkArgument(!Strings.isNullOrEmpty(user)); |
| // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because |
| // the UserGroupInformation will check group membership which should always be done |
| // on the short name of the principal. |
| String shortName; |
| User requestingUser = new User(user); |
| shortName = requestingUser.getShortName(); |
| UserGroupInformation ugi = UserGroupInformation.createRemoteUser(shortName); |
| return allocationConf_.get().hasAccess(pool, QueueACL.SUBMIT_APPLICATIONS, ugi); |
| } |
| |
| /** |
| * Returns the AllocationConfiguration corresponding to this instance of |
| * RequestPoolService. |
| */ |
| @VisibleForTesting |
| AllocationConfiguration getAllocationConfig() { |
| Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv()); |
| return allocationConf_.get(); |
| } |
| } |