// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.impala.authorization.User;
import org.apache.impala.common.ByteUnits;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPoolConfigParams;
import org.apache.impala.thrift.TPoolConfig;
import org.apache.impala.thrift.TResolveRequestPoolParams;
import org.apache.impala.thrift.TResolveRequestPoolResult;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.util.FileWatchService.FileChangeListener;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

/**
 * Admission control utility class that provides user to request pool mapping, ACL
 * enforcement, and pool configuration values. Pools are configured via a fair scheduler
 * allocation file (fair-scheduler.xml) and Llama configuration (llama-site.xml). This
 * class wraps a number of Hadoop classes to provide the user to pool mapping,
 * authorization, and accessing memory resource limits, all of which are specified in
 * the fair scheduler allocation file. The other pool limits are specified in the
 * Llama configuration, and those properties are accessed via the standard
 * {@link Configuration} API.
 *
 * Both the allocation configuration and Llama configuration files are watched for
 * changes and reloaded when necessary. The allocation file is watched/loaded using the
 * Yarn {@link AllocationFileLoaderService} and the Llama configuration uses a subclass of
 * the {@link FileWatchService}. There are two different mechanisms because there is
 * different parsing/configuration code for the allocation file and the Llama
 * configuration (which is a regular Hadoop conf file so it can use the
 * {@link Configuration} class). start() and stop() will start/stop watching and reloading
 * both of these files.
 *
 * A single instance is created by the backend and lasts the duration of the process.
 */
public class RequestPoolService {
  final static Logger LOG = LoggerFactory.getLogger(RequestPoolService.class);

  private final static TBinaryProtocol.Factory protocolFactory_ =
      new TBinaryProtocol.Factory();
  // Used to ensure start() has been called before any other methods can be used.
  private final AtomicBoolean running_;

  // Key for the default maximum number of running queries ("placed reservations")
  // property. The per-pool key name is this key with the pool name appended, e.g.
  // "{key}.{pool}".
  private final static String MAX_PLACED_RESERVATIONS_KEY =
      "llama.am.throttling.maximum.placed.reservations";

  // Default value for the maximum.placed.reservations property. Note that this value
  // differs from the current Llama default of 10000.
  private final static int MAX_PLACED_RESERVATIONS_DEFAULT = -1;

  // Key for the default maximum number of queued requests ("queued reservations")
  // property. The per-pool key name is this key with the pool name appended, e.g.
  // "{key}.{pool}".
  private final static String MAX_QUEUED_RESERVATIONS_KEY =
      "llama.am.throttling.maximum.queued.reservations";

  // Default value for the maximum.queued.reservations property. Note that this value
  // differs from the current Llama default of 0 which disables queuing.
  private final static int MAX_QUEUED_RESERVATIONS_DEFAULT = 200;

  // Key for the pool queue timeout (milliseconds).
  private final static String QUEUE_TIMEOUT_KEY =
      "impala.admission-control.pool-queue-timeout-ms";

  // Key for the pool default query options. Query options are specified as a
  // comma delimited string of 'key=value' pairs, e.g. 'key1=val1,key2=val2'.
  private final static String QUERY_OPTIONS_KEY =
      "impala.admission-control.pool-default-query-options";

  // Keys for the pool max and min query mem limits (in bytes) respectively.
  private final static String MAX_QUERY_MEM_LIMIT_BYTES =
      "impala.admission-control.max-query-mem-limit";
  private final static String MIN_QUERY_MEM_LIMIT_BYTES =
      "impala.admission-control.min-query-mem-limit";

  // Key for specifying if the mem_limit query option can override max/min mem limits
  // of the pool.
  private final static String CLAMP_MEM_LIMIT_QUERY_OPTION =
      "impala.admission-control.clamp-mem-limit-query-option";

  // Key for specifying the "Max Running Queries Multiple" configuration
  // of the pool.
  private final static String MAX_RUNNING_QUERIES_MULTIPLE =
      "impala.admission-control.max-running-queries-multiple";

