blob: 229ad560c8a7785340dc094d6058e7a0c2f97050 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "system-table-scanner.h"
#include <memory>
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>
#include "gen-cpp/SystemTables_types.h"
#include "common/names.h"
#include "runtime/decimal-value.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-driver.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tuple-row.h"
#include "service/query-state-record.h"
#include "util/debug-util.h"
#include "workload_mgmt/workload-management.h"
using namespace boost::algorithm;
using strings::Substitute;
DECLARE_string(cluster_id);
namespace impala {
static const string ERROR_MEM_LIMIT_EXCEEDED =
"SystemTableScanNode::$0() failed to allocate $1 bytes.";
// Constant declaring how to convert from micro and nano seconds to milliseconds.
static constexpr double MICROS_TO_MILLIS = 1000;
static constexpr double NANOS_TO_MILLIS = 1000000;
Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile* profile,
TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner) {
switch (table_name) {
case TSystemTableName::IMPALA_QUERY_LIVE:
*scanner = make_unique<QueryScanner>(state, profile, table_name);
break;
default:
return Status(ErrorMsg(TErrorCode::NOT_IMPLEMENTED_ERROR,
Substitute("Unknown table name: $0", table_name)));
}
return Status::OK();
}
/// Write a string value to a STRING slot, allocating memory from 'pool'. Returns
/// an error if memory cannot be allocated without exceeding a memory limit.
Status SystemTableScanner::WriteStringSlot(
const char* data, int len, MemPool* pool, void* slot) {
char* buffer = reinterpret_cast<char*>(pool->TryAllocateUnaligned(len));
if (UNLIKELY(buffer == nullptr)) {
string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "WriteStringSlot", len);
return pool->mem_tracker()->MemLimitExceeded(state_, details, len);
}
memcpy(buffer, data, len);
reinterpret_cast<StringValue*>(slot)->Assign(buffer, len);
return Status::OK();
}
Status SystemTableScanner::WriteStringSlot(const string& str, MemPool* pool, void* slot) {
return WriteStringSlot(str.data(), str.size(), pool, slot);
}
static void WriteUnixTimestampSlot(int64_t unix_time_micros, void* slot) {
*reinterpret_cast<TimestampValue*>(slot) =
TimestampValue::UtcFromUnixTimeMicros(unix_time_micros);
}
static void WriteBigIntSlot(int64_t value, void* slot) {
*reinterpret_cast<int64_t*>(slot) = value;
}
static void WriteIntSlot(int32_t value, void* slot) {
*reinterpret_cast<int32_t*>(slot) = value;
}
static void WriteDecimalSlot(
const ColumnType& type, double value, void* slot) {
bool overflow = false;
switch (type.GetByteSize()) {
case 4:
*reinterpret_cast<Decimal4Value*>(slot) =
Decimal4Value::FromDouble(type, value, false, &overflow);
case 8:
*reinterpret_cast<Decimal8Value*>(slot) =
Decimal8Value::FromDouble(type, value, false, &overflow);
case 16:
*reinterpret_cast<Decimal16Value*>(slot) =
Decimal16Value::FromDouble(type, value, false, &overflow);
}
DCHECK(!overflow);
}
QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile,
TSystemTableName::type table_name)
: SystemTableScanner(state, profile, table_name),
active_query_collection_timer_(ADD_TIMER(profile_, "ActiveQueryCollectionTime")),
pending_query_collection_timer_(ADD_TIMER(profile_, "PendingQueryCollectionTime"))
{}
Status QueryScanner::Open() {
ImpalaServer* server = ExecEnv::GetInstance()->impala_server();
// Get a sorted list of state snapshots for all active queries. This mimics the
// behavior of ImpalaHttpHandler::QueryStateHandler. Using snapshots avoids potential
// memory violations of trying to use a ClientRequestState pointer without holding a
// shared_ptr to the QueryDriver and avoids keeping the query open longer than
// necessary (via that shared_ptr). The cost is that we allocate memory for the
// QueryStateRecords, which should be relatively small.
{
SCOPED_TIMER(active_query_collection_timer_);
server->query_driver_map_.DoFuncForAllEntries(
[&](const std::shared_ptr<QueryDriver>& query_driver) {
query_records_.emplace_back(make_shared<QueryStateExpanded>(
*query_driver->GetActiveClientRequestState()));
});
}
unordered_set<TUniqueId> running_queries;
running_queries.reserve(query_records_.size());
for (const auto& r : query_records_) {
running_queries.insert(r->base_state->id);
}
// It's possible for a query to appear in both query_driver_map_ and completed_queries_
// if it's been added to completed_queries_ in CloseClientRequestState and has not yet
// been removed from query_driver_map_ in QueryDriver::Unregister. Collection order
// ensures we don't miss one by collecting before it's been added to completed_queries_,
// then after it's added to completed_queries_ and removed from query_driver_map_.
// Avoid adding entries if they already exist.
{
SCOPED_TIMER(pending_query_collection_timer_);
for (const auto& r : server->GetCompletedQueries()) {
if (running_queries.find(r->base_state->id) == running_queries.end()) {
query_records_.emplace_back(r);
}
}
}
if (query_records_.empty()) eos_ = true;
return Status::OK();
}
static void WriteEvent(const QueryStateExpanded& query, const SlotDescriptor* slot_desc,
void* slot, QueryEvent name) {
const auto& event = query.events.find(name);
DCHECK(event != query.events.end());
WriteDecimalSlot(slot_desc->type(), event->second / NANOS_TO_MILLIS, slot);
}
Status QueryScanner::MaterializeNextTuple(
MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc) {
using impala::workloadmgmt::IncludeField;
DCHECK(!query_records_.empty());
const QueryStateExpanded& query = *query_records_.front();
const QueryStateRecord& record = *query.base_state;
ExecEnv* exec_env = ExecEnv::GetInstance();
// Verify there are no clustering columns (partitions) to offset col_pos.
DCHECK_EQ(0, tuple_desc->table_desc()->num_clustering_cols());
for (const SlotDescriptor* slot_desc : tuple_desc->slots()) {
void* slot = tuple->GetSlot(slot_desc->tuple_offset());
switch (slot_desc->col_pos()) {
case TQueryTableColumn::CLUSTER_ID:
RETURN_IF_ERROR(WriteStringSlot(FLAGS_cluster_id, pool, slot));
break;
case TQueryTableColumn::QUERY_ID:
RETURN_IF_ERROR(WriteStringSlot(PrintId(record.id), pool, slot));
break;
case TQueryTableColumn::SESSION_ID:
RETURN_IF_ERROR(WriteStringSlot(PrintId(query.session_id), pool, slot));
break;
case TQueryTableColumn::SESSION_TYPE:
RETURN_IF_ERROR(WriteStringSlot(to_string(query.session_type), pool, slot));
break;
case TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION:
if (query.session_type == TSessionType::HIVESERVER2) {
RETURN_IF_ERROR(WriteStringSlot(
query.hiveserver2_protocol_version_formatted(), pool, slot));
}
break;
case TQueryTableColumn::DB_USER:
RETURN_IF_ERROR(WriteStringSlot(record.effective_user, pool, slot));
break;
case TQueryTableColumn::DB_USER_CONNECTION:
RETURN_IF_ERROR(WriteStringSlot(query.db_user_connection, pool, slot));
break;
case TQueryTableColumn::DB_NAME:
RETURN_IF_ERROR(WriteStringSlot(record.default_db, pool, slot));
break;
case TQueryTableColumn::IMPALA_COORDINATOR:
RETURN_IF_ERROR(WriteStringSlot(
TNetworkAddressToString(exec_env->configured_backend_address()), pool, slot));
break;
case TQueryTableColumn::QUERY_STATUS:
RETURN_IF_ERROR(WriteStringSlot(record.query_status.ok() ?
"OK" : record.query_status.msg().msg(), pool, slot));
break;
case TQueryTableColumn::QUERY_STATE:
RETURN_IF_ERROR(WriteStringSlot(record.query_state, pool, slot));
break;
case TQueryTableColumn::IMPALA_QUERY_END_STATE:
RETURN_IF_ERROR(WriteStringSlot(query.impala_query_end_state, pool, slot));
break;
case TQueryTableColumn::QUERY_TYPE:
RETURN_IF_ERROR(WriteStringSlot(to_string(record.stmt_type), pool, slot));
break;
case TQueryTableColumn::NETWORK_ADDRESS:
RETURN_IF_ERROR(WriteStringSlot(
TNetworkAddressToString(query.client_address), pool, slot));
break;
case TQueryTableColumn::START_TIME_UTC:
WriteUnixTimestampSlot(record.start_time_us, slot);
break;
case TQueryTableColumn::TOTAL_TIME_MS: {
const int64_t end_time_us =
record.end_time_us > 0 ? record.end_time_us : UnixMicros();
double duration_us = (end_time_us - record.start_time_us) / MICROS_TO_MILLIS;
WriteDecimalSlot(slot_desc->type(), duration_us, slot);
break;
}
case TQueryTableColumn::QUERY_OPTS_CONFIG:
RETURN_IF_ERROR(WriteStringSlot(
DebugQueryOptions(query.query_options), pool, slot));
break;
case TQueryTableColumn::RESOURCE_POOL:
RETURN_IF_ERROR(WriteStringSlot(record.resource_pool, pool, slot));
break;
case TQueryTableColumn::PER_HOST_MEM_ESTIMATE:
WriteBigIntSlot(query.per_host_mem_estimate, slot);
break;
case TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE:
WriteBigIntSlot(query.dedicated_coord_mem_estimate, slot);
break;
case TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES:
if (!query.per_host_state.empty()) {
stringstream ss;
for (const auto& state : query.per_host_state) {
ss << TNetworkAddressToString(state.first) << "="
<< state.second.fragment_instance_count << ",";
}
string s = ss.str();
s.pop_back();
RETURN_IF_ERROR(WriteStringSlot(s, pool, slot));
}
break;
case TQueryTableColumn::BACKENDS_COUNT:
DCHECK_LE(query.per_host_state.size(), numeric_limits<int32_t>::max());
WriteIntSlot(query.per_host_state.size(), slot);
break;
case TQueryTableColumn::ADMISSION_RESULT:
RETURN_IF_ERROR(WriteStringSlot(query.admission_result, pool, slot));
break;
case TQueryTableColumn::CLUSTER_MEMORY_ADMITTED:
WriteBigIntSlot(record.cluster_mem_est, slot);
break;
case TQueryTableColumn::EXECUTOR_GROUP:
RETURN_IF_ERROR(WriteStringSlot(query.executor_group, pool, slot));
break;
case TQueryTableColumn::EXECUTOR_GROUPS:
RETURN_IF_ERROR(WriteStringSlot(query.executor_groups, pool, slot));
break;
case TQueryTableColumn::EXEC_SUMMARY:
RETURN_IF_ERROR(WriteStringSlot(query.exec_summary, pool, slot));
break;
case TQueryTableColumn::NUM_ROWS_FETCHED:
WriteBigIntSlot(record.num_rows_fetched, slot);
break;
case TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC:
WriteBigIntSlot(query.row_materialization_rate, slot);
break;
case TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS:
WriteDecimalSlot(slot_desc->type(),
query.row_materialization_time / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::COMPRESSED_BYTES_SPILLED:
WriteBigIntSlot(query.compressed_bytes_spilled, slot);
break;
case TQueryTableColumn::EVENT_PLANNING_FINISHED:
WriteEvent(query, slot_desc, slot, QueryEvent::PLANNING_FINISHED);
break;
case TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION:
WriteEvent(query, slot_desc, slot, QueryEvent::SUBMIT_FOR_ADMISSION);
break;
case TQueryTableColumn::EVENT_COMPLETED_ADMISSION:
WriteEvent(query, slot_desc, slot, QueryEvent::COMPLETED_ADMISSION);
break;
case TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED:
WriteEvent(query, slot_desc, slot, QueryEvent::ALL_BACKENDS_STARTED);
break;
case TQueryTableColumn::EVENT_ROWS_AVAILABLE:
WriteEvent(query, slot_desc, slot, QueryEvent::ROWS_AVAILABLE);
break;
case TQueryTableColumn::EVENT_FIRST_ROW_FETCHED:
WriteEvent(query, slot_desc, slot, QueryEvent::FIRST_ROW_FETCHED);
break;
case TQueryTableColumn::EVENT_LAST_ROW_FETCHED:
WriteEvent(query, slot_desc, slot, QueryEvent::LAST_ROW_FETCHED);
break;
case TQueryTableColumn::EVENT_UNREGISTER_QUERY:
WriteEvent(query, slot_desc, slot, QueryEvent::UNREGISTER_QUERY);
break;
case TQueryTableColumn::READ_IO_WAIT_TOTAL_MS:
WriteDecimalSlot(slot_desc->type(),
query.read_io_wait_time_total / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::READ_IO_WAIT_MEAN_MS:
WriteDecimalSlot(slot_desc->type(),
query.read_io_wait_time_mean / NANOS_TO_MILLIS, slot);
break;
case TQueryTableColumn::BYTES_READ_CACHE_TOTAL:
WriteBigIntSlot(query.bytes_read_cache_total, slot);
break;
case TQueryTableColumn::BYTES_READ_TOTAL:
WriteBigIntSlot(query.bytes_read_total, slot);
break;
case TQueryTableColumn::PERNODE_PEAK_MEM_MIN:
if (auto min_elem = min_element(query.per_host_state.cbegin(),
query.per_host_state.cend(), PerHostPeakMemoryComparator);
LIKELY(min_elem != query.per_host_state.cend())) {
WriteBigIntSlot(min_elem->second.peak_memory_usage, slot);
}
break;
case TQueryTableColumn::PERNODE_PEAK_MEM_MAX:
if (auto max_elem = max_element(query.per_host_state.cbegin(),
query.per_host_state.cend(), PerHostPeakMemoryComparator);
LIKELY(max_elem != query.per_host_state.cend())) {
WriteBigIntSlot(max_elem->second.peak_memory_usage, slot);
}
break;
case TQueryTableColumn::PERNODE_PEAK_MEM_MEAN:
if (LIKELY(!query.per_host_state.empty())) {
int64_t calc_mean = 0;
for (const auto& host : query.per_host_state) {
calc_mean += host.second.peak_memory_usage;
}
calc_mean /= query.per_host_state.size();
WriteBigIntSlot(calc_mean, slot);
}
break;
case TQueryTableColumn::SQL:
RETURN_IF_ERROR(WriteStringSlot(query.redacted_sql, pool, slot));
break;
case TQueryTableColumn::PLAN:
RETURN_IF_ERROR(WriteStringSlot(
trim_left_copy_if(record.plan, is_any_of("\n")), pool, slot));
break;
case TQueryTableColumn::TABLES_QUERIED:
if (!query.tables.empty()) {
RETURN_IF_ERROR(WriteStringSlot(PrintTableList(query.tables), pool, slot));
}
break;
case TQueryTableColumn::SELECT_COLUMNS:
if (!query.select_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::SELECT_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.select_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::WHERE_COLUMNS:
if (!query.where_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::WHERE_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.where_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::JOIN_COLUMNS:
if (!query.join_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::JOIN_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.join_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::AGGREGATE_COLUMNS:
if (!query.aggregate_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::AGGREGATE_COLUMNS))) {
RETURN_IF_ERROR(
WriteStringSlot(join(query.aggregate_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::ORDERBY_COLUMNS:
if (!query.orderby_columns.empty()
&& LIKELY(IncludeField(TQueryTableColumn::type::ORDERBY_COLUMNS))) {
RETURN_IF_ERROR(WriteStringSlot(join(query.orderby_columns, ","), pool, slot));
}
break;
case TQueryTableColumn::COORDINATOR_SLOTS:
if (LIKELY(IncludeField(TQueryTableColumn::type::COORDINATOR_SLOTS))) {
WriteBigIntSlot(record.coordinator_slots, slot);
}
break;
case TQueryTableColumn::EXECUTOR_SLOTS:
if (LIKELY(IncludeField(TQueryTableColumn::type::EXECUTOR_SLOTS))) {
WriteBigIntSlot(record.executor_slots, slot);
}
break;
default:
LOG(WARNING) << "Unknown column (position " << slot_desc->col_pos() << ") added"
" to table " << table_name_ << "; check if a coordinator was upgraded";
tuple->SetNull(slot_desc->null_indicator_offset());
break;
}
}
query_records_.pop_front();
if (query_records_.empty()) eos_ = true;
return Status::OK();
}
} /* namespace impala */