// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/catalog-op-executor.h"

#include <sstream>

#include "common/status.h"
#include "catalog/catalog-service-client-wrapper.h"
#include "exec/incr-stats-util.h"
#include "runtime/lib-cache.h"
#include "runtime/client-cache-types.h"
#include "runtime/exec-env.h"
#include "service/frontend.h"
#include "service/impala-server.h"
#include "service/hs2-util.h"
#include "util/runtime-profile-counters.h"
#include "util/string-parser.h"
#include "util/test-info.h"
#include "util/time.h"
#include "gen-cpp/CatalogService.h"
#include "gen-cpp/CatalogService_types.h"
#include "gen-cpp/CatalogObjects_types.h"

#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/Thrift.h>
#include <gutil/strings/substitute.h>

#include "common/names.h"
using namespace impala;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;

// Flags declared here (gflags DECLARE_*) are defined in other translation units.
DECLARE_bool(use_local_catalog);
DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);
DECLARE_string(debug_actions);

// Client-side catalogd RPC settings. The num_retries and retry_interval_ms values are
// passed to the DoRpcWithRetry() calls in this file.
DECLARE_int32(catalog_client_connection_num_retries);
DECLARE_int32(catalog_client_rpc_timeout_ms);
DECLARE_int32(catalog_client_rpc_retry_interval_ms);

// Hidden flag: when > 0, GetPartialCatalogObject() sleeps for this many milliseconds
// after each successful fetch from the catalogd.
DEFINE_int32_hidden(inject_latency_after_catalog_fetch_ms, 0,
    "Latency (ms) to be injected after fetching catalog data from the catalogd");

/// Debug hook passed to every catalogd RPC issued from this file. Each call bumps the
/// counter pointed to by 'attempt'. The "CATALOG_RPC_FIRST_ATTEMPT" debug action (taken
/// from --debug_actions) is evaluated only when the counter was zero before the call,
/// i.e. on the very first RPC attempt; every later attempt yields Status::OK().
static Status CatalogRpcDebugFn(int* attempt) {
  const bool is_first_attempt = (*attempt == 0);
  ++(*attempt);
  if (!is_first_attempt) return Status::OK();
  return DebugAction(FLAGS_debug_actions, "CATALOG_RPC_FIRST_ATTEMPT");
}

/// Executes 'request' against the catalogd and records the catalog update it produced.
///
/// Supported op types:
///   - DDL: sends request.ddl_params through the ExecDdl RPC and keeps the response in
///     exec_response_. If the catalogd reports success for a DROP FUNCTION or
///     DROP DATA SOURCE, the corresponding library cache entry is evicted via
///     HandleDropFunction()/HandleDropDataSource(). COMPUTE STATS must go through
///     ExecComputeStats() instead.
///   - RESET_METADATA: sends request.reset_metadata_params through the ResetMetadata RPC.
/// For both op types, catalog_update_result_ is set from the catalogd's result, and the
/// returned Status is the status carried in that result. RPC failures (after the
/// configured retries) are returned as-is. Any other op type yields an error Status.
Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
  DCHECK(profile_ != NULL);
  RuntimeProfile::Counter* exec_timer = ADD_TIMER(profile_, "CatalogOpExecTimer");
  SCOPED_TIMER(exec_timer);
  const TNetworkAddress& address =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  switch (request.op_type) {
    case TCatalogOpType::DDL: {
      // Compute stats stmts must be executed via ExecComputeStats().
      DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS);

      exec_response_.reset(new TDdlExecResponse());
      int attempt = 0; // Used for debug action only.
      CatalogServiceConnection::RpcStatus rpc_status =
          CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
              &CatalogServiceClientWrapper::ExecDdl, request.ddl_params,
              FLAGS_catalog_client_connection_num_retries,
              FLAGS_catalog_client_rpc_retry_interval_ms,
              [&attempt]() { return CatalogRpcDebugFn(&attempt); }, exec_response_.get());
      RETURN_IF_ERROR(rpc_status.status);
      catalog_update_result_.reset(new TCatalogUpdateResult(exec_response_->result));
      Status status(exec_response_->result.status);
      if (status.ok()) {
        // The drop handlers compare against catalog_update_result_, so they must run
        // only after it has been populated above.
        if (request.ddl_params.ddl_type == TDdlType::DROP_FUNCTION) {
          HandleDropFunction(request.ddl_params.drop_fn_params);
        } else if (request.ddl_params.ddl_type == TDdlType::DROP_DATA_SOURCE) {
          HandleDropDataSource(request.ddl_params.drop_data_source_params);
        }
      }
      return status;
    }
    case TCatalogOpType::RESET_METADATA: {
      TResetMetadataResponse response;
      int attempt = 0; // Used for debug action only.
      CatalogServiceConnection::RpcStatus rpc_status =
          CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address,
              &CatalogServiceClientWrapper::ResetMetadata, request.reset_metadata_params,
              FLAGS_catalog_client_connection_num_retries,
              FLAGS_catalog_client_rpc_retry_interval_ms,
              [&attempt]() { return CatalogRpcDebugFn(&attempt); }, &response);
      RETURN_IF_ERROR(rpc_status.status);
      catalog_update_result_.reset(new TCatalogUpdateResult(response.result));
      return Status(response.result.status);
    }
    default: {
      return Status(Substitute("TCatalogOpType: $0 does not support execution against the"
          " CatalogService", request.op_type));
    }
  }
}