  // Key for specifying the "Max Queued Queries Multiple" configuration
  // of the pool.
  private final static String MAX_QUEUED_QUERIES_MULTIPLE =
      "impala.admission-control.max-queued-queries-multiple";

  // Key for specifying the "Max Memory Multiple" configuration
  // of the pool.
  private final static String MAX_MEMORY_MULTIPLE =
      "impala.admission-control.max-memory-multiple";

  // String format for a per-pool configuration key. First parameter is the key for the
  // default, e.g. MAX_PLACED_RESERVATIONS_KEY, and the second parameter is the
  // pool name.
  private final static String PER_POOL_CONFIG_KEY_FORMAT = "%s.%s";

  // Watches for changes to the fair scheduler allocation file.
  @VisibleForTesting
  final AllocationFileLoaderService allocLoader_;

  // Provides access to the fair scheduler allocation file. An AtomicReference becaus it
  // is reset when the allocation configuration file changes and other threads access it.
  private final AtomicReference<AllocationConfiguration> allocationConf_;

  // Watches the configuration file for changes.
  @VisibleForTesting
  final FileWatchService confWatcher_;

  // Used by this class to access to the configs provided by the configuration.
  // This is replaced when the configuration file changes.
  private volatile Configuration conf_;

  // URL of the configuration file.
  private final URL confUrl_;

  /**
   * Updates the configuration when the file changes. The file is confUrl_
   * and it will exist when this is created (or RequestPoolService will not start). If
   * the file is later removed, warnings will be written to the log but the previous
   * configuration will still be accessible.
   */
  private final class ConfWatcher implements FileChangeListener {
    public void onFileChange() {
      // If confUrl_ is null the watcher should not have been created.
      Preconditions.checkNotNull(confUrl_);
      LOG.info("Loading configuration: " + confUrl_.getFile());
      Configuration conf = new Configuration();
      conf.addResource(confUrl_);
      conf_ = conf;
    }
  }

  /**
   * Creates a RequestPoolService instance with a configuration containing the specified
   * fair-scheduler.xml and llama-site.xml.
   *
   * @param fsAllocationPath path to the fair scheduler allocation file.
   * @param sitePath path to the configuration file.
   */
  RequestPoolService(final String fsAllocationPath, final String sitePath) {
    Preconditions.checkNotNull(fsAllocationPath);
    running_ = new AtomicBoolean(false);
    allocationConf_ = new AtomicReference<>();
    URL fsAllocationURL = getURL(fsAllocationPath);
    if (fsAllocationURL == null) {
      throw new IllegalArgumentException(
          "Unable to find allocation configuration file: " + fsAllocationPath);
    }
    // Load the default Hadoop configuration files for picking up overrides like custom
    // group mapping plugins etc.
    Configuration allocConf = new Configuration();
    allocConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocationURL.getPath());
    allocLoader_ = new AllocationFileLoaderService();
    allocLoader_.init(allocConf);

