| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/catalog-op-executor.h" |
| |
| #include <sstream> |
| |
| #include "common/status.h" |
| #include "catalog/catalog-service-client-wrapper.h" |
| #include "exec/incr-stats-util.h" |
| #include "runtime/lib-cache.h" |
| #include "runtime/client-cache-types.h" |
| #include "runtime/exec-env.h" |
| #include "service/frontend.h" |
| #include "service/impala-server.h" |
| #include "service/hs2-util.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/string-parser.h" |
| #include "util/test-info.h" |
| #include "util/time.h" |
| #include "gen-cpp/CatalogService.h" |
| #include "gen-cpp/CatalogService_types.h" |
| #include "gen-cpp/CatalogObjects_types.h" |
| |
| #include <thrift/protocol/TDebugProtocol.h> |
| #include <thrift/Thrift.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "common/names.h" |
| using namespace impala; |
| using namespace apache::hive::service::cli::thrift; |
| using namespace apache::thrift; |
| |
| DECLARE_bool(use_local_catalog); |
| DECLARE_int32(catalog_service_port); |
| DECLARE_string(catalog_service_host); |
| DECLARE_string(debug_actions); |
| |
| DECLARE_int32(catalog_client_connection_num_retries); |
| DECLARE_int32(catalog_client_rpc_timeout_ms); |
| DECLARE_int32(catalog_client_rpc_retry_interval_ms); |
| |
// Hidden knob: when > 0, CatalogOpExecutor::GetPartialCatalogObject() sleeps for this
// many milliseconds after each successful fetch from the catalogd. Presumably meant
// for tests that need to widen race windows — confirm before relying on it elsewhere.
DEFINE_int32_hidden(inject_latency_after_catalog_fetch_ms, 0,
    "Latency (ms) to be injected after fetching catalog data from the catalogd");
