// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "scheduling/request-pool-service.h"

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <list>
#include <string>
#include <gutil/strings/substitute.h>

#include "common/constant-strings.h"
#include "common/logging.h"
#include "rpc/jni-thrift-util.h"
#include "service/query-options.h"
#include "util/auth-util.h"
#include "util/collection-metrics.h"
#include "util/mem-info.h"
#include "util/parse-util.h"
#include "util/time.h"

#include "common/names.h"

using namespace impala;

DEFINE_bool(require_username, false, "Requires that a user be provided in order to "
    "schedule requests. If enabled and a user is not provided, requests will be "
    "rejected, otherwise requests without a username will be submitted with the "
    "username 'default'.");
static const string DEFAULT_USER = "default";

DEFINE_string(fair_scheduler_allocation_path, "", "Path to the fair scheduler "
    "allocation file (fair-scheduler.xml).");
// TODO: Rename / cleanup now that Llama is removed (see IMPALA-4159).
DEFINE_string(llama_site_path, "", "Path to the Llama configuration file "
    "(llama-site.xml). If set, fair_scheduler_allocation_path must also be set.");

// The default_pool parameters are used if fair scheduler allocation and Llama
// configuration files are not provided. The default values for this 'default pool'
// are the same as the default values for pools defined via the fair scheduler
// allocation file and Llama configurations.
// TODO: Remove?
DEFINE_int64(default_pool_max_requests, -1, "Maximum number of concurrent outstanding "
    "requests allowed to run before queueing incoming requests. A negative value "
    "indicates no limit. 0 indicates no requests will be admitted. Ignored if "
    "fair_scheduler_config_path and llama_site_path are set.");

static const string default_pool_mem_limit_help_msg = "Maximum amount of memory that all "
    "outstanding requests in this pool may use before new requests to this pool "
    "are queued. " + Substitute(MEM_UNITS_HELP_MSG, "the physical memory") + " "
    "Ignored if fair_scheduler_config_path and llama_site_path are set.";
DEFINE_string(default_pool_mem_limit, "", default_pool_mem_limit_help_msg.c_str());
DEFINE_int64(default_pool_max_queued, 200, "Maximum number of requests allowed to be "
    "queued before rejecting requests. A negative value or 0 indicates requests "
    "will always be rejected once the maximum number of concurrent requests are "
    "executing. Ignored if fair_scheduler_config_path and "
    "llama_site_path are set.");

// Flags to disable the pool limits for all pools.
// TODO: Remove?
DEFINE_bool(disable_pool_mem_limits, false, "Disables all per-pool mem limits.");
DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on the "
    "maximum number of running requests.");


// Pool name used when the configuration files are not specified.
static const string DEFAULT_POOL_NAME = "default-pool";

static const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-duration-ms";

static const string ERROR_USER_TO_POOL_MAPPING_NOT_FOUND =
    "No mapping found for request from user '$0' with requested pool '$1'";
static const string ERROR_USER_NOT_ALLOWED_IN_POOL = "Request from user '$0' with "
    "requested pool '$1' denied access to assigned pool '$2'";
static const string ERROR_USER_NOT_SPECIFIED = "User must be specified because "
    "-require_username=true.";

RequestPoolService::RequestPoolService(MetricGroup* metrics) :
    resolve_pool_ms_metric_(NULL) {
  DCHECK(metrics != NULL);
  resolve_pool_ms_metric_ =
      StatsMetric<double>::CreateAndRegister(metrics, RESOLVE_POOL_METRIC_NAME);

  if (FLAGS_fair_scheduler_allocation_path.empty()) {
    default_pool_only_ = true;
    bool is_percent; // not used
    int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_default_pool_mem_limit,
        &is_percent, MemInfo::physical_mem());
    // -1 indicates an error occurred
    if (bytes_limit < 0) {
      CLEAN_EXIT_WITH_ERROR(Substitute("Unable to parse default pool mem limit from "
          "'$0'.", FLAGS_default_pool_mem_limit));
    }
    // 0 indicates no limit or not set
    if (bytes_limit == 0) {
      default_pool_mem_limit_ = -1;
    } else {
      default_pool_mem_limit_ = bytes_limit;
    }
    return;
  }
  default_pool_only_ = false;

  jmethodID start_id; // RequestPoolService.start(), only called in this method.
  JniMethodDescriptor methods[] = {
    {"<init>", "(Ljava/lang/String;Ljava/lang/String;)V", &ctor_},
    {"start", "()V", &start_id},
    {"resolveRequestPool", "([B)[B", &resolve_request_pool_id_},
    {"getPoolConfig", "([B)[B", &get_pool_config_id_}};

  JNIEnv* jni_env = JniUtil::GetJNIEnv();
  request_pool_service_class_ =
    jni_env->FindClass("org/apache/impala/util/RequestPoolService");
  ABORT_IF_EXC(jni_env);
  uint32_t num_methods = sizeof(methods) / sizeof(methods[0]);
  for (int i = 0; i < num_methods; ++i) {
    ABORT_IF_ERROR(JniUtil::LoadJniMethod(jni_env, request_pool_service_class_,
        &(methods[i])));
  }

  jstring fair_scheduler_config_path =
      jni_env->NewStringUTF(FLAGS_fair_scheduler_allocation_path.c_str());
  ABORT_IF_EXC(jni_env);
  jstring llama_site_path =
      jni_env->NewStringUTF(FLAGS_llama_site_path.c_str());
  ABORT_IF_EXC(jni_env);

  jobject request_pool_service = jni_env->NewObject(request_pool_service_class_, ctor_,
      fair_scheduler_config_path, llama_site_path);
  ABORT_IF_EXC(jni_env);
  ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, request_pool_service,
      &request_pool_service_));
  jni_env->CallObjectMethod(request_pool_service_, start_id);
  ABORT_IF_EXC(jni_env);
}

Status RequestPoolService::ResolveRequestPool(const TQueryCtx& ctx,
    string* resolved_pool) {
  if (default_pool_only_) {
    *resolved_pool = DEFAULT_POOL_NAME;
    return Status::OK();
  }
  string user = GetEffectiveUser(ctx.session);
  if (user.empty()) {
    if (FLAGS_require_username) return Status(ERROR_USER_NOT_SPECIFIED);
    // Fall back to a 'default' user if not set so that queries can still run.
    VLOG_RPC << "No user specified: using user=default";
    user = DEFAULT_USER;
  }

  const string& requested_pool = ctx.client_request.query_options.request_pool;
  TResolveRequestPoolParams params;
  params.__set_user(user);
  params.__set_requested_pool(requested_pool);
  TResolveRequestPoolResult result;
  int64_t start_time = MonotonicMillis();
  Status status = JniUtil::CallJniMethod(request_pool_service_, resolve_request_pool_id_,
      params, &result);
  resolve_pool_ms_metric_->Update(MonotonicMillis() - start_time);

  if (result.status.status_code != TErrorCode::OK) {
    return Status(boost::algorithm::join(result.status.error_msgs, "; "));
  }
  if (result.resolved_pool.empty()) {
    return Status(Substitute(ERROR_USER_TO_POOL_MAPPING_NOT_FOUND,
        user, requested_pool));
  }
  if (!result.has_access) {
    return Status(Substitute(ERROR_USER_NOT_ALLOWED_IN_POOL, user,
        requested_pool, result.resolved_pool));
  }
  *resolved_pool = result.resolved_pool;
  return Status::OK();
}

Status RequestPoolService::GetPoolConfig(const string& pool_name,
    TPoolConfig* pool_config) {
  if (default_pool_only_) {
    pool_config->__set_max_requests(
        FLAGS_disable_pool_max_requests ? -1 : FLAGS_default_pool_max_requests);
    pool_config->__set_max_mem_resources(
        FLAGS_disable_pool_mem_limits ? -1 : default_pool_mem_limit_);
    pool_config->__set_max_queued(FLAGS_default_pool_max_queued);
    pool_config->__set_default_query_options("");
    pool_config->__set_min_query_mem_limit(0);
    pool_config->__set_max_query_mem_limit(0);
    pool_config->__set_clamp_mem_limit_query_option(true);
    return Status::OK();
  }

  TPoolConfigParams params;
  params.__set_pool(pool_name);
  RETURN_IF_ERROR(JniUtil::CallJniMethod(
        request_pool_service_, get_pool_config_id_, params, pool_config));
  if (FLAGS_disable_pool_max_requests) pool_config->__set_max_requests(-1);
  if (FLAGS_disable_pool_mem_limits) pool_config->__set_max_mem_resources(-1);
  return Status::OK();
}