    if (!Strings.isNullOrEmpty(sitePath)) {
      confUrl_ = getURL(sitePath);
      if (confUrl_ == null) {
        throw new IllegalArgumentException(
            "Unable to find configuration file: " + sitePath);
      }
      conf_ = new Configuration(false);
      conf_.addResource(confUrl_);
      confWatcher_ =
          new FileWatchService(new File(confUrl_.getPath()), new ConfWatcher());
    } else {
      confWatcher_ = null;
      confUrl_ = null;
    }
  }

  /**
   * Returns a {@link URL} for the file if it exists, null otherwise.
   */
  @VisibleForTesting
  private static URL getURL(String path) {
    Preconditions.checkNotNull(path);
    File file = new File(path);
    file = file.getAbsoluteFile();
    if (!file.exists()) {
      LOG.error("Unable to find specified file: " + path);
      return null;
    }
    try {
      return file.toURI().toURL();
    } catch (MalformedURLException ex) {
      LOG.error("Unable to construct URL for file: " + path, ex);
      return null;
    }
  }

  /**
   * Starts the RequestPoolService instance. It does the initial loading of the
   * configuration and starts the automatic reloading.
   */
  public void start() {
    Preconditions.checkState(!running_.get());
    allocLoader_.setReloadListener(allocationConf_::set);
    allocLoader_.start();
    try {
      allocLoader_.reloadAllocations();
    } catch (Exception ex) {
      try {
        stopInternal();
      } catch (Exception stopEx) {
        LOG.error("Unable to stop AllocationFileLoaderService after failed start.",
            stopEx);
      }
      throw new RuntimeException(ex);
    }
    if (confWatcher_ != null) confWatcher_.start();
    running_.set(true);
  }

  /**
   * Stops the RequestPoolService instance. Only used by tests.
   */
  public void stop() {
    Preconditions.checkState(running_.get());
    stopInternal();
  }

  /**
   * Stops the RequestPoolService instance without checking the running state. Only
   * called by stop() (which is only used in tests) or by start() if a failure occurs.
   * Should not be called more than once.
   */
  private void stopInternal() {
    running_.set(false);
    if (confWatcher_ != null) confWatcher_.stop();
    allocLoader_.stop();
  }

  /**
   * Resolves a user and pool to the pool specified by the allocation placement policy
   * and checks if the user is authorized to submit requests.
   *
   * @param thriftResolvePoolParams Serialized {@link TResolveRequestPoolParams}
   * @return serialized {@link TResolveRequestPoolResult}
   */
  @SuppressWarnings("unused") // called from C++
  public byte[] resolveRequestPool(byte[] thriftResolvePoolParams)
      throws ImpalaException {
    TResolveRequestPoolParams resolvePoolParams = new TResolveRequestPoolParams();
    JniUtil.deserializeThrift(protocolFactory_, resolvePoolParams,
        thriftResolvePoolParams);
    TResolveRequestPoolResult result = resolveRequestPool(resolvePoolParams);
    if (LOG.isTraceEnabled()) {
      LOG.trace("resolveRequestPool(pool={}, user={}): resolved_pool={}, has_access={}",
          resolvePoolParams.getRequested_pool(), resolvePoolParams.getUser(),
          result.resolved_pool, result.has_access);
    }
    try {
      return new TSerializer(protocolFactory_).serialize(result);
    } catch (TException e) {
      throw new InternalException(e.getMessage());
    }
  }

  @VisibleForTesting
  TResolveRequestPoolResult resolveRequestPool(
      TResolveRequestPoolParams resolvePoolParams) throws InternalException {
    String requestedPool = resolvePoolParams.getRequested_pool();
    String user = resolvePoolParams.getUser();
    TResolveRequestPoolResult result = new TResolveRequestPoolResult();
    String errorMessage = null;
    String pool = null;
    try {
      pool = assignToPool(requestedPool, user);
    } catch (IOException ex) {
      errorMessage = ex.getMessage();
      if (errorMessage.startsWith("No groups found for user")) {
        // The error thrown when using the 'primaryGroup' or 'secondaryGroup' rules and
        // the user does not exist are not helpful.
        errorMessage = String.format(
            "Failed to resolve user '%s' to a pool while evaluating the " +
            "'primaryGroup' or 'secondaryGroup' queue placement rules because no " +
            "groups were found for the user. This is likely because the user does not " +
            "exist on the local operating system.", resolvePoolParams.getUser());
      }
      LOG.warn(String.format("Error assigning to pool. requested='%s', user='%s', msg=%s",
          requestedPool, user, errorMessage), ex);
    }
    if (pool == null) {
      if (errorMessage == null) {
        // This occurs when assignToPool returns null (not an error), i.e. if the pool
        // cannot be resolved according to the policy.
        result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
      } else {
        // If Yarn throws an exception, return an error status.
        result.setStatus(
            new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(errorMessage)));
      }
    } else {
      result.setResolved_pool(pool);
      result.setHas_access(hasAccess(pool, user));
      result.setStatus(new TStatus(TErrorCode.OK, Lists.newArrayList()));
    }
    return result;
  }

  /**
   * Gets the pool configuration values for the specified pool.
   *
   * @param thriftPoolConfigParams Serialized {@link TPoolConfigParams}
   * @return serialized {@link TPoolConfig}
   */
  @SuppressWarnings("unused") // called from C++
  public byte[] getPoolConfig(byte[] thriftPoolConfigParams) throws ImpalaException {
    Preconditions.checkState(running_.get());
    TPoolConfigParams poolConfigParams = new TPoolConfigParams();
    JniUtil.deserializeThrift(protocolFactory_, poolConfigParams,
        thriftPoolConfigParams);
    TPoolConfig result = getPoolConfig(poolConfigParams.getPool());
    try {
      return new TSerializer(protocolFactory_).serialize(result);
    } catch (TException e) {
      throw new InternalException(e.getMessage());
    }
  }

  @VisibleForTesting
  TPoolConfig getPoolConfig(String pool) {
    TPoolConfig result = new TPoolConfig();
    long maxMemoryMb = allocationConf_.get().getMaxResources(pool).getMemory();
    result.setMax_mem_resources(
        maxMemoryMb == Integer.MAX_VALUE ? -1 : maxMemoryMb * ByteUnits.MEGABYTE);
    if (conf_ == null) {
      result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT);
      result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT);
      result.setDefault_query_options("");
    } else {
      // Capture the current conf_ in case it changes while we're using it.
      Configuration currentConf = conf_;
      result.setMax_requests(getPoolConfigValue(currentConf, pool,
          MAX_PLACED_RESERVATIONS_KEY, MAX_PLACED_RESERVATIONS_DEFAULT));
      result.setMax_queued(getPoolConfigValue(currentConf, pool,
          MAX_QUEUED_RESERVATIONS_KEY, MAX_QUEUED_RESERVATIONS_DEFAULT));

      // Only return positive values. Admission control has a default from gflags.
      long queueTimeoutMs = getPoolConfigValue(currentConf, pool, QUEUE_TIMEOUT_KEY, -1L);
      if (queueTimeoutMs > 0) result.setQueue_timeout_ms(queueTimeoutMs);
      result.setDefault_query_options(
          getPoolConfigValue(currentConf, pool, QUERY_OPTIONS_KEY, ""));
      result.setMax_query_mem_limit(
          getPoolConfigValue(currentConf, pool, MAX_QUERY_MEM_LIMIT_BYTES, 0L));
      result.setMin_query_mem_limit(
          getPoolConfigValue(currentConf, pool, MIN_QUERY_MEM_LIMIT_BYTES, 0L));
      result.setClamp_mem_limit_query_option(
          getPoolConfigValue(currentConf, pool, CLAMP_MEM_LIMIT_QUERY_OPTION, true));
      result.setMax_running_queries_multiple(
          getPoolConfigDoubleValue(currentConf, pool, MAX_RUNNING_QUERIES_MULTIPLE, 0.0));
      result.setMax_queued_queries_multiple(
          getPoolConfigDoubleValue(currentConf, pool, MAX_QUEUED_QUERIES_MULTIPLE, 0.0));
      result.setMax_memory_multiple(
          getPoolConfigValue(currentConf, pool, MAX_MEMORY_MULTIPLE, 0));
    }
    if (LOG.isTraceEnabled()) {
      LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, max_requests={},"
              + " max_queued={},  queue_timeout_ms={}, default_query_options={},"
              + " max_query_mem_limit={}, min_query_mem_limit={},"
              + " clamp_mem_limit_query_option={}, max_running_queries_multiple={},"
              + " max_queued_queries_multiple={}, max_memory_multiple={}",
          pool, result.max_mem_resources, result.max_requests, result.max_queued,
          result.queue_timeout_ms, result.default_query_options,
          result.max_query_mem_limit, result.min_query_mem_limit,
          result.clamp_mem_limit_query_option, result.max_running_queries_multiple,
          result.max_queued_queries_multiple, result.max_memory_multiple);
    }
    return result;
  }

  /**
   * Looks up the per-pool integer config from the Configuration. First checks for
   * a per-pool value, then a default set in the config, and lastly to the specified
   * 'defaultValue'.
   *
   * @param conf The Configuration to use, provided so the caller can ensure the same
   *        Configuration is used to look up multiple properties.
   */
  private long getPoolConfigValue(
      Configuration conf, String pool, String key, long defaultValue) {
    return conf.getLong(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool),
        conf.getLong(key, defaultValue));
  }

  /**
   * Looks up the per-pool String config from the Configuration. See above.
   */
  private String getPoolConfigValue(
      Configuration conf, String pool, String key, String defaultValue) {
    return conf.get(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool),
        conf.get(key, defaultValue));
  }

  /**
   * Looks up the per-pool Boolean config from the Configuration. See above.
   */
  private boolean getPoolConfigValue(
      Configuration conf, String pool, String key, boolean defaultValue) {
    return conf.getBoolean(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool),
        conf.getBoolean(key, defaultValue));
  }

  /**
   * Looks up the per-pool Double config from the Configuration. See above.
   */
  private double getPoolConfigDoubleValue(
      Configuration conf, String pool, String key, double defaultValue) {
    return conf.getDouble(String.format(PER_POOL_CONFIG_KEY_FORMAT, key, pool),
        conf.getDouble(key, defaultValue));
  }

  /**
   * Resolves the actual pool to use via the allocation placement policy. The policy may
   * change the requested pool.
   *
   * @param requestedPool The requested pool. May not be null, an empty string indicates
   * the policy should return the default pool for this user.
   * @param user The user, must not be null or empty.
   * @return the actual pool to use, null if a pool could not be resolved.
   */
  @VisibleForTesting
  String assignToPool(String requestedPool, String user)
      throws InternalException, IOException {
    Preconditions.checkState(running_.get());
    Preconditions.checkNotNull(requestedPool);
    Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
    // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
    // assignAppToQueue() will check group membership which should always be done on
    // the short name of the principal.
    String shortName = new User(user).getShortName();
    return allocationConf_.get().getPlacementPolicy().assignAppToQueue(
        requestedPool.isEmpty() ? YarnConfiguration.DEFAULT_QUEUE_NAME : requestedPool,
        shortName);
  }

  /**
   * Indicates if a user has access to the pool.
   *
   * @param pool the pool to check if the user has access to. NOTE: it should always be
   * called with a pool returned by the {@link #assignToPool(String, String)} method.
   * @param user the user to check if it has access to the pool.
   * @return True if the user has access to the pool.
   */
  @VisibleForTesting
  boolean hasAccess(String pool, String user) throws InternalException {
    Preconditions.checkState(running_.get());
    Preconditions.checkArgument(!Strings.isNullOrEmpty(pool));
    Preconditions.checkArgument(!Strings.isNullOrEmpty(user));
    // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
    // the UserGroupInformation will check group membership which should always be done
    // on the short name of the principal.
    String shortName;
    User requestingUser = new User(user);
    shortName = requestingUser.getShortName();
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(shortName);
    return allocationConf_.get().hasAccess(pool, QueueACL.SUBMIT_APPLICATIONS, ugi);
  }

  /**
   * Returns the AllocationConfiguration corresponding to this instance of
   * RequestPoolService.
   */
  @VisibleForTesting
  AllocationConfiguration getAllocationConfig() {
    Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv());
    return allocationConf_.get();
  }
}