| |
| /// Used purely for debug actions. The DEBUG_ACTION is only executed on the first RPC |
| /// attempt. |
| static Status CatalogRpcDebugFn(int* attempt) { |
| return (*attempt)++ == 0 ? |
| DebugAction(FLAGS_debug_actions, "CATALOG_RPC_FIRST_ATTEMPT") : |
| Status::OK(); |
| } |
| |
| Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) { |
| Status status; |
| DCHECK(profile_ != NULL); |
| RuntimeProfile::Counter* exec_timer = ADD_TIMER(profile_, "CatalogOpExecTimer"); |
| SCOPED_TIMER(exec_timer); |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| RETURN_IF_ERROR(status); |
| switch (request.op_type) { |
| case TCatalogOpType::DDL: { |
| // Compute stats stmts must be executed via ExecComputeStats(). |
| DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS); |
| |
| exec_response_.reset(new TDdlExecResponse()); |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::ExecDdl, request.ddl_params, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, exec_response_.get()); |
| RETURN_IF_ERROR(rpc_status.status); |
| catalog_update_result_.reset( |
| new TCatalogUpdateResult(exec_response_.get()->result)); |
| Status status(exec_response_->result.status); |
| if (status.ok()) { |
| if (request.ddl_params.ddl_type == TDdlType::DROP_FUNCTION) { |
| HandleDropFunction(request.ddl_params.drop_fn_params); |
| } else if (request.ddl_params.ddl_type == TDdlType::DROP_DATA_SOURCE) { |
| HandleDropDataSource(request.ddl_params.drop_data_source_params); |
| } |
| } |
| return status; |
| } |
| case TCatalogOpType::RESET_METADATA: { |
| TResetMetadataResponse response; |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::ResetMetadata, request.reset_metadata_params, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, &response); |
| RETURN_IF_ERROR(rpc_status.status); |
| catalog_update_result_.reset(new TCatalogUpdateResult(response.result)); |
| return Status(response.result.status); |
| } |
| default: { |
| return Status(Substitute("TCatalogOpType: $0 does not support execution against the" |
| " CatalogService", request.op_type)); |
| } |
| } |
| } |
| |
| Status CatalogOpExecutor::ExecComputeStats( |
| const TCatalogOpRequest& compute_stats_request, |
| const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data, |
| const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) { |
| // Create a new DDL request to alter the table's statistics. |
| TCatalogOpRequest catalog_op_req; |
| catalog_op_req.__isset.ddl_params = true; |
| catalog_op_req.__set_op_type(TCatalogOpType::DDL); |
| catalog_op_req.__set_sync_ddl(compute_stats_request.sync_ddl); |
| TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params; |
| update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE); |
| update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl); |
| |
| const TComputeStatsParams& compute_stats_params = |
| compute_stats_request.ddl_params.compute_stats_params; |
| TAlterTableUpdateStatsParams& update_stats_params = |
| update_stats_req.alter_table_params.update_stats_params; |
| update_stats_req.__isset.alter_table_params = true; |
| update_stats_req.alter_table_params.__set_alter_type(TAlterTableType::UPDATE_STATS); |
| update_stats_req.alter_table_params.__set_table_name(compute_stats_params.table_name); |
| update_stats_req.alter_table_params.__isset.update_stats_params = true; |
| update_stats_params.__set_table_name(compute_stats_params.table_name); |
| update_stats_params.__set_expect_all_partitions( |
| compute_stats_params.expect_all_partitions); |
| update_stats_params.__set_is_incremental(compute_stats_params.is_incremental); |
| |
| // Fill the alteration request based on the child-query results. |
| SetTableStats(tbl_stats_schema, tbl_stats_data, |
| compute_stats_params.existing_part_stats, &update_stats_params); |
| if (compute_stats_params.__isset.total_file_bytes) { |
| update_stats_params.table_stats.__set_total_file_bytes( |
| compute_stats_params.total_file_bytes); |
| } |
| // col_stats_schema and col_stats_data will be empty if there was no column stats query. |
| if (!col_stats_schema.columns.empty()) { |
| if (compute_stats_params.is_incremental) { |
| RuntimeProfile::Counter* incremental_finalize_timer = |
| ADD_TIMER(profile_, "FinalizeIncrementalStatsTimer"); |
| SCOPED_TIMER(incremental_finalize_timer); |
| FinalizePartitionedColumnStats(col_stats_schema, |
| compute_stats_params.existing_part_stats, |
| compute_stats_params.expected_partitions, |
| col_stats_data, compute_stats_params.num_partition_cols, &update_stats_params); |
| } else { |
| SetColumnStats(col_stats_schema, col_stats_data, &update_stats_params); |
| } |
| } |
| |
| // Execute the 'alter table update stats' request. |
| RETURN_IF_ERROR(Exec(catalog_op_req)); |
| return Status::OK(); |
| } |
| |
| void CatalogOpExecutor::HandleDropFunction(const TDropFunctionParams& request) { |
| DCHECK(fe_ != NULL) << "FE tests should not be calling this"; |
| // Can only be called after successfully processing a catalog update. |
| DCHECK(catalog_update_result_ != NULL); |
| |
| // Lookup in the local catalog the metadata for the function. |
| TCatalogObject obj; |
| obj.type = TCatalogObjectType::FUNCTION; |
| obj.fn.name = request.fn_name; |
| obj.fn.arg_types = request.arg_types; |
| obj.fn.signature = request.signature; |
| obj.__isset.fn = true; |
| obj.fn.__isset.signature = true; |
| |
| TCatalogObject fn; |
| Status status = fe_->GetCatalogObject(obj, &fn); |
| if (!status.ok()) { |
| // This can happen if the function was dropped by another impalad. |
| VLOG_QUERY << "Could not lookup function catalog object: " |
| << apache::thrift::ThriftDebugString(request); |
| return; |
| } |
| // This function may have been dropped and re-created. To avoid removing the re-created |
| // function's entry from the cache verify the existing function has a catalog |
| // version <= the dropped version. This may happen if the update from the statestore |
| // gets applied *before* the result of a direct-DDL drop function command. |
| if (fn.catalog_version <= catalog_update_result_->version) { |
| LibCache::instance()->RemoveEntry(fn.fn.hdfs_location); |
| } |
| } |
| |
| void CatalogOpExecutor::HandleDropDataSource(const TDropDataSourceParams& request) { |
| DCHECK(fe_ != NULL) << "FE tests should not be calling this"; |
| // Can only be called after successfully processing a catalog update. |
| DCHECK(catalog_update_result_ != NULL); |
| |
| // Lookup in the local catalog the metadata for the data source. |
| TCatalogObject obj; |
| obj.type = TCatalogObjectType::DATA_SOURCE; |
| obj.data_source.name = request.data_source; |
| obj.__isset.data_source = true; |
| |
| TCatalogObject ds; |
| Status status = fe_->GetCatalogObject(obj, &ds); |
| if (!status.ok()) { |
| // This can happen if the data source was dropped by another impalad. |
| VLOG_QUERY << "Could not lookup data source catalog object: " |
| << apache::thrift::ThriftDebugString(request); |
| return; |
| } |
| // This data source may have been dropped and re-created. To avoid removing the |
| // re-created data source's entry from the cache verify the existing data source has a |
| // catalog version <= the dropped version. This may happen if the update from the |
| // statestore gets applied *before* the result of a direct-DDL drop data source |
| // command. |
| if (ds.catalog_version <= catalog_update_result_->version) { |
| LibCache::instance()->RemoveEntry(ds.data_source.hdfs_location); |
| } |
| } |
| |
| void CatalogOpExecutor::SetTableStats(const TTableSchema& tbl_stats_schema, |
| const TRowSet& tbl_stats_data, const vector<TPartitionStats>& existing_part_stats, |
| TAlterTableUpdateStatsParams* params) { |
| if (tbl_stats_data.rows.size() == 1 && tbl_stats_data.rows[0].colVals.size() == 1) { |
| // Unpartitioned table. Only set table stats, but no partition stats. |
| // The first column is the COUNT(*) expr of the original query. |
| params->table_stats.__set_num_rows(tbl_stats_data.rows[0].colVals[0].i64Val.value); |
| params->__isset.table_stats = true; |
| return; |
| } |
| |
| // Accumulate total number of rows in the partitioned table. |
| long total_num_rows = 0; |
| // Set per-partition stats. |
| for (const TRow& row: tbl_stats_data.rows) { |
| DCHECK_GT(row.colVals.size(), 0); |
| // The first column is the COUNT(*) expr of the original query. |
| DCHECK(row.colVals[0].__isset.i64Val); |
| int64_t num_rows = row.colVals[0].i64Val.value; |
| // The remaining columns are partition columns that the results are grouped by. |
| vector<string> partition_key_vals; |
| partition_key_vals.reserve(row.colVals.size()); |
| for (int j = 1; j < row.colVals.size(); ++j) { |
| stringstream ss; |
| PrintTColumnValue(row.colVals[j], &ss); |
| partition_key_vals.push_back(ss.str()); |
| } |
| params->partition_stats[partition_key_vals].stats.__set_num_rows(num_rows); |
| total_num_rows += num_rows; |
| } |
| params->__isset.partition_stats = true; |
| |
| // Add row counts of existing partitions that are not going to be modified. |
| for (const TPartitionStats& existing_stats: existing_part_stats) { |
| total_num_rows += existing_stats.stats.num_rows; |
| } |
| |
| // Set per-table stats. |
| params->table_stats.__set_num_rows(total_num_rows); |
| params->__isset.table_stats = true; |
| } |
| |
| void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema, |
| const TRowSet& col_stats_data, TAlterTableUpdateStatsParams* params) { |
| // Expect exactly one result row. |
| DCHECK_EQ(1, col_stats_data.rows.size()); |
| const TRow& col_stats_row = col_stats_data.rows[0]; |
| |
| // Set per-column stats. For a column at position i in its source table, |
| // the NDVs and the number of NULLs are at position i and i + 1 of the |
| // col_stats_row, respectively. Positions i + 2 and i + 3 contain the max/avg |
| // length for string columns, and -1 for non-string columns. |
| for (int i = 0; i < col_stats_row.colVals.size(); i += 4) { |
| TColumnStats col_stats; |
| col_stats.__set_num_distinct_values(col_stats_row.colVals[i].i64Val.value); |
| col_stats.__set_num_nulls(col_stats_row.colVals[i + 1].i64Val.value); |
| col_stats.__set_max_size(col_stats_row.colVals[i + 2].i32Val.value); |
| col_stats.__set_avg_size(col_stats_row.colVals[i + 3].doubleVal.value); |
| params->column_stats[col_stats_schema.columns[i].columnName] = col_stats; |
| } |
| params->__isset.column_stats = true; |
| } |
| |
| Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc, |
| TCatalogObject* result) { |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| TGetCatalogObjectRequest request; |
| request.__set_object_desc(object_desc); |
| |
| TGetCatalogObjectResponse response; |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::GetCatalogObject, request, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, &response); |
| RETURN_IF_ERROR(rpc_status.status); |
| *result = response.catalog_object; |
| return Status::OK(); |
| } |
| |
| Status CatalogOpExecutor::GetPartialCatalogObject( |
| const TGetPartialCatalogObjectRequest& req, |
| TGetPartialCatalogObjectResponse* resp) { |
| DCHECK(FLAGS_use_local_catalog || TestInfo::is_test()); |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::GetPartialCatalogObject, req, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, resp); |
| RETURN_IF_ERROR(rpc_status.status); |
| if (FLAGS_inject_latency_after_catalog_fetch_ms > 0) { |
| SleepForMs(FLAGS_inject_latency_after_catalog_fetch_ms); |
| } |
| return Status::OK(); |
| } |
| |
| |
| Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req, |
| TPrioritizeLoadResponse* result) { |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::PrioritizeLoad, req, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result); |
| RETURN_IF_ERROR(rpc_status.status); |
| return Status::OK(); |
| } |
| |
| Status CatalogOpExecutor::GetPartitionStats( |
| const TGetPartitionStatsRequest& req, TGetPartitionStatsResponse* result) { |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::GetPartitionStats, req, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result); |
| RETURN_IF_ERROR(rpc_status.status); |
| return Status::OK(); |
| } |
| |
| Status CatalogOpExecutor::SentryAdminCheck(const TSentryAdminCheckRequest& req, |
| TSentryAdminCheckResponse* result) { |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::SentryAdminCheck, req, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, result); |
| RETURN_IF_ERROR(rpc_status.status); |
| return Status::OK(); |
| } |
| |
| Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req, |
| TUpdateTableUsageResponse* resp) { |
| const TNetworkAddress& address = |
| MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port); |
| int attempt = 0; // Used for debug action only. |
| CatalogServiceConnection::RpcStatus rpc_status = |
| CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(), address, |
| &CatalogServiceClientWrapper::UpdateTableUsage, req, |
| FLAGS_catalog_client_connection_num_retries, |
| FLAGS_catalog_client_rpc_retry_interval_ms, |
| [&attempt]() { return CatalogRpcDebugFn(&attempt); }, resp); |
| RETURN_IF_ERROR(rpc_status.status); |
| return Status::OK(); |
| } |