// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "service/workload-management-worker.h"
#include <chrono>
#include <condition_variable>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
#include <boost/algorithm/string/join.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gutil/strings/strcat.h>
#include <gutil/strings/substitute.h>
#include "common/compiler-util.h"
#include "common/logging.h"
#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/ImpalaService_types.h"
#include "gen-cpp/Query_types.h"
#include "gen-cpp/SystemTables_types.h"
#include "gen-cpp/Types_types.h"
#include "kudu/util/version_util.h"
#include "runtime/exec-env.h"
#include "runtime/query-driver.h"
#include "service/client-request-state.h"
#include "service/impala-server.h"
#include "service/internal-server.h"
#include "service/query-options.h"
#include "service/query-state-record.h"
#include "util/debug-util.h"
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "util/sql-util.h"
#include "util/stopwatch.h"
#include "util/string-util.h"
#include "util/ticker.h"
#include "workload_mgmt/workload-management.h"
using namespace impala;
using namespace impala::workloadmgmt;
using namespace std;
using kudu::Version;
DECLARE_bool(enable_workload_mgmt);
DECLARE_int32(query_log_write_interval_s);
DECLARE_int32(query_log_dml_exec_timeout_s);
DECLARE_int32(query_log_max_insert_attempts);
DECLARE_int32(query_log_max_queued);
DECLARE_int32(query_log_shutdown_timeout_s);
DECLARE_string(debug_actions);
DECLARE_string(cluster_id);
DECLARE_string(query_log_request_pool);
DECLARE_int32(query_log_write_timeout_s);
DECLARE_string(workload_mgmt_user);
DECLARE_int32(query_log_expression_limit);
// Declared for the EscapeSql length limits used below (assumed not already declared in
// an included header).
DECLARE_int32(query_log_max_sql_length);
DECLARE_int32(query_log_max_plan_length);
namespace impala {
namespace workloadmgmt {
/// Constants for converting microseconds and nanoseconds to milliseconds.
static constexpr double MICROS_TO_MILLIS = 1000;
static constexpr double NANOS_TO_MILLIS = 1000000;
/// SQL column type for duration columns that store a millisecond value.
static const string MILLIS_DECIMAL_TYPE = strings::Substitute(
"DECIMAL($0,$1)", DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE);
/// Helper function to add a decimal value to the stream that is building the completed
/// queries insert DML.
///
/// Parameters:
/// `ctx` The field parse context object.
/// `val` A value to write in the completed queries sql stream.
/// `factor` The provided val input will be divided by this value.
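///
/// Example (illustrative, assuming the duration type is DECIMAL(18,3)): calling
/// _write_decimal(ctx, 2500, MICROS_TO_MILLIS) appends "CAST(2.5 AS DECIMAL(18,3))".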
static void _write_decimal(FieldParserContext& ctx, int64_t val, double factor) {
ctx.sql << "CAST(" << val / factor << " AS " << MILLIS_DECIMAL_TYPE << ")";
}
/// Helper function to add the timestamp for a single event from the events timeline into
/// the stream that is building the completed queries insert DML.
///
/// Parameters:
/// `ctx` The field parse context object.
/// `target_event` The element from the QueryEvent enum that represents the event being
/// inserted into the sql statement. The corresponding event value will
/// be retrieved from the map of query events.
static void _write_event(FieldParserContext& ctx, QueryEvent target_event) {
const auto& event = ctx.record->events.find(target_event);
DCHECK(event != ctx.record->events.end());
_write_decimal(ctx, event->second, NANOS_TO_MILLIS);
}
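/// Parsers that render each completed queries table column as a SQL literal. The array
/// is indexed by the query table column enum value (see the FIELD_PARSERS[field.first]
/// lookup in _queryStateToSql()), so the lambdas below must appear in the same order as
/// the columns they render.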
const std::array<FieldParser, NumQueryTableColumns> FIELD_PARSERS = {{
// Schema Version 1.0.0 Columns
// cluster id
{[](FieldParserContext& ctx) { ctx.sql << "'" << ctx.cluster_id << "'"; }},
// query id
{[](FieldParserContext& ctx) {
ctx.sql << "'" << PrintId(ctx.record->base_state->id) << "'";
}},
// session_id
{[](FieldParserContext& ctx) {
ctx.sql << "'" << PrintId(ctx.record->session_id) << "'";
}},
// session type
{[](FieldParserContext& ctx) { ctx.sql << "'" << ctx.record->session_type << "'"; }},
// hiveserver2 protocol version
{[](FieldParserContext& ctx) {
ctx.sql << "'";
if (ctx.record->session_type == TSessionType::HIVESERVER2) {
ctx.sql << ctx.record->hiveserver2_protocol_version_formatted();
}
ctx.sql << "'";
}},
// db user
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->base_state->effective_user << "'";
}},
// db user connection
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->db_user_connection << "'";
}},
// db name
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->base_state->default_db << "'";
}},
// impala coordinator
{[](FieldParserContext& ctx) {
ctx.sql << "'"
<< TNetworkAddressToString(
ExecEnv::GetInstance()->configured_backend_address())
<< "'";
}},
// query status
{[](FieldParserContext& ctx) {
ctx.sql << "'";
if (ctx.record->base_state->query_status.ok()) {
ctx.sql << "OK";
} else {
ctx.sql << EscapeSql(ctx.record->base_state->query_status.msg().msg());
}
ctx.sql << "'";
}},
// query state
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->base_state->query_state << "'";
}},
// impala query end state
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->impala_query_end_state << "'";
}},
// query type
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->base_state->stmt_type << "'";
}},
// network address
{[](FieldParserContext& ctx) {
ctx.sql << "'" << TNetworkAddressToString(ctx.record->client_address) << "'";
}},
// start time utc
{[](FieldParserContext& ctx) {
ctx.sql << "UNIX_MICROS_TO_UTC_TIMESTAMP(" << ctx.record->base_state->start_time_us
<< ")";
}},
// total time ms
{[](FieldParserContext& ctx) {
_write_decimal(ctx,
(ctx.record->base_state->end_time_us - ctx.record->base_state->start_time_us),
MICROS_TO_MILLIS);
}},
// query opts config
{[](FieldParserContext& ctx) {
const string opts_str = DebugQueryOptions(ctx.record->query_options);
ctx.sql << "'" << EscapeSql(opts_str) << "'";
}},
// resource pool
{[](FieldParserContext& ctx) {
ctx.sql << "'" << EscapeSql(ctx.record->base_state->resource_pool) << "'";
}},
// per host mem estimate
{[](FieldParserContext& ctx) { ctx.sql << ctx.record->per_host_mem_estimate; }},
// dedicated coord mem estimate
{[](FieldParserContext& ctx) {
ctx.sql << ctx.record->dedicated_coord_mem_estimate;
}},
// per host fragment instances
{[](FieldParserContext& ctx) {
ctx.sql << "'";
if (!ctx.record->per_host_state.empty()) {
for (const auto& iter : ctx.record->per_host_state) {
ctx.sql << TNetworkAddressToString(iter.first) << "="
<< iter.second.fragment_instance_count << ',';
}
ctx.sql.move_back();
}
ctx.sql << "'";
}},
// backends count
{[](FieldParserContext& ctx) {
if (ctx.record->per_host_state.empty()) {
ctx.sql << 0;
} else {
ctx.sql << ctx.record->per_host_state.size();
}
}},
// admission result
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->admission_result << "'";
}},
// cluster memory admitted
{[](FieldParserContext& ctx) { ctx.sql << ctx.record->base_state->cluster_mem_est; }},
// executor group
{[](FieldParserContext& ctx) {
ctx.sql << "'" << ctx.record->executor_group << "'";
}},
// executor groups
{[](FieldParserContext& ctx) {
ctx.sql << "'" << EscapeSql(ctx.record->executor_groups) << "'";
}},
// exec summary
{[](FieldParserContext& ctx) {
ctx.sql << "'" << EscapeSql(ctx.record->exec_summary) << "'";
}},
// num rows fetched
{[](FieldParserContext& ctx) {
ctx.sql << ctx.record->base_state->num_rows_fetched;
}},
// row materialization rows per sec
{[](FieldParserContext& ctx) { ctx.sql << ctx.record->row_materialization_rate; }},
// row materialization time ms
{[](FieldParserContext& ctx) {
_write_decimal(ctx, ctx.record->row_materialization_time, NANOS_TO_MILLIS);
}},
// compressed bytes spilled
{[](FieldParserContext& ctx) { ctx.sql << ctx.record->compressed_bytes_spilled; }},
// event planning finished
{[](FieldParserContext& ctx) { _write_event(ctx, PLANNING_FINISHED); }},
// event submit for admission
{[](FieldParserContext& ctx) { _write_event(ctx, SUBMIT_FOR_ADMISSION); }},
// event completed admission
{[](FieldParserContext& ctx) { _write_event(ctx, COMPLETED_ADMISSION); }},
// event all backends started
{[](FieldParserContext& ctx) { _write_event(ctx, ALL_BACKENDS_STARTED); }},
// event rows available
{[](FieldParserContext& ctx) { _write_event(ctx, ROWS_AVAILABLE); }},
// event first row fetched
{[](FieldParserContext& ctx) { _write_event(ctx, FIRST_ROW_FETCHED); }},
// event last row fetched
{[](FieldParserContext& ctx) { _write_event(ctx, LAST_ROW_FETCHED); }},
// event unregister query
{[](FieldParserContext& ctx) { _write_event(ctx, UNREGISTER_QUERY); }},
// read io wait total ms
{[](FieldParserContext& ctx) {
_write_decimal(ctx, ctx.record->read_io_wait_time_total, NANOS_TO_MILLIS);
}},
// read io wait mean ms
{[](FieldParserContext& ctx) {
_write_decimal(ctx, ctx.record->read_io_wait_time_mean, NANOS_TO_MILLIS);
}},
// bytes read cache total
{[](FieldParserContext& ctx) { ctx.sql << ctx.record->bytes_read_cache_total; }},
// bytes read total
{[](FieldParserContext& ctx) { ctx.sql << ctx.record->bytes_read_total; }},
// pernode peak mem min
{[](FieldParserContext& ctx) {
auto min_elem = min_element(ctx.record->per_host_state.cbegin(),
ctx.record->per_host_state.cend(), PerHostPeakMemoryComparator);
if (LIKELY(min_elem != ctx.record->per_host_state.cend())) {
ctx.sql << min_elem->second.peak_memory_usage;
} else {
ctx.sql << 0;
}
}},
// pernode peak mem max
{[](FieldParserContext& ctx) {
auto max_elem = max_element(ctx.record->per_host_state.cbegin(),
ctx.record->per_host_state.cend(), PerHostPeakMemoryComparator);
if (UNLIKELY(max_elem == ctx.record->per_host_state.cend())) {
ctx.sql << 0;
} else {
ctx.sql << max_elem->second.peak_memory_usage;
}
}},
// pernode peak mem mean
{[](FieldParserContext& ctx) {
int64_t calc_mean = 0;
if (LIKELY(!ctx.record->per_host_state.empty())) {
for (const auto& host : ctx.record->per_host_state) {
calc_mean += host.second.peak_memory_usage;
}
calc_mean = calc_mean / ctx.record->per_host_state.size();
}
ctx.sql << calc_mean;
}},
// sql
{[](FieldParserContext& ctx) {
ctx.sql << "'"
<< EscapeSql(ctx.record->redacted_sql, FLAGS_query_log_max_sql_length)
<< "'";
}},
// plan
{[](FieldParserContext& ctx) {
ctx.sql << "'"
<< EscapeSql(ctx.record->base_state->plan, FLAGS_query_log_max_plan_length)
<< "'";
}},
// tables queried
{[](FieldParserContext& ctx) {
ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'";
}},
// Schema Version 1.1.0 Columns
// select columns
{[](FieldParserContext& ctx) {
ctx.sql << "'" << boost::algorithm::join(ctx.record->select_columns, ",") << "'";
}},
// where columns
{[](FieldParserContext& ctx) {
ctx.sql << "'" << boost::algorithm::join(ctx.record->where_columns, ",") << "'";
}},
// join columns
{[](FieldParserContext& ctx) {
ctx.sql << "'" << boost::algorithm::join(ctx.record->join_columns, ",") << "'";
}},
// aggregate columns
{[](FieldParserContext& ctx) {
ctx.sql << "'" << boost::algorithm::join(ctx.record->aggregate_columns, ",") << "'";
}},
// orderby columns
{[](FieldParserContext& ctx) {
ctx.sql << "'" << boost::algorithm::join(ctx.record->orderby_columns, ",") << "'";
}},
// coordinator slots
{[](FieldParserContext& ctx) {
ctx.sql << ctx.record->base_state->coordinator_slots;
}},
// executor slots
{[](FieldParserContext& ctx) {
ctx.sql << ctx.record->base_state->executor_slots;
}}}}; // FIELD_PARSERS constant array
} // namespace workloadmgmt
/// Queue of completed queries and the mutex to synchronize access to it.
list<CompletedQuery> _completed_queries;
mutex _completed_queries_mu;
/// Coordinate periodic execution of the completed queries queue processing thread.
condition_variable _completed_queries_cv;
/// Coordinate shutdown of the completed queries queue processing thread.
condition_variable _completed_queries_shutdown_cv;
/// Determine if the maximum number of queued completed queries has been exceeded.
///
/// Return:
/// `true` There is a max limit on the number of queued completed queries and that
/// limit has been exceeded.
/// `false` Either there is no max number of queued completed queries or there is a
/// limit that has not been exceeded.
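///
/// Example (illustrative): with --query_log_max_queued=3000, a record_count of 3001
/// returns true; a flag value of 0 or less disables the limit, always returning false.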
static inline bool _maxRecordsExceeded(size_t record_count) noexcept {
return FLAGS_query_log_max_queued > 0 && record_count > FLAGS_query_log_max_queued;
} // function _maxRecordsExceeded
/// Iterates through the fields in `FIELD_DEFINITIONS`, executing the matching parser
/// from `FIELD_PARSERS` for the given `QueryStateExpanded` object. This function builds
/// the `FieldParserContext` object that is passed to each parser.
///
/// Parameters:
/// `rec` - `QueryStateExpanded` object, an insert sql statement will be generated to
/// insert a row into the completed queries table representing the query in
/// this object.
///
/// Return:
/// `string` - Contains the insert sql statement.
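///
/// Example (illustrative): returns a single parenthesized row such as
/// "\n('cluster-1','d4496ab9dcd5cb51:ae71c47800000000','...','HIVESERVER2',...)".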
static const string _queryStateToSql(
const QueryStateExpanded* rec, const Version& target_schema_version) noexcept {
DCHECK(rec != nullptr);
StringStreamPop sql;
FieldParserContext ctx(rec, FLAGS_cluster_id, sql);
sql << "\n(";
for (const auto& field : FIELD_DEFINITIONS) {
if (field.second.Include(target_schema_version)) {
FieldParser parser = FIELD_PARSERS[field.first];
parser(ctx);
sql << ",";
}
}
sql.move_back();
sql << ")";
return sql.str();
} // function _queryStateToSql
size_t ImpalaServer::NumLiveQueries() {
size_t live_queries = query_driver_map_.Count();
lock_guard<mutex> l(_completed_queries_mu);
return live_queries + _completed_queries.size();
}
void ImpalaServer::ShutdownWorkloadManagement() {
unique_lock<mutex> l(workload_mgmt_state_mu_);
// Handle the situation where this function runs before the workload management process
// has been started and thus workload_management_thread_ holds a nullptr.
if (workload_mgmt_state_ == WorkloadManagementState::NOT_STARTED) {
workload_mgmt_state_ = WorkloadManagementState::SHUTDOWN;
return;
}
DCHECK_NE(nullptr, workload_management_thread_.get());
// If the completed queries thread is not yet running, then we don't need to give it a
// chance to flush the in-memory queue to the completed queries table.
if (workload_mgmt_state_ == WorkloadManagementState::RUNNING) {
workload_mgmt_state_ = WorkloadManagementState::SHUTTING_DOWN;
LOG(INFO) << "Workload management is shutting down";
_completed_queries_cv.notify_all();
_completed_queries_shutdown_cv.wait_for(l,
chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
[this] { return workload_mgmt_state_ == WorkloadManagementState::SHUTDOWN; });
}
switch (workload_mgmt_state_) {
case WorkloadManagementState::SHUTDOWN:
// Safe to join the thread here because the workload management processing loop
// sets workload_mgmt_state_ to WorkloadManagementState::SHUTDOWN immediately before
// it returns.
LOG(INFO) << "Workload management shutdown successful";
workload_management_thread_->Join();
break;
default:
// The shutdown timeout expired without the completed queries queue draining.
LOG(INFO) << "Workload management shutdown timed out. Up to '"
<< ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue()
<< "' queries may have "
<< "been lost";
workload_management_thread_->Detach();
break;
}
} // function ImpalaServer::ShutdownWorkloadManagement
void ImpalaServer::EnqueueCompletedQuery(
const QueryHandle& query_handle, shared_ptr<QueryStateRecord> qs_rec) {
// Do not enqueue queries that are excluded from the query log, or any query when
// workload management is not enabled.
if (query_handle->stmt_type() == TStmtType::SET
|| !query_handle.query_driver()->IncludedInQueryLog()
|| !FLAGS_enable_workload_mgmt) {
return; // Note: early return
}
// Do not enqueue USE, SHOW, and DESCRIBE ddl queries. This check is kept separate
// because combining it with the previous check produced confusing, duplicated code.
if (query_handle->stmt_type() == TStmtType::DDL) {
switch (query_handle->catalog_op_type()) {
case TCatalogOpType::SHOW_TABLES:
case TCatalogOpType::SHOW_DBS:
case TCatalogOpType::SHOW_STATS:
case TCatalogOpType::USE:
case TCatalogOpType::SHOW_FUNCTIONS:
case TCatalogOpType::SHOW_CREATE_TABLE:
case TCatalogOpType::SHOW_DATA_SRCS:
case TCatalogOpType::SHOW_ROLES:
case TCatalogOpType::SHOW_GRANT_PRINCIPAL:
case TCatalogOpType::SHOW_FILES:
case TCatalogOpType::SHOW_CREATE_FUNCTION:
case TCatalogOpType::SHOW_VIEWS:
case TCatalogOpType::SHOW_METADATA_TABLES:
case TCatalogOpType::DESCRIBE_TABLE:
case TCatalogOpType::DESCRIBE_DB:
case TCatalogOpType::DESCRIBE_HISTORY:
VLOG_QUERY << "skipping enqueue of completed query '"
<< PrintId(query_handle->query_id()) << "' with type '"
<< query_handle->stmt_type() << "' and catalog op type '"
<< query_handle->catalog_op_type() << "'";
return; // Note: early return, query will not be added to the queue
case TCatalogOpType::RESET_METADATA:
case TCatalogOpType::DDL:
// No-op, continue execution of this function.
break;
default:
LOG(WARNING) << "unknown ddl type: "
<< to_string(query_handle->catalog_op_type());
DCHECK(false);
return; // Note: early return
}
} else if (query_handle->stmt_type() == TStmtType::UNKNOWN) {
if (query_handle->hs2_metadata_op()) {
VLOG_QUERY << "skipping enqueue of completed query '"
<< PrintId(query_handle->query_id()) << "' with type '"
<< query_handle->stmt_type() << "'";
return; // Note: early return, query will not be added to the queue
}
}
shared_ptr<QueryStateExpanded> exp_rec =
make_shared<QueryStateExpanded>(*query_handle, move(qs_rec));
{
lock_guard<mutex> l(_completed_queries_mu);
_completed_queries.emplace_back(CompletedQuery(move(exp_rec)));
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(1L);
if (_maxRecordsExceeded(_completed_queries.size())) {
_completed_queries_cv.notify_all();
}
}
VLOG_QUERY << "enqueued completed '" << query_handle->stmt_type() << "' query '"
<< PrintId(query_handle->query_id()) << "'";
} // ImpalaServer::EnqueueCompletedQuery
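/// Builds the constant, non-values portion of the completed queries insert DML,
/// listing only the columns that are included in the target schema version.
///
/// Example (illustrative):
/// "INSERT INTO sys.impala_query_log(CLUSTER_ID,QUERY_ID,...) VALUES "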
static string _dmlPrefix(const string& table_name, const Version& target_schema_version) {
StringStreamPop fields;
fields << "INSERT INTO " << table_name << "(";
for (const auto& field : FIELD_DEFINITIONS) {
if (field.second.Include(target_schema_version)) {
fields << to_string(field.first) << ",";
}
}
fields.move_back();
fields << ") VALUES ";
return fields.str();
} // function _dmlPrefix
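/// Returns true if a graceful shutdown has been requested, transitioning
/// workload_mgmt_state_ from SHUTTING_DOWN to SHUTDOWN as a side effect so that
/// ShutdownWorkloadManagement() can join the processing thread.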
bool ImpalaServer::IsWorkloadManagementShuttingDown() {
lock_guard<mutex> l(workload_mgmt_state_mu_);
DCHECK(workload_mgmt_state_ != WorkloadManagementState::SHUTDOWN);
if (UNLIKELY(workload_mgmt_state_ == WorkloadManagementState::SHUTTING_DOWN)) {
workload_mgmt_state_ = WorkloadManagementState::SHUTDOWN;
return true;
}
return false;
} // function ImpalaServer::IsWorkloadManagementShuttingDown
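/// Main loop of the completed queries processing thread. Sleeps until the write
/// interval elapses, the maximum queued record count is exceeded, or shutdown begins,
/// then drains the in-memory queue into a single multi-row insert DML that is executed
/// against the completed queries table. Only columns present in target_schema_version
/// are written.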
void ImpalaServer::WorkloadManagementWorker(const Version& target_schema_version) {
{
lock_guard<mutex> l(workload_mgmt_state_mu_);
workload_mgmt_state_ = WorkloadManagementState::STARTING;
}
{
lock_guard<mutex> l(workload_mgmt_state_mu_);
workload_mgmt_state_ = WorkloadManagementState::STARTED;
}
/// Ticker that wakes up at set intervals to process the queued completed queries. Uses
/// the _completed_queries_mu to synchronize access to the _completed_queries list.
unique_ptr<TickerSecondsBool> completed_queries_ticker;
{
lock_guard<mutex> l(workload_mgmt_state_mu_);
// This condition will evaluate to false only if a clean shutdown was initiated while
// the previous function was running.
if (LIKELY(workload_mgmt_state_ == WorkloadManagementState::STARTED)) {
workload_mgmt_state_ = WorkloadManagementState::RUNNING;
} else {
LOG(INFO) << "Not starting workload management processing thread because "
<< "coordinator shutdown was initiated.";
return; // Note: early return
}
completed_queries_ticker = make_unique<TickerSecondsBool>(
FLAGS_query_log_write_interval_s, _completed_queries_cv, _completed_queries_mu);
ABORT_IF_ERROR(
completed_queries_ticker->Start("impala-server", "completed-queries-ticker"));
}
// Non-values portion of the sql DML to insert records into the completed queries
// tables. This portion of the statement is constant and thus is only generated once.
const string log_table_name = QueryLogTableName(true);
const string insert_dml_prefix = _dmlPrefix(log_table_name, target_schema_version);
VLOG(2) << "Workload Management insert sql prefix: " << insert_dml_prefix;
// Setup default query options that will be provided on all queries that insert rows
// into the completed queries table.
InternalServer::QueryOptionMap insert_query_opts;
insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
FLAGS_query_log_write_timeout_s < 1 ? FLAGS_query_log_write_interval_s :
FLAGS_query_log_write_timeout_s);
if (!FLAGS_query_log_request_pool.empty()) {
insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] = FLAGS_query_log_request_pool;
}
insert_query_opts[TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS] = "0";
insert_query_opts[TImpalaQueryOptions::EXEC_TIME_LIMIT_S] =
std::to_string(FLAGS_query_log_dml_exec_timeout_s);
if (!FLAGS_debug_actions.empty()) {
insert_query_opts[TImpalaQueryOptions::DEBUG_ACTION] = FLAGS_debug_actions;
}
// Hide analyzed query since it can be prohibitively long.
insert_query_opts[TImpalaQueryOptions::HIDE_ANALYZED_QUERY] = "true";
while (true) {
// Exit this thread if a shutdown was initiated.
if (IsWorkloadManagementShuttingDown()) {
_completed_queries_shutdown_cv.notify_all();
return; // Note: early return
}
// Sleep this thread until it is time to process queued completed queries. During the
// wait, the _completed_queries_mu is only locked while calling the lambda function
// predicate. After waking up, the _completed_queries_mu will be locked.
list<CompletedQuery> queries_to_insert;
MonotonicStopWatch timer;
{
unique_lock<mutex> l(_completed_queries_mu);
_completed_queries_cv.wait(l, [this, &completed_queries_ticker] {
lock_guard<mutex> l2(workload_mgmt_state_mu_);
// To guard against spurious wakeups, this predicate ensures there are
// completed queries queued up before waking up the thread.
return (completed_queries_ticker->WakeupGuard()() && !_completed_queries.empty())
|| _maxRecordsExceeded(_completed_queries.size())
|| UNLIKELY(workload_mgmt_state_ == WorkloadManagementState::SHUTTING_DOWN);
});
completed_queries_ticker->ResetWakeupGuard();
DebugActionNoFail(FLAGS_debug_actions, "WM_SHUTDOWN_DELAY");
if (_completed_queries.empty()) continue;
if (_maxRecordsExceeded(_completed_queries.size())) {
ImpaladMetrics::COMPLETED_QUERIES_MAX_RECORDS_WRITES->Increment(1L);
} else {
ImpaladMetrics::COMPLETED_QUERIES_SCHEDULED_WRITES->Increment(1L);
}
timer.Start();
// Copy all completed queries to a temporary list so that inserts to the
// completed_queries list are not blocked while generating and running an insert
// SQL statement for the completed queries.
queries_to_insert.splice(queries_to_insert.cend(), _completed_queries);
}
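// Render each drained query as a row literal and append it to a single multi-row
// VALUES clause. The largest rendered row is tracked so that MAX_ROW_SIZE can be set
// just high enough for the insert below.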
string sql;
uint32_t max_row_size = 0;
// The iterator is advanced manually so that erasing an entry does not skip the
// element that follows it.
for (auto iter = queries_to_insert.begin(); iter != queries_to_insert.end();) {
if (iter->insert_attempts_count >= FLAGS_query_log_max_insert_attempts) {
LOG(ERROR) << "could not write completed query table=\"" << log_table_name
<< "\" query_id=\"" << PrintId(iter->query->base_state->id) << "\"";
iter = queries_to_insert.erase(iter);
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(-1);
continue;
}
// Increment the count of attempts to insert this query into the completed
// queries table.
iter->insert_attempts_count += 1;
const string row = _queryStateToSql(iter->query.get(), target_schema_version);
if (row.size() > max_row_size) {
max_row_size = row.size();
}
StrAppend(&sql, move(row), ",");
VLOG(2) << "added query '" << iter->query->base_state->id
<< "' to insert sql. Insert attempt '"
<< iter->insert_attempts_count << "'.";
++iter;
}
DCHECK(
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= queries_to_insert.size());
// In the case where queries_to_insert only contains records that have exceeded
// the max insert attempts, sql will be empty.
if (UNLIKELY(sql.empty())) continue;
// Remove the last comma and determine the final sql statement length.
sql.pop_back();
const size_t final_sql_len = insert_dml_prefix.size() + sql.size();
uint64_t gather_time = timer.Reset();
TUniqueId tmp_query_id;
// Build query options to ensure the query is not rejected.
InternalServer::QueryOptionMap opts = insert_query_opts;
if (UNLIKELY(final_sql_len > numeric_limits<int32_t>::max())) {
LOG(ERROR) << "Completed queries table insert sql statement of length '"
<< final_sql_len << "' was longer than the maximum of '"
<< numeric_limits<int32_t>::max() << "', skipping";
continue; // NOTE: early loop continuation
}
// Set max_statement_length_bytes based on actual query, and at least the minimum.
opts[TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES] =
std::to_string(max<size_t>(MIN_MAX_STATEMENT_LENGTH_BYTES, final_sql_len));
// Set statement_expression_limit based on the startup flag. The flag validation
// ensures this value is numeric and falls within an acceptable range.
if (FLAGS_query_log_expression_limit >= 0) {
opts[TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT] =
std::to_string(FLAGS_query_log_expression_limit);
}
opts[TImpalaQueryOptions::MAX_ROW_SIZE] = std::to_string(max_row_size);
// Execute the insert dml.
const Status ret_status = ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
StrCat(insert_dml_prefix, sql), opts, false, &tmp_query_id);
uint64_t exec_time = timer.ElapsedTime();
ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(gather_time + exec_time);
stringstream log_msg_props;
log_msg_props
<< " table=\"" << log_table_name << "\""
<< " record_count=\"" << queries_to_insert.size() << "\""
<< " bytes=\"" << PrettyPrinter::PrintBytes(sql.size()) << "\""
<< " gather_time=\"" << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) << "\""
<< " exec_time=\"" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS) << "\""
<< " query_id=\"" << PrintId(tmp_query_id) << "\"";
if (ret_status.ok()) {
LOG(INFO) << "wrote completed queries" << log_msg_props.rdbuf();
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(
-static_cast<int64_t>(queries_to_insert.size()));
DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0);
ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(queries_to_insert.size());
} else {
log_msg_props << " msg=\"" << ret_status.msg().msg() << "\"";
LOG(WARNING) << "failed to write completed queries" << log_msg_props.rdbuf();
LOG(WARNING) << ret_status.GetDetail();
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
{
lock_guard<mutex> l(_completed_queries_mu);
_completed_queries.splice(_completed_queries.cend(), queries_to_insert);
}
}
}
} // function ImpalaServer::WorkloadManagementWorker
vector<shared_ptr<QueryStateExpanded>> ImpalaServer::GetCompletedQueries() {
lock_guard<mutex> l(_completed_queries_mu);
vector<shared_ptr<QueryStateExpanded>> results;
results.reserve(_completed_queries.size());
for (const auto& r : _completed_queries) {
results.emplace_back(r.query);
}
return results;
} // function ImpalaServer::GetCompletedQueries
} // namespace impala