blob: 5102ad93f1f9a663447bff8029363e8aed686480 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/runtime-state.h"
#include <jni.h>
#include <iostream>
#include <sstream>
#include <string>
#include <boost/algorithm/string/join.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "codegen/llvm-codegen.h"
#include "common/object-pool.h"
#include "common/status.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-fn-call.h"
#include "exprs/timezone_db.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter-bank.h"
#include "runtime/thread-resource-mgr.h"
#include "runtime/timestamp-value.h"
#include "util/auth-util.h" // for GetEffectiveUser()
#include "util/bitmap.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/error-util.h"
#include "util/jni-util.h"
#include "util/mem-info.h"
#include "util/pretty-printer.h"
#include "util/test-info.h"
#include "common/names.h"
using strings::Substitute;
DECLARE_int32(max_errors);
namespace impala {
const char* RuntimeState::LLVM_CLASS_NAME = "class.impala::RuntimeState";
RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
: query_state_(query_state),
fragment_ctx_(&fragment_ctx),
instance_ctx_(&instance_ctx),
now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
query_state->query_ctx().now_string))),
utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
query_state->query_ctx().utc_timestamp_string))),
local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
profile_(RuntimeProfile::Create(
obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
instance_buffer_reservation_(obj_pool()->Add(new ReservationTracker)) {
Init();
}
// Constructor for standalone RuntimeState for test execution and fe-support.cc.
// Sets up a dummy local QueryState (with mem_limit picked up from the query options)
// to allow evaluating exprs, etc.
RuntimeState::RuntimeState(
const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl)
: query_state_(new QueryState(qctx, qctx.client_request.query_options.__isset.mem_limit
&& qctx.client_request.query_options.mem_limit > 0 ?
qctx.client_request.query_options.mem_limit :
-1,
"test-pool")),
fragment_ctx_(nullptr),
instance_ctx_(nullptr),
local_query_state_(query_state_),
now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(qctx.now_string))),
utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
qctx.utc_timestamp_string))),
local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")),
instance_buffer_reservation_(nullptr) {
// We may use execution resources while evaluating exprs, etc. Decremented in
// ReleaseResources() to release resources.
local_query_state_->AcquireBackendResourceRefcount();
if (query_ctx().request_pool.empty()) {
const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
}
if (desc_tbl != nullptr) query_state_->desc_tbl_ = desc_tbl;
Init();
}
RuntimeState::~RuntimeState() {
DCHECK(released_resources_) << "Must call ReleaseResources()";
// IMPALA-8270: run local_query_state_ destructor *before* other destructors so that
// teardown order for the TestEnv/fe-support RuntimeState matches the teardown order
// for the "real" RuntimeState. The previous divergence lead to hard-to-find bugs.
if (local_query_state_ != nullptr) local_query_state_.reset();
}
void RuntimeState::Init() {
SCOPED_TIMER(profile_->total_time_counter());
// Register with the thread mgr
resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
DCHECK(resource_pool_ != nullptr);
if (fragment_ctx_ != nullptr) {
// Ensure that the planner correctly determined the required threads.
resource_pool_->set_max_required_threads(fragment_ctx_->fragment.thread_reservation);
}
total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime");
total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
instance_mem_tracker_ = obj_pool()->Add(new MemTracker(
runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
if (instance_buffer_reservation_ != nullptr) {
instance_buffer_reservation_->InitChildTracker(profile_,
query_state_->buffer_reservation(), instance_mem_tracker_,
numeric_limits<int64_t>::max());
}
// Find local timezone.
// (For FE tests leave 'local_time_zone_' as default. FE tests don't load the timezone
// db since they don't need any timezone information.)
if (!TestInfo::is_fe_test()) {
const Timezone* tz = TimezoneDatabase::FindTimezone(query_ctx().local_time_zone);
if (tz != nullptr) {
local_time_zone_ = tz;
} else {
const string msg = Substitute(
"Failed to find local timezone '$0'.Falling back to UTC.",
query_ctx().local_time_zone);
LOG(WARNING) << msg;
LogError(ErrorMsg(TErrorCode::GENERAL, msg));
local_time_zone_ = &TimezoneDatabase::GetUtcTimezone();
}
}
}
Status RuntimeState::InitFilterBank(long runtime_filters_reservation_bytes) {
filter_bank_.reset(
new RuntimeFilterBank(query_ctx(), this, runtime_filters_reservation_bytes));
return filter_bank_->ClaimBufferReservation();
}
Status RuntimeState::CreateCodegen() {
if (codegen_.get() != NULL) return Status::OK();
// TODO: add the fragment ID to the codegen ID as well
RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this,
instance_mem_tracker_, PrintId(fragment_instance_id()), &codegen_));
codegen_->EnableOptimizations(true);
profile_->AddChild(codegen_->runtime_profile());
return Status::OK();
}
Status RuntimeState::CodegenScalarExprs() {
for (auto& item : scalar_exprs_to_codegen_) {
llvm::Function* fn;
RETURN_IF_ERROR(item.first->GetCodegendComputeFn(codegen_.get(), item.second, &fn));
}
return Status::OK();
}
Status RuntimeState::StartSpilling(MemTracker* mem_tracker) {
return query_state_->StartSpilling(this, mem_tracker);
}
string RuntimeState::ErrorLog() {
lock_guard<SpinLock> l(error_log_lock_);
return PrintErrorMapToString(error_log_);
}
bool RuntimeState::LogError(const ErrorMsg& message, int vlog_level) {
lock_guard<SpinLock> l(error_log_lock_);
// All errors go to the log, unreported_error_count_ is counted independently of the
// size of the error_log to account for errors that were already reported to the
// coordinator
VLOG(vlog_level) << "Error from query " << PrintId(query_id()) << ": " << message.msg();
if (ErrorCount(error_log_) < query_options().max_errors) {
AppendError(&error_log_, message);
return true;
}
return false;
}
void RuntimeState::GetUnreportedErrors(ErrorLogMapPB* new_errors) {
new_errors->clear();
lock_guard<SpinLock> l(error_log_lock_);
for (const ErrorLogMap::value_type& v : error_log_) {
(*new_errors)[v.first] = v.second;
}
// Reset all messages, but keep all already reported keys so that we do not report the
// same errors multiple times.
ClearErrorMap(error_log_);
}
Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
DCHECK_NE(message.error(), TErrorCode::OK);
// If either abort_on_error=true or the error necessitates execution stops
// immediately, return an error status.
if (abort_on_error()
|| message.error() == TErrorCode::CANCELLED
|| message.error() == TErrorCode::CANCELLED_INTERNALLY
|| message.error() == TErrorCode::MEM_LIMIT_EXCEEDED
|| message.error() == TErrorCode::INTERNAL_ERROR
|| message.error() == TErrorCode::DISK_IO_ERROR
|| message.error() == TErrorCode::THREAD_POOL_SUBMIT_FAILED
|| message.error() == TErrorCode::THREAD_POOL_TASK_TIMED_OUT) {
return Status(message);
}
// Otherwise, add the error to the error log and continue.
LogError(message);
return Status::OK();
}
void RuntimeState::Cancel() {
is_cancelled_.Store(true);
if (filter_bank_ != nullptr) filter_bank_->Cancel();
}
double RuntimeState::ComputeExchangeScanRatio() const {
int64_t bytes_read = 0;
for (const auto& c : bytes_read_counters_) bytes_read += c->value();
if (bytes_read == 0) return 0;
int64_t bytes_sent = 0;
for (const auto& c : bytes_sent_counters_) bytes_sent += c->value();
return (double)bytes_sent / bytes_read;
}
void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
int64_t failed_allocation_size, const ErrorMsg* msg) {
// Constructing the MemLimitExceeded and logging it is not cheap, so
// avoid the cost if the query has already hit an error.
// This is particularly important on the UDF codepath, because the UDF codepath
// cannot abort the fragment immediately. It relies on callers checking status
// periodically. This means that this function could be called a large number of times
// (e.g. once per row) before the fragment aborts. See IMPALA-6997.
{
lock_guard<SpinLock> l(query_status_lock_);
if (!query_status_.ok()) return;
}
Status status = tracker->MemLimitExceeded(this, msg == nullptr ? "" : msg->msg(),
failed_allocation_size);
{
lock_guard<SpinLock> l(query_status_lock_);
if (query_status_.ok()) query_status_ = status;
}
LogError(status.msg());
// Add warning about missing stats except for compute stats child queries.
if (!query_ctx().__isset.parent_query_id &&
query_ctx().__isset.tables_missing_stats &&
!query_ctx().tables_missing_stats.empty()) {
LogError(ErrorMsg(TErrorCode::GENERAL,
GetTablesMissingStatsWarning(query_ctx().tables_missing_stats)));
}
}
Status RuntimeState::CheckQueryState() {
DCHECK(instance_mem_tracker_ != nullptr);
if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
SetMemLimitExceeded(instance_mem_tracker_);
}
return GetQueryStatus();
}
void RuntimeState::ReleaseResources() {
DCHECK(!released_resources_);
if (filter_bank_ != nullptr) filter_bank_->Close();
if (resource_pool_ != nullptr) {
ExecEnv::GetInstance()->thread_mgr()->DestroyPool(move(resource_pool_));
}
// Release any memory associated with codegen.
if (codegen_ != nullptr) codegen_->Close();
// Release the reservation, which should be unused at the point.
if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
// No more memory should be tracked for this instance at this point.
if (instance_mem_tracker_->consumption() != 0) {
LOG(WARNING) << "Query " << PrintId(query_id()) << " may have leaked memory." << endl
<< instance_mem_tracker_->LogUsage(MemTracker::UNLIMITED_DEPTH);
}
instance_mem_tracker_->Close();
if (local_query_state_.get() != nullptr) {
local_query_state_->ReleaseBackendResourceRefcount();
}
released_resources_ = true;
}
void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code) {
boost::lock_guard<SpinLock> l(aux_error_info_lock_);
if (aux_error_info_ == nullptr && !reported_aux_error_info_) {
aux_error_info_.reset(new AuxErrorInfoPB());
RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
network_addr->set_hostname(dest_node.hostname);
network_addr->set_port(dest_node.port);
rpc_error_info->set_posix_error_code(posix_error_code);
}
}
void RuntimeState::GetUnreportedAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
boost::lock_guard<SpinLock> l(aux_error_info_lock_);
if (aux_error_info_ != nullptr) {
aux_error_info->CopyFrom(*aux_error_info_);
}
aux_error_info_ = nullptr;
reported_aux_error_info_ = true;
}
const std::string& RuntimeState::GetEffectiveUser() const {
return impala::GetEffectiveUser(query_ctx().session);
}
ObjectPool* RuntimeState::obj_pool() const {
DCHECK(query_state_ != nullptr);
return query_state_->obj_pool();
}
const TQueryCtx& RuntimeState::query_ctx() const {
DCHECK(query_state_ != nullptr);
return query_state_->query_ctx();
}
const DescriptorTbl& RuntimeState::desc_tbl() const {
DCHECK(query_state_ != nullptr);
return query_state_->desc_tbl();
}
const TQueryOptions& RuntimeState::query_options() const {
return query_ctx().client_request.query_options;
}
MemTracker* RuntimeState::query_mem_tracker() {
DCHECK(query_state_ != nullptr);
return query_state_->query_mem_tracker();
}
}