/// Turns the results of the COMPUTE STATS child queries into an
/// 'ALTER TABLE ... UPDATE STATS' DDL request and executes it through Exec().
///
/// 'compute_stats_request' is the original COMPUTE STATS request; its sync_ddl flag and
/// compute_stats_params are carried over into the new request. 'tbl_stats_schema' and
/// 'tbl_stats_data' hold the result of the table-level (row count) child query.
/// 'col_stats_schema' and 'col_stats_data' hold the column stats child query result and
/// are empty when no column stats query was run. For incremental stats the column
/// stats are finalized per partition, and that work is timed under
/// "FinalizeIncrementalStatsTimer".
Status CatalogOpExecutor::ExecComputeStats(
    const TCatalogOpRequest& compute_stats_request,
    const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
    const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
  const TComputeStatsParams& cs_params =
      compute_stats_request.ddl_params.compute_stats_params;
  const bool sync_ddl = compute_stats_request.sync_ddl;

  // Outer catalog op: a DDL wrapping the 'alter table update stats' request.
  TCatalogOpRequest alter_req;
  alter_req.__isset.ddl_params = true;
  alter_req.__set_op_type(TCatalogOpType::DDL);
  alter_req.__set_sync_ddl(sync_ddl);

  TDdlExecRequest& ddl = alter_req.ddl_params;
  ddl.__set_ddl_type(TDdlType::ALTER_TABLE);
  ddl.__set_sync_ddl(sync_ddl);

  // ALTER TABLE parameters of the UPDATE_STATS flavor.
  ddl.__isset.alter_table_params = true;
  auto& alter_params = ddl.alter_table_params;
  alter_params.__set_alter_type(TAlterTableType::UPDATE_STATS);
  alter_params.__set_table_name(cs_params.table_name);
  alter_params.__isset.update_stats_params = true;

  TAlterTableUpdateStatsParams& stats = alter_params.update_stats_params;
  stats.__set_table_name(cs_params.table_name);
  stats.__set_expect_all_partitions(cs_params.expect_all_partitions);
  stats.__set_is_incremental(cs_params.is_incremental);

  // Row counts from the table-level child query.
  SetTableStats(tbl_stats_schema, tbl_stats_data, cs_params.existing_part_stats, &stats);
  if (cs_params.__isset.total_file_bytes) {
    stats.table_stats.__set_total_file_bytes(cs_params.total_file_bytes);
  }

  // An empty column schema means no column stats query was executed.
  if (!col_stats_schema.columns.empty()) {
    if (!cs_params.is_incremental) {
      SetColumnStats(col_stats_schema, col_stats_data, &stats);
    } else {
      RuntimeProfile::Counter* finalize_timer =
          ADD_TIMER(profile_, "FinalizeIncrementalStatsTimer");
      SCOPED_TIMER(finalize_timer);
      FinalizePartitionedColumnStats(col_stats_schema, cs_params.existing_part_stats,
          cs_params.expected_partitions, col_stats_data, cs_params.num_partition_cols,
          &stats);
    }
  }

  // Ship the assembled 'alter table update stats' request to the catalogd.
  return Exec(alter_req);
}

void CatalogOpExecutor::HandleDropFunction(const TDropFunctionParams& request) {
  DCHECK(fe_ != NULL) << "FE tests should not be calling this";
  // Can only be called after successfully processing a catalog update.
  DCHECK(catalog_update_result_ != NULL);

  // Lookup in the local catalog the metadata for the function.
  TCatalogObject obj;
  obj.type = TCatalogObjectType::FUNCTION;
  obj.fn.name = request.fn_name;
  obj.fn.arg_types = request.arg_types;
  obj.fn.signature = request.signature;
  obj.__isset.fn = true;
  obj.fn.__isset.signature = true;

  TCatalogObject fn;
  Status status = fe_->GetCatalogObject(obj, &fn);
  if (!status.ok()) {
    // This can happen if the function was dropped by another impalad.
    VLOG_QUERY << "Could not lookup function catalog object: "
               << apache::thrift::ThriftDebugString(request);
    return;
  }
  // This function may have been dropped and re-created. To avoid removing the re-created
  // function's entry from the cache verify the existing function has a catalog
  // version <= the dropped version. This may happen if the update from the statestore
  // gets applied *before* the result of a direct-DDL drop function command.
  if (fn.catalog_version <= catalog_update_result_->version) {
    LibCache::instance()->RemoveEntry(fn.fn.hdfs_location);
  }
}

void CatalogOpExecutor::HandleDropDataSource(const TDropDataSourceParams& request) {
  DCHECK(fe_ != NULL) << "FE tests should not be calling this";
  // Can only be called after successfully processing a catalog update.
  DCHECK(catalog_update_result_ != NULL);

  // Lookup in the local catalog the metadata for the data source.
  TCatalogObject obj;
  obj.type = TCatalogObjectType::DATA_SOURCE;
  obj.data_source.name = request.data_source;
  obj.__isset.data_source = true;

  TCatalogObject ds;
  Status status = fe_->GetCatalogObject(obj, &ds);
  if (!status.ok()) {
    // This can happen if the data source was dropped by another impalad.
    VLOG_QUERY << "Could not lookup data source catalog object: "
               << apache::thrift::ThriftDebugString(request);
    return;
  }
  // This data source may have been dropped and re-created. To avoid removing the
  // re-created data source's entry from the cache verify the existing data source has a
  // catalog version <= the dropped version. This may happen if the update from the
  // statestore gets applied *before* the result of a direct-DDL drop data source
  // command.
  if (ds.catalog_version <= catalog_update_result_->version) {
    LibCache::instance()->RemoveEntry(ds.data_source.hdfs_location);
  }
}

/// Fills the table-level row count of 'params' from the COMPUTE STATS row-count child
/// query. For partitioned tables it also fills the per-partition row counts.
///
/// Each row of 'tbl_stats_data' starts with the COUNT(*) value (an i64), followed by
/// the values of the partition columns the query grouped by.
///   - A single row with a single column is treated as an unpartitioned table: only
///     table_stats.num_rows is set.
///   - Otherwise, each row becomes an entry in params->partition_stats, keyed by the
///     stringified partition key values. table_stats.num_rows is the sum of those row
///     counts plus the row counts in 'existing_part_stats' (partitions that are not
///     being modified).
/// 'tbl_stats_schema' is not used by this function.
void CatalogOpExecutor::SetTableStats(const TTableSchema& tbl_stats_schema,
    const TRowSet& tbl_stats_data, const vector<TPartitionStats>& existing_part_stats,
    TAlterTableUpdateStatsParams* params) {
  if (tbl_stats_data.rows.size() == 1 && tbl_stats_data.rows[0].colVals.size() == 1) {
    // Unpartitioned table. Only set table stats, but no partition stats.
    // The first column is the COUNT(*) expr of the original query.
    params->table_stats.__set_num_rows(tbl_stats_data.rows[0].colVals[0].i64Val.value);
    params->__isset.table_stats = true;
    return;
  }

  // Accumulate total number of rows in the partitioned table. Use int64_t, the type of
  // the individual counts: 'long' is only 32 bits on LLP64 platforms and could truncate.
  int64_t total_num_rows = 0;
  // Set per-partition stats.
  for (const TRow& row : tbl_stats_data.rows) {
    DCHECK_GT(row.colVals.size(), 0);
    // The first column is the COUNT(*) expr of the original query.
    DCHECK(row.colVals[0].__isset.i64Val);
    const int64_t num_rows = row.colVals[0].i64Val.value;
    // The remaining columns are partition columns that the results are grouped by.
    vector<string> partition_key_vals;
    partition_key_vals.reserve(row.colVals.size());
    for (size_t j = 1; j < row.colVals.size(); ++j) {
      stringstream ss;
      PrintTColumnValue(row.colVals[j], &ss);
      partition_key_vals.push_back(ss.str());
    }
    params->partition_stats[partition_key_vals].stats.__set_num_rows(num_rows);
    total_num_rows += num_rows;
  }
  params->__isset.partition_stats = true;

  // Add row counts of existing partitions that are not going to be modified.
  for (const TPartitionStats& existing_stats : existing_part_stats) {
    total_num_rows += existing_stats.stats.num_rows;
  }

  // Set per-table stats.
  params->table_stats.__set_num_rows(total_num_rows);
  params->__isset.table_stats = true;
}

/// Fills params->column_stats from the single-row result of the non-incremental
/// COMPUTE STATS column stats child query.
///
/// The result row is a sequence of groups of kNumStatsPerColumn values, one group per
/// table column, in this order:
///   - NDV (i64)
///   - number of NULLs (i64)
///   - max length (i32)
///   - average length (double)
/// The max and average length are -1 for non-string columns. So the stats of the k-th
/// column start at position k * kNumStatsPerColumn of the row. The name of the first
/// result column of each group in 'col_stats_schema' is used as the key in
/// params->column_stats. This presumes the query aliases the NDV expression with the
/// table column's name; confirm against the frontend.
void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema,
    const TRowSet& col_stats_data, TAlterTableUpdateStatsParams* params) {
  // Number of consecutive result values produced for each table column.
  static const size_t kNumStatsPerColumn = 4;

  // Expect exactly one result row.
  DCHECK_EQ(1, col_stats_data.rows.size());
  const TRow& col_stats_row = col_stats_data.rows[0];
  const size_t num_vals = col_stats_row.colVals.size();
  // A partial trailing group indicates a malformed result.
  DCHECK_EQ(0, num_vals % kNumStatsPerColumn);

  // The loop condition only visits complete groups. In release builds (no DCHECK), a
  // partial trailing group is skipped instead of being read past the end of colVals.
  for (size_t i = 0; i + kNumStatsPerColumn <= num_vals; i += kNumStatsPerColumn) {
    TColumnStats col_stats;
    col_stats.__set_num_distinct_values(col_stats_row.colVals[i].i64Val.value);
    col_stats.__set_num_nulls(col_stats_row.colVals[i + 1].i64Val.value);
    col_stats.__set_max_size(col_stats_row.colVals[i + 2].i32Val.value);
    col_stats.__set_avg_size(col_stats_row.colVals[i + 3].doubleVal.value);
    params->column_stats[col_stats_schema.columns[i].columnName] = col_stats;
  }
  params->__isset.column_stats = true;
}

/// Fetches the catalog object described by 'object_desc' directly from the catalogd
/// through the GetCatalogObject RPC. Retries are governed by
/// --catalog_client_connection_num_retries and --catalog_client_rpc_retry_interval_ms.
/// On success the object is copied into '*result'. If the RPC ultimately fails, its
/// error is returned and '*result' is left untouched.
Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc,
    TCatalogObject* result) {
  TGetCatalogObjectRequest rpc_req;
  rpc_req.__set_object_desc(object_desc);
  TGetCatalogObjectResponse rpc_resp;

  int debug_attempt = 0;  // Only consumed by the debug-action hook.
  auto debug_hook = [&debug_attempt]() { return CatalogRpcDebugFn(&debug_attempt); };
  const TNetworkAddress catalogd_addr =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  const CatalogServiceConnection::RpcStatus rpc_status =
      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
          catalogd_addr, &CatalogServiceClientWrapper::GetCatalogObject, rpc_req,
          FLAGS_catalog_client_connection_num_retries,
          FLAGS_catalog_client_rpc_retry_interval_ms, debug_hook, &rpc_resp);
  if (!rpc_status.status.ok()) return rpc_status.status;

  *result = rpc_resp.catalog_object;
  return Status::OK();
}

/// Fetches a partial catalog object from the catalogd through the
/// GetPartialCatalogObject RPC and writes the reply into '*resp'. Only expected to be
/// called with --use_local_catalog or from tests. After a successful fetch, sleeps for
/// --inject_latency_after_catalog_fetch_ms milliseconds when that flag is positive.
/// RPC failures (after retries) are returned without sleeping.
Status CatalogOpExecutor::GetPartialCatalogObject(
    const TGetPartialCatalogObjectRequest& req,
    TGetPartialCatalogObjectResponse* resp) {
  DCHECK(FLAGS_use_local_catalog || TestInfo::is_test());

  int debug_attempt = 0;  // Only consumed by the debug-action hook.
  auto debug_hook = [&debug_attempt]() { return CatalogRpcDebugFn(&debug_attempt); };
  const TNetworkAddress catalogd_addr =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  const CatalogServiceConnection::RpcStatus rpc_status =
      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
          catalogd_addr, &CatalogServiceClientWrapper::GetPartialCatalogObject, req,
          FLAGS_catalog_client_connection_num_retries,
          FLAGS_catalog_client_rpc_retry_interval_ms, debug_hook, resp);
  if (!rpc_status.status.ok()) return rpc_status.status;

  const int injected_latency_ms = FLAGS_inject_latency_after_catalog_fetch_ms;
  if (injected_latency_ms > 0) SleepForMs(injected_latency_ms);
  return Status::OK();
}


/// Forwards 'req' to the catalogd's PrioritizeLoad RPC and stores the reply in
/// '*result'. Retries follow --catalog_client_connection_num_retries and
/// --catalog_client_rpc_retry_interval_ms. Returns the final RPC status.
Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,
    TPrioritizeLoadResponse* result) {
  int debug_attempt = 0;  // Only consumed by the debug-action hook.
  const CatalogServiceConnection::RpcStatus rpc_status =
      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
          MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port),
          &CatalogServiceClientWrapper::PrioritizeLoad, req,
          FLAGS_catalog_client_connection_num_retries,
          FLAGS_catalog_client_rpc_retry_interval_ms,
          [&debug_attempt]() { return CatalogRpcDebugFn(&debug_attempt); }, result);
  return rpc_status.status;
}

/// Forwards 'req' to the catalogd's GetPartitionStats RPC and stores the reply in
/// '*result'. Retries follow --catalog_client_connection_num_retries and
/// --catalog_client_rpc_retry_interval_ms. Returns the final RPC status.
Status CatalogOpExecutor::GetPartitionStats(
    const TGetPartitionStatsRequest& req, TGetPartitionStatsResponse* result) {
  int debug_attempt = 0;  // Only consumed by the debug-action hook.
  auto debug_hook = [&debug_attempt]() { return CatalogRpcDebugFn(&debug_attempt); };
  const TNetworkAddress catalogd_addr =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  const CatalogServiceConnection::RpcStatus rpc_status =
      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
          catalogd_addr, &CatalogServiceClientWrapper::GetPartitionStats, req,
          FLAGS_catalog_client_connection_num_retries,
          FLAGS_catalog_client_rpc_retry_interval_ms, debug_hook, result);
  return rpc_status.status;
}

/// Forwards 'req' to the catalogd's SentryAdminCheck RPC and stores the reply in
/// '*result'. Retries follow --catalog_client_connection_num_retries and
/// --catalog_client_rpc_retry_interval_ms. Returns the final RPC status.
Status CatalogOpExecutor::SentryAdminCheck(const TSentryAdminCheckRequest& req,
    TSentryAdminCheckResponse* result) {
  int debug_attempt = 0;  // Only consumed by the debug-action hook.
  const CatalogServiceConnection::RpcStatus rpc_status =
      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
          MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port),
          &CatalogServiceClientWrapper::SentryAdminCheck, req,
          FLAGS_catalog_client_connection_num_retries,
          FLAGS_catalog_client_rpc_retry_interval_ms,
          [&debug_attempt]() { return CatalogRpcDebugFn(&debug_attempt); }, result);
  return rpc_status.status;
}

/// Forwards 'req' to the catalogd's UpdateTableUsage RPC and stores the reply in
/// '*resp'. Retries follow --catalog_client_connection_num_retries and
/// --catalog_client_rpc_retry_interval_ms. Returns the final RPC status.
Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
    TUpdateTableUsageResponse* resp) {
  int debug_attempt = 0;  // Only consumed by the debug-action hook.
  auto debug_hook = [&debug_attempt]() { return CatalogRpcDebugFn(&debug_attempt); };
  const TNetworkAddress catalogd_addr =
      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
  const CatalogServiceConnection::RpcStatus rpc_status =
      CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
          catalogd_addr, &CatalogServiceClientWrapper::UpdateTableUsage, req,
          FLAGS_catalog_client_connection_num_retries,
          FLAGS_catalog_client_rpc_retry_interval_ms, debug_hook, resp);
  return rpc_status.status;
}
