blob: 64ab950a649d639cbb724fc057de1f2b807b32ce [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/client-request-state.h"
#include <boost/algorithm/string/predicate.hpp>
#include <limits>
#include <gutil/strings/substitute.h>
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/exec-env.h"
#include "scheduling/admission-controller.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
#include "service/impala-server.h"
#include "service/query-options.h"
#include "service/query-result-set.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "gen-cpp/CatalogService.h"
#include "gen-cpp/CatalogService_types.h"
#include <thrift/Thrift.h>
#include "common/names.h"
using boost::algorithm::iequals;
using boost::algorithm::join;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace beeswax;
using namespace strings;
DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);
DECLARE_int64(max_result_cache_size);
namespace impala {
// Keys into the info string map of the runtime profile referring to specific
// items used by CM for monitoring purposes.
static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
ClientRequestState::ClientRequestState(
const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session)
: query_ctx_(query_ctx),
last_active_time_ms_(numeric_limits<int64_t>::max()),
ref_count_(0L),
child_query_executor_(new ChildQueryExecutor),
exec_env_(exec_env),
is_block_on_wait_joining_(false),
session_(session),
schedule_(NULL),
coord_(NULL),
result_cache_max_size_(-1),
// Profile is assigned name w/ id after planning
profile_(RuntimeProfile::Create(&profile_pool_, "Query")),
server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer")),
summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
is_cancelled_(false),
eos_(false),
query_state_(beeswax::QueryState::CREATED),
user_has_profile_access_(true),
current_batch_(NULL),
current_batch_row_(0),
num_rows_fetched_(0),
fetched_rows_(false),
frontend_(frontend),
parent_server_(server),
start_time_us_(UnixMicros()),
end_time_us_(0LL) {
#ifndef NDEBUG
profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
"DEBUG build of Impala. Use RELEASE builds to measure query performance.");
#endif
row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer");
client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
query_events_ = summary_profile_->AddEventSequence("Query Timeline");
query_events_->Start();
profile_->AddChild(summary_profile_);
profile_->set_name("Query (id=" + PrintId(query_id()) + ")");
summary_profile_->AddInfoString("Session ID", PrintId(session_id()));
summary_profile_->AddInfoString("Session Type", PrintTSessionType(session_type()));
if (session_type() == TSessionType::HIVESERVER2) {
summary_profile_->AddInfoString("HiveServer2 Protocol Version",
Substitute("V$0", 1 + session->hs2_version));
}
// Certain API clients expect Start Time and End Time to be date-time strings
// of nanosecond precision, so we explicitly specify the precision here.
summary_profile_->AddInfoString("Start Time", ToStringFromUnixMicros(start_time_us(),
TimePrecision::Nanosecond));
summary_profile_->AddInfoString("End Time", "");
summary_profile_->AddInfoString("Query Type", "N/A");
summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
summary_profile_->AddInfoString("Query Status", "OK");
summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
summary_profile_->AddInfoString("User", effective_user());
summary_profile_->AddInfoString("Connected User", connected_user());
summary_profile_->AddInfoString("Delegated User", do_as_user());
summary_profile_->AddInfoString("Network Address",
lexical_cast<string>(session_->network_address));
summary_profile_->AddInfoString("Default Db", default_db());
summary_profile_->AddInfoStringRedacted(
"Sql Statement", query_ctx_.client_request.stmt);
summary_profile_->AddInfoString("Coordinator",
TNetworkAddressToString(exec_env->backend_address()));
}
ClientRequestState::~ClientRequestState() {
DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!";
}
Status ClientRequestState::SetResultCache(QueryResultSet* cache,
int64_t max_size) {
lock_guard<mutex> l(lock_);
DCHECK(result_cache_ == NULL);
result_cache_.reset(cache);
if (max_size > FLAGS_max_result_cache_size) {
return Status(
Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.",
max_size, FLAGS_max_result_cache_size));
}
result_cache_max_size_ = max_size;
return Status::OK();
}
Status ClientRequestState::Exec(TExecRequest* exec_request) {
MarkActive();
exec_request_ = *exec_request;
profile_->AddChild(server_profile_);
summary_profile_->AddInfoString("Query Type", PrintTStmtType(stmt_type()));
summary_profile_->AddInfoString("Query Options (set by configuration)",
DebugQueryOptions(query_ctx_.client_request.query_options));
summary_profile_->AddInfoString("Query Options (set by configuration and planner)",
DebugQueryOptions(exec_request_.query_options));
switch (exec_request->stmt_type) {
case TStmtType::QUERY:
case TStmtType::DML:
DCHECK(exec_request_.__isset.query_exec_request);
return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
case TStmtType::EXPLAIN: {
request_result_set_.reset(new vector<TResultRow>(
exec_request_.explain_result.results));
return Status::OK();
}
case TStmtType::DDL: {
DCHECK(exec_request_.__isset.catalog_op_request);
return ExecDdlRequest();
}
case TStmtType::LOAD: {
DCHECK(exec_request_.__isset.load_data_request);
TLoadDataResp response;
RETURN_IF_ERROR(
frontend_->LoadData(exec_request_.load_data_request, &response));
request_result_set_.reset(new vector<TResultRow>);
request_result_set_->push_back(response.load_summary);
// Now refresh the table metadata.
TCatalogOpRequest reset_req;
reset_req.__set_sync_ddl(exec_request_.query_options.sync_ddl);
reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
reset_req.__set_reset_metadata_params(TResetMetadataRequest());
reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
reset_req.reset_metadata_params.__set_is_refresh(true);
reset_req.reset_metadata_params.__set_table_name(
exec_request_.load_data_request.table_name);
reset_req.reset_metadata_params.__set_sync_ddl(
exec_request_.query_options.sync_ddl);
catalog_op_executor_.reset(
new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
*catalog_op_executor_->update_catalog_result(),
exec_request_.query_options.sync_ddl));
return Status::OK();
}
case TStmtType::SET: {
DCHECK(exec_request_.__isset.set_query_option_request);
lock_guard<mutex> l(session_->lock);
if (exec_request_.set_query_option_request.__isset.key) {
// "SET key=value" updates the session query options.
DCHECK(exec_request_.set_query_option_request.__isset.value);
const auto& key = exec_request_.set_query_option_request.key;
const auto& value = exec_request_.set_query_option_request.value;
RETURN_IF_ERROR(SetQueryOption(key, value, &session_->set_query_options,
&session_->set_query_options_mask));
SetResultSet({}, {}, {});
if (iequals(key, "idle_session_timeout")) {
// IMPALA-2248: Session timeout is set as a query option
session_->last_accessed_ms = UnixMillis(); // do not expire session immediately
session_->UpdateTimeout();
VLOG_QUERY << "ClientRequestState::Exec() SET: idle_session_timeout="
<< PrettyPrinter::Print(session_->session_timeout, TUnit::TIME_S);
}
} else {
// "SET" or "SET ALL"
bool is_set_all = exec_request_.set_query_option_request.__isset.is_set_all &&
exec_request_.set_query_option_request.is_set_all;
PopulateResultForSet(is_set_all);
}
return Status::OK();
}
default:
stringstream errmsg;
errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
return Status(errmsg.str());
}
}
void ClientRequestState::PopulateResultForSet(bool is_set_all) {
map<string, string> config;
TQueryOptionsToMap(session_->QueryOptions(), &config);
vector<string> keys, values, levels;
map<string, string>::const_iterator itr = config.begin();
for (; itr != config.end(); ++itr) {
const auto opt_level_id =
parent_server_->query_option_levels_[itr->first];
if (opt_level_id == TQueryOptionLevel::REMOVED) continue;
if (!is_set_all && (opt_level_id == TQueryOptionLevel::DEVELOPMENT ||
opt_level_id == TQueryOptionLevel::DEPRECATED)) {
continue;
}
keys.push_back(itr->first);
values.push_back(itr->second);
const auto opt_level = _TQueryOptionLevel_VALUES_TO_NAMES.find(opt_level_id);
DCHECK(opt_level !=_TQueryOptionLevel_VALUES_TO_NAMES.end());
levels.push_back(opt_level->second);
}
SetResultSet(keys, values, levels);
}
Status ClientRequestState::ExecLocalCatalogOp(
const TCatalogOpRequest& catalog_op) {
switch (catalog_op.op_type) {
case TCatalogOpType::USE: {
lock_guard<mutex> l(session_->lock);
session_->database = exec_request_.catalog_op_request.use_db_params.db;
return Status::OK();
}
case TCatalogOpType::SHOW_TABLES: {
const TShowTablesParams* params = &catalog_op.show_tables_params;
// A NULL pattern means match all tables. However, Thrift string types can't
// be NULL in C++, so we have to test if it's set rather than just blindly
// using the value.
const string* table_name =
params->__isset.show_pattern ? &(params->show_pattern) : NULL;
TGetTablesResult table_names;
RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name,
&query_ctx_.session, &table_names));
SetResultSet(table_names.tables);
return Status::OK();
}
case TCatalogOpType::SHOW_DBS: {
const TShowDbsParams* params = &catalog_op.show_dbs_params;
TGetDbsResult dbs;
const string* db_pattern =
params->__isset.show_pattern ? (&params->show_pattern) : NULL;
RETURN_IF_ERROR(
frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs));
vector<string> names, comments;
names.reserve(dbs.dbs.size());
comments.reserve(dbs.dbs.size());
for (const TDatabase& db: dbs.dbs) {
names.push_back(db.db_name);
comments.push_back(db.metastore_db.description);
}
SetResultSet(names, comments);
return Status::OK();
}
case TCatalogOpType::SHOW_DATA_SRCS: {
const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
TGetDataSrcsResult result;
const string* pattern =
params->__isset.show_pattern ? (&params->show_pattern) : NULL;
RETURN_IF_ERROR(
frontend_->GetDataSrcMetadata(pattern, &result));
SetResultSet(result.data_src_names, result.locations, result.class_names,
result.api_versions);
return Status::OK();
}
case TCatalogOpType::SHOW_STATS: {
const TShowStatsParams& params = catalog_op.show_stats_params;
TResultSet response;
RETURN_IF_ERROR(frontend_->GetStats(params, &response));
// Set the result set and its schema from the response.
request_result_set_.reset(new vector<TResultRow>(response.rows));
result_metadata_ = response.schema;
return Status::OK();
}
case TCatalogOpType::SHOW_FUNCTIONS: {
const TShowFunctionsParams* params = &catalog_op.show_fns_params;
TGetFunctionsResult functions;
const string* fn_pattern =
params->__isset.show_pattern ? (&params->show_pattern) : NULL;
RETURN_IF_ERROR(frontend_->GetFunctions(
params->category, params->db, fn_pattern, &query_ctx_.session, &functions));
SetResultSet(functions.fn_ret_types, functions.fn_signatures,
functions.fn_binary_types, functions.fn_persistence);
return Status::OK();
}
case TCatalogOpType::SHOW_ROLES: {
const TShowRolesParams& params = catalog_op.show_roles_params;
if (params.is_admin_op) {
// Verify the user has privileges to perform this operation by checking against
// the Sentry Service (via the Catalog Server).
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
server_profile_));
TSentryAdminCheckRequest req;
req.__set_header(TCatalogServiceRequestHeader());
req.header.__set_requesting_user(effective_user());
RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
}
// If we have made it here, the user has privileges to execute this operation.
// Return the results.
TShowRolesResult result;
RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
SetResultSet(result.role_names);
return Status::OK();
}
case TCatalogOpType::SHOW_GRANT_ROLE: {
const TShowGrantRoleParams& params = catalog_op.show_grant_role_params;
if (params.is_admin_op) {
// Verify the user has privileges to perform this operation by checking against
// the Sentry Service (via the Catalog Server).
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
server_profile_));
TSentryAdminCheckRequest req;
req.__set_header(TCatalogServiceRequestHeader());
req.header.__set_requesting_user(effective_user());
RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
}
TResultSet response;
RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response));
// Set the result set and its schema from the response.
request_result_set_.reset(new vector<TResultRow>(response.rows));
result_metadata_ = response.schema;
return Status::OK();
}
case TCatalogOpType::DESCRIBE_DB: {
TDescribeResult response;
RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params,
&response));
// Set the result set
request_result_set_.reset(new vector<TResultRow>(response.results));
return Status::OK();
}
case TCatalogOpType::DESCRIBE_TABLE: {
TDescribeResult response;
RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params,
&response));
// Set the result set
request_result_set_.reset(new vector<TResultRow>(response.results));
return Status::OK();
}
case TCatalogOpType::SHOW_CREATE_TABLE: {
string response;
RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
&response));
SetResultSet(vector<string>(1, response));
return Status::OK();
}
case TCatalogOpType::SHOW_CREATE_FUNCTION: {
string response;
RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params,
&response));
SetResultSet(vector<string>(1, response));
return Status::OK();
}
case TCatalogOpType::SHOW_FILES: {
TResultSet response;
RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
// Set the result set and its schema from the response.
request_result_set_.reset(new vector<TResultRow>(response.rows));
result_metadata_ = response.schema;
return Status::OK();
}
default: {
stringstream ss;
ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
return Status(ss.str());
}
}
}
Status ClientRequestState::ExecQueryOrDmlRequest(
const TQueryExecRequest& query_exec_request) {
// we always need at least one plan fragment
DCHECK(query_exec_request.plan_exec_info.size() > 0);
if (query_exec_request.__isset.query_plan) {
stringstream plan_ss;
// Add some delimiters to make it clearer where the plan
// begins and the profile ends
plan_ss << "\n----------------\n"
<< query_exec_request.query_plan
<< "----------------";
summary_profile_->AddInfoStringRedacted("Plan", plan_ss.str());
}
// Add info strings consumed by CM: Estimated mem and tables missing stats.
if (query_exec_request.__isset.per_host_mem_estimate) {
stringstream ss;
ss << query_exec_request.per_host_mem_estimate;
summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str());
}
if (!query_exec_request.query_ctx.__isset.parent_query_id &&
query_exec_request.query_ctx.__isset.tables_missing_stats &&
!query_exec_request.query_ctx.tables_missing_stats.empty()) {
stringstream ss;
const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats;
for (int i = 0; i < tbls.size(); ++i) {
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
}
if (!query_exec_request.query_ctx.__isset.parent_query_id &&
query_exec_request.query_ctx.__isset.tables_with_corrupt_stats &&
!query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) {
stringstream ss;
const vector<TTableName>& tbls =
query_exec_request.query_ctx.tables_with_corrupt_stats;
for (int i = 0; i < tbls.size(); ++i) {
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
}
if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
!query_exec_request.query_ctx.tables_missing_diskids.empty()) {
stringstream ss;
const vector<TTableName>& tbls =
query_exec_request.query_ctx.tables_missing_diskids;
for (int i = 0; i < tbls.size(); ++i) {
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
}
{
lock_guard<mutex> l(lock_);
// Don't start executing the query if Cancel() was called concurrently with Exec().
if (is_cancelled_) return Status::CANCELLED;
schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
exec_request_.query_options, summary_profile_, query_events_));
}
Status status = exec_env_->scheduler()->Schedule(schedule_.get());
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
if (exec_env_->admission_controller() != nullptr) {
status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
}
coord_.reset(new Coordinator(*schedule_, query_events_));
status = coord_->Exec();
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
profile_->AddChild(coord_->query_profile());
return Status::OK();
}
Status ClientRequestState::ExecDdlRequest() {
string op_type = catalog_op_type() == TCatalogOpType::DDL ?
PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
summary_profile_->AddInfoString("DDL Type", op_type);
if (catalog_op_type() != TCatalogOpType::DDL &&
catalog_op_type() != TCatalogOpType::RESET_METADATA) {
Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request);
lock_guard<mutex> l(lock_);
return UpdateQueryStatus(status);
}
if (ddl_type() == TDdlType::COMPUTE_STATS) {
TComputeStatsParams& compute_stats_params =
exec_request_.catalog_op_request.ddl_params.compute_stats_params;
// Add child queries for computing table and column stats.
vector<ChildQuery> child_queries;
if (compute_stats_params.__isset.tbl_stats_query) {
child_queries.push_back(
ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_));
}
if (compute_stats_params.__isset.col_stats_query) {
child_queries.push_back(
ChildQuery(compute_stats_params.col_stats_query, this, parent_server_));
}
if (child_queries.size() > 0) {
RETURN_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
}
return Status::OK();
}
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
server_profile_));
Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
// If this is a CTAS request, there will usually be more work to do
// after executing the CREATE TABLE statement (the INSERT portion of the operation).
// The exception is if the user specified IF NOT EXISTS and the table already
// existed, in which case we do not execute the INSERT.
if (catalog_op_type() == TCatalogOpType::DDL &&
ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
!catalog_op_executor_->ddl_exec_response()->new_table_created) {
DCHECK(exec_request_.catalog_op_request.
ddl_params.create_table_params.if_not_exists);
return Status::OK();
}
// Add newly created table to catalog cache.
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
*catalog_op_executor_->update_catalog_result(),
exec_request_.query_options.sync_ddl));
if (catalog_op_type() == TCatalogOpType::DDL &&
ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
// At this point, the remainder of the CTAS request executes
// like a normal DML request. As with other DML requests, it will
// wait for another catalog update if any partitions were altered as a result
// of the operation.
DCHECK(exec_request_.__isset.query_exec_request);
RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request));
}
// Set the results to be reported to the client.
SetResultSet(catalog_op_executor_->ddl_exec_response());
return Status::OK();
}
void ClientRequestState::Done() {
MarkActive();
// Make sure we join on wait_thread_ before we finish (and especially before this object
// is destroyed).
BlockOnWait();
// Update latest observed Kudu timestamp stored in the session from the coordinator.
// Needs to take the session_ lock which must not be taken while holding lock_, so this
// must happen before taking lock_ below.
if (coord_.get() != NULL) {
// This is safe to access on coord_ after Wait() has been called.
uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
if (latest_kudu_ts > 0) {
VLOG_RPC << "Updating session (id=" << session_id() << ") with latest "
<< "observed Kudu timestamp: " << latest_kudu_ts;
lock_guard<mutex> session_lock(session_->lock);
session_->kudu_latest_observed_ts = std::max<uint64_t>(
session_->kudu_latest_observed_ts, latest_kudu_ts);
}
}
unique_lock<mutex> l(lock_);
end_time_us_ = UnixMicros();
// Certain API clients expect Start Time and End Time to be date-time strings
// of nanosecond precision, so we explicitly specify the precision here.
summary_profile_->AddInfoString("End Time", ToStringFromUnixMicros(end_time_us(),
TimePrecision::Nanosecond));
query_events_->MarkEvent("Unregister query");
// Update result set cache metrics, and update mem limit accounting before tearing
// down the coordinator.
ClearResultCache();
}
Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
TResultSet metadata_op_result;
// Like the other Exec(), fill out as much profile information as we're able to.
summary_profile_->AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
&metadata_op_result));
result_metadata_ = metadata_op_result.schema;
request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
return Status::OK();
}
Status ClientRequestState::WaitAsync() {
return Thread::Create("query-exec-state", "wait-thread",
&ClientRequestState::Wait, this, &wait_thread_, true);
}
void ClientRequestState::BlockOnWait() {
unique_lock<mutex> l(lock_);
if (wait_thread_.get() == NULL) return;
if (!is_block_on_wait_joining_) {
// No other thread is already joining on wait_thread_, so this thread needs to do
// it. Other threads will need to block on the cond-var.
is_block_on_wait_joining_ = true;
l.unlock();
wait_thread_->Join();
l.lock();
is_block_on_wait_joining_ = false;
wait_thread_.reset();
block_on_wait_cv_.NotifyAll();
} else {
// Another thread is already joining with wait_thread_. Block on the cond-var
// until the Join() executed in the other thread has completed.
do {
block_on_wait_cv_.Wait(l);
} while (is_block_on_wait_joining_);
}
}
void ClientRequestState::Wait() {
// block until results are ready
Status status = WaitInternal();
{
lock_guard<mutex> l(lock_);
if (returns_result_set()) {
query_events()->MarkEvent("Rows available");
} else {
query_events()->MarkEvent("Request finished");
}
discard_result(UpdateQueryStatus(status));
}
if (status.ok()) {
UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
}
}
Status ClientRequestState::WaitInternal() {
// Explain requests have already populated the result set. Nothing to do here.
if (exec_request_.stmt_type == TStmtType::EXPLAIN) {
MarkInactive();
return Status::OK();
}
vector<ChildQuery*> child_queries;
Status child_queries_status = child_query_executor_->WaitForAll(&child_queries);
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(query_status_);
RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
}
if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished");
if (coord_.get() != NULL) {
RETURN_IF_ERROR(coord_->Wait());
RETURN_IF_ERROR(UpdateCatalog());
}
if (catalog_op_type() == TCatalogOpType::DDL &&
ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
}
if (!returns_result_set()) {
// Queries that do not return a result are finished at this point. This includes
// DML operations and a subset of the DDL operations.
eos_ = true;
} else if (catalog_op_type() == TCatalogOpType::DDL &&
ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
SetCreateTableAsSelectResultSet();
}
// Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
// how long Impala waits for the client to fetch rows. For other statements, track the
// time until a Close() is received.
MarkInactive();
return Status::OK();
}
Status ClientRequestState::FetchRows(const int32_t max_rows,
QueryResultSet* fetched_rows) {
// Pause the wait timer, since the client has instructed us to do work on its behalf.
MarkActive();
// ImpalaServer::FetchInternal has already taken our lock_
discard_result(UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows)));
MarkInactive();
return query_status_;
}
Status ClientRequestState::RestartFetch() {
// No result caching for this query. Restart is invalid.
if (result_cache_max_size_ <= 0) {
return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
"Restarting of fetch requires enabling of query result caching."));
}
// The cache overflowed on a previous fetch.
if (result_cache_.get() == NULL) {
stringstream ss;
ss << "The query result cache exceeded its limit of " << result_cache_max_size_
<< " rows. Restarting the fetch is not possible.";
return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str()));
}
// Reset fetch state to start over.
eos_ = false;
num_rows_fetched_ = 0;
return Status::OK();
}
void ClientRequestState::UpdateNonErrorQueryState(
beeswax::QueryState::type query_state) {
lock_guard<mutex> l(lock_);
DCHECK(query_state != beeswax::QueryState::EXCEPTION);
if (query_state_ < query_state) UpdateQueryState(query_state);
}
Status ClientRequestState::UpdateQueryStatus(const Status& status) {
// Preserve the first non-ok status
if (!status.ok() && query_status_.ok()) {
UpdateQueryState(beeswax::QueryState::EXCEPTION);
query_status_ = status;
summary_profile_->AddInfoStringRedacted("Query Status", query_status_.GetDetail());
}
return status;
}
Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
QueryResultSet* fetched_rows) {
DCHECK(query_state_ != beeswax::QueryState::EXCEPTION);
if (eos_) return Status::OK();
if (request_result_set_ != NULL) {
UpdateQueryState(beeswax::QueryState::FINISHED);
int num_rows = 0;
const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
// max_rows <= 0 means no limit
while ((num_rows < max_rows || max_rows <= 0)
&& num_rows_fetched_ < all_rows.size()) {
RETURN_IF_ERROR(fetched_rows->AddOneRow(all_rows[num_rows_fetched_]));
++num_rows_fetched_;
++num_rows;
}
eos_ = (num_rows_fetched_ == all_rows.size());
return Status::OK();
}
if (coord_.get() == nullptr) {
return Status("Client tried to fetch rows on a query that produces no results.");
}
int32_t num_rows_fetched_from_cache = 0;
if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
// Satisfy the fetch from the result cache if possible.
int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
num_rows_fetched_from_cache =
fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
num_rows_fetched_ += num_rows_fetched_from_cache;
if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
}
// results will be ready after this call
UpdateQueryState(beeswax::QueryState::FINISHED);
// Maximum number of rows to be fetched from the coord.
int32_t max_coord_rows = max_rows;
if (max_rows > 0) {
DCHECK_LE(num_rows_fetched_from_cache, max_rows);
max_coord_rows = max_rows - num_rows_fetched_from_cache;
}
{
SCOPED_TIMER(row_materialization_timer_);
size_t before = fetched_rows->size();
// Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
// (already held) ensures that we do not call coord_->GetNext() multiple times
// concurrently.
// TODO: Simplify this.
lock_.unlock();
Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_);
lock_.lock();
int num_fetched = fetched_rows->size() - before;
DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
"Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
num_rows_fetched_ += num_fetched;
RETURN_IF_ERROR(status);
// Check if query status has changed during GetNext() call
if (!query_status_.ok()) {
eos_ = true;
return query_status_;
}
}
// Update the result cache if necessary.
if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache;
if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
// Set the cache to NULL to indicate that adding the rows fetched from the coord
// would exceed the bound of the cache, and therefore, RestartFetch() should fail.
ClearResultCache();
return Status::OK();
}
// We guess the size of the cache after adding fetched_rows by looking at the size of
// fetched_rows itself, and using this estimate to confirm that the memtracker will
// allow us to use this much extra memory. In fact, this might be an overestimate, as
// the size of two result sets combined into one is not always the size of both result
// sets added together (the best example is the null bitset for each column: it might
// have only one entry in each result set, and as a result consume two bytes, but when
// the result sets are combined, only one byte is needed). Therefore after we add the
// new result set into the cache, we need to fix up the memory consumption to the
// actual levels to ensure we don't 'leak' bytes that we aren't using.
int64_t before = result_cache_->ByteSize();
// Upper-bound on memory required to add fetched_rows to the cache.
int64_t delta_bytes =
fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
MemTracker* query_mem_tracker = coord_->query_mem_tracker();
// Count the cached rows towards the mem limit.
if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
string details("Failed to allocate memory for result cache.");
return query_mem_tracker->MemLimitExceeded(nullptr, details, delta_bytes);
}
// Append all rows fetched from the coordinator into the cache.
int num_rows_added = result_cache_->AddRows(
fetched_rows, num_rows_fetched_from_cache, fetched_rows->size());
int64_t after = result_cache_->ByteSize();
// Confirm that this was not an underestimate of the memory required.
DCHECK_GE(before + delta_bytes, after)
<< "Combined result sets consume more memory than both individually "
<< Substitute("(before: $0, delta_bytes: $1, after: $2)",
before, delta_bytes, after);
// Fix up the tracked values
if (before + delta_bytes > after) {
query_mem_tracker->Release(before + delta_bytes - after);
delta_bytes = after - before;
}
// Update result set cache metrics.
ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added);
ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes);
}
return Status::OK();
}
Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
if (check_inflight) {
// If the query is in 'inflight_queries' it means that the query has actually started
// executing. It is ok if the query is removed from 'inflight_queries' during
// cancellation, so we can release the session lock before starting the cancellation
// work.
lock_guard<mutex> session_lock(session_->lock);
if (session_->inflight_queries.find(query_id()) == session_->inflight_queries.end()) {
return Status("Query not yet running");
}
}
Coordinator* coord;
{
lock_guard<mutex> lock(lock_);
// If the query is completed or cancelled, no need to update state.
bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
if (!already_done && cause != NULL) {
DCHECK(!cause->ok());
discard_result(UpdateQueryStatus(*cause));
query_events_->MarkEvent("Cancelled");
DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
}
// Get a copy of the coordinator pointer while holding 'lock_'.
coord = coord_.get();
is_cancelled_ = true;
} // Release lock_ before doing cancellation work.
// Cancel and close child queries before cancelling parent. 'lock_' should not be held
// because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation
// involves RPCs and can take quite some time.
child_query_executor_->Cancel();
// Cancel the parent query. 'lock_' should not be held because cancellation involves
// RPCs and can block for a long time.
if (coord != NULL) coord->Cancel(cause);
return Status::OK();
}
Status ClientRequestState::UpdateCatalog() {
if (!exec_request().__isset.query_exec_request ||
exec_request().query_exec_request.stmt_type != TStmtType::DML) {
return Status::OK();
}
query_events_->MarkEvent("DML data written");
SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
TQueryExecRequest query_exec_request = exec_request().query_exec_request;
if (query_exec_request.__isset.finalize_params) {
const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
TUpdateCatalogRequest catalog_update;
catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
catalog_update.__set_header(TCatalogServiceRequestHeader());
catalog_update.header.__set_requesting_user(effective_user());
if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
<< query_id() << ")";
} else {
// TODO: We track partitions written to, not created, which means
// that we do more work than is necessary, because written-to
// partitions don't always require a metastore change.
VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size()
<< " altered partitions ("
<< join (catalog_update.created_partitions, ", ") << ")";
catalog_update.target_table = finalize_params.table_name;
catalog_update.db_name = finalize_params.table_db;
Status cnxn_status;
const TNetworkAddress& address =
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
CatalogServiceConnection client(
exec_env_->catalogd_client_cache(), address, &cnxn_status);
RETURN_IF_ERROR(cnxn_status);
VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
TUpdateCatalogResponse resp;
RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::UpdateCatalog,
catalog_update, &resp));
Status status(resp.result.status);
if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
exec_request_.query_options.sync_ddl));
}
}
query_events_->MarkEvent("DML Metastore update finished");
return Status::OK();
}
void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) {
if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
result_metadata_ = ddl_resp->result_set.schema;
request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows));
}
}
void ClientRequestState::SetResultSet(const vector<string>& results) {
request_result_set_.reset(new vector<TResultRow>);
request_result_set_->resize(results.size());
for (int i = 0; i < results.size(); ++i) {
(*request_result_set_.get())[i].__isset.colVals = true;
(*request_result_set_.get())[i].colVals.resize(1);
(*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]);
}
}
void ClientRequestState::SetResultSet(const vector<string>& col1,
const vector<string>& col2) {
DCHECK_EQ(col1.size(), col2.size());
request_result_set_.reset(new vector<TResultRow>);
request_result_set_->resize(col1.size());
for (int i = 0; i < col1.size(); ++i) {
(*request_result_set_.get())[i].__isset.colVals = true;
(*request_result_set_.get())[i].colVals.resize(2);
(*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
(*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
}
}
void ClientRequestState::SetResultSet(const vector<string>& col1,
const vector<string>& col2, const vector<string>& col3) {
DCHECK_EQ(col1.size(), col2.size());
DCHECK_EQ(col1.size(), col3.size());
request_result_set_.reset(new vector<TResultRow>);
request_result_set_->resize(col1.size());
for (int i = 0; i < col1.size(); ++i) {
(*request_result_set_.get())[i].__isset.colVals = true;
(*request_result_set_.get())[i].colVals.resize(3);
(*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
(*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
(*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
}
}
void ClientRequestState::SetResultSet(const vector<string>& col1,
const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) {
DCHECK_EQ(col1.size(), col2.size());
DCHECK_EQ(col1.size(), col3.size());
DCHECK_EQ(col1.size(), col4.size());
request_result_set_.reset(new vector<TResultRow>);
request_result_set_->resize(col1.size());
for (int i = 0; i < col1.size(); ++i) {
(*request_result_set_.get())[i].__isset.colVals = true;
(*request_result_set_.get())[i].colVals.resize(4);
(*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
(*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
(*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
(*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]);
}
}
void ClientRequestState::SetCreateTableAsSelectResultSet() {
DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
int64_t total_num_rows_inserted = 0;
// There will only be rows inserted in the case a new table was created as part of this
// operation.
if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
DCHECK(coord_.get());
for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
total_num_rows_inserted += p.second.num_modified_rows;
}
}
const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
VLOG_QUERY << summary_msg;
vector<string> results(1, summary_msg);
SetResultSet(results);
}
void ClientRequestState::MarkInactive() {
client_wait_sw_.Start();
lock_guard<mutex> l(expiration_data_lock_);
last_active_time_ms_ = UnixMillis();
DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
--ref_count_;
}
void ClientRequestState::MarkActive() {
client_wait_sw_.Stop();
int64_t elapsed_time = client_wait_sw_.ElapsedTime();
client_wait_timer_->Set(elapsed_time);
lock_guard<mutex> l(expiration_data_lock_);
last_active_time_ms_ = UnixMillis();
++ref_count_;
}
Status ClientRequestState::UpdateTableAndColumnStats(
const vector<ChildQuery*>& child_queries) {
DCHECK_GE(child_queries.size(), 1);
DCHECK_LE(child_queries.size(), 2);
catalog_op_executor_.reset(
new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
// If there was no column stats query, pass in empty thrift structures to
// ExecComputeStats(). Otherwise pass in the column stats result.
TTableSchema col_stats_schema;
TRowSet col_stats_data;
if (child_queries.size() > 1) {
col_stats_schema = child_queries[1]->result_schema();
col_stats_data = child_queries[1]->result_data();
}
Status status = catalog_op_executor_->ExecComputeStats(
exec_request_.catalog_op_request,
child_queries[0]->result_schema(),
child_queries[0]->result_data(),
col_stats_schema,
col_stats_data);
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
*catalog_op_executor_->update_catalog_result(),
exec_request_.query_options.sync_ddl));
// Set the results to be reported to the client.
SetResultSet(catalog_op_executor_->ddl_exec_response());
query_events_->MarkEvent("Metastore update finished");
return Status::OK();
}
void ClientRequestState::ClearResultCache() {
if (result_cache_ == NULL) return;
// Update result set cache metrics and mem limit accounting.
ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size());
int64_t total_bytes = result_cache_->ByteSize();
ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes);
if (coord_ != NULL) {
DCHECK(coord_->query_mem_tracker() != NULL);
coord_->query_mem_tracker()->Release(total_bytes);
}
result_cache_.reset(NULL);
}
void ClientRequestState::UpdateQueryState(
beeswax::QueryState::type query_state) {
query_state_ = query_state;
summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
}
}