blob: 5be4f1c6eee6f16c2545a5face751a54d4f6a738 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/runtime-state.h"
#include <iostream>
#include <jni.h>
#include <sstream>
#include <string>
#include "common/logging.h"
#include <boost/algorithm/string/join.hpp>
#include <gutil/strings/substitute.h>
#include "codegen/llvm-codegen.h"
#include "common/object-pool.h"
#include "common/status.h"
#include "exprs/expr.h"
#include "exprs/scalar-fn-call.h"
#include "runtime/buffered-block-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/descriptors.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/data-stream-recvr.h"
#include "runtime/mem-tracker.h"
#include "runtime/runtime-filter-bank.h"
#include "runtime/timestamp-value.h"
#include "runtime/query-state.h"
#include "util/bitmap.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/error-util.h"
#include "util/jni-util.h"
#include "util/mem-info.h"
#include "util/pretty-printer.h"
#include "util/auth-util.h" // for GetEffectiveUser()
#include "common/names.h"
using namespace llvm;
DECLARE_int32(max_errors);
// The fraction of the query mem limit that is used for the block mgr. Operators
// that accumulate memory all use the block mgr so the majority of the memory should
// be allocated to the block mgr. The remaining memory is used by the non-spilling
// operators and should be independent of data size.
static const float BLOCK_MGR_MEM_FRACTION = 0.8f;
// The minimum amount of memory that must be left after the block mgr reserves the
// BLOCK_MGR_MEM_FRACTION. The block limit is:
// min(query_limit * BLOCK_MGR_MEM_FRACTION, query_limit - BLOCK_MGR_MEM_MIN_REMAINING)
// TODO: this value was picked arbitrarily and the tests are written to rely on this
// for the minimum memory required to run the query. Revisit.
static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
namespace impala {
RuntimeState::RuntimeState(
QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
: desc_tbl_(nullptr),
obj_pool_(new ObjectPool()),
query_state_(query_state),
fragment_ctx_(&fragment_ctx),
instance_ctx_(&instance_ctx),
now_(new TimestampValue(
query_state->query_ctx().now_string.c_str(),
query_state->query_ctx().now_string.size())),
exec_env_(exec_env),
profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
is_cancelled_(false),
root_node_id_(-1) {
Init();
}
RuntimeState::RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env)
: obj_pool_(new ObjectPool()),
query_state_(nullptr),
fragment_ctx_(nullptr),
instance_ctx_(nullptr),
local_query_ctx_(query_ctx),
now_(new TimestampValue(query_ctx.now_string.c_str(), query_ctx.now_string.size())),
exec_env_(exec_env == nullptr ? ExecEnv::GetInstance() : exec_env),
profile_(obj_pool_.get(), "<unnamed>"),
is_cancelled_(false),
root_node_id_(-1) {
Init();
}
RuntimeState::~RuntimeState() {
block_mgr_.reset();
// Release codegen memory before tearing down trackers.
codegen_.reset();
// query_mem_tracker_ must be valid as long as instance_mem_tracker_ is so
// delete instance_mem_tracker_ first.
// LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded.
// Break the link between the instance_mem_tracker and its parent (query_mem_tracker_)
// before the instance_mem_tracker_ and its children are destroyed.
// May be NULL if InitMemTrackers() is not called, for example from tests.
if (instance_mem_tracker_ != NULL) instance_mem_tracker_->UnregisterFromParent();
instance_mem_tracker_.reset();
query_mem_tracker_.reset();
}
void RuntimeState::Init() {
SCOPED_TIMER(profile_.total_time_counter());
// Register with the thread mgr
if (exec_env_ != NULL) {
resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
DCHECK(resource_pool_ != NULL);
}
total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime");
total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
}
void RuntimeState::InitMemTrackers(const string* pool_name, int64_t query_bytes_limit) {
MemTracker* query_parent_tracker = exec_env_->process_mem_tracker();
if (pool_name != NULL) {
query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name,
query_parent_tracker);
}
query_mem_tracker_ =
MemTracker::GetQueryMemTracker(query_id(), query_bytes_limit, query_parent_tracker);
instance_mem_tracker_.reset(new MemTracker(
runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_.get()));
}
void RuntimeState::InitFilterBank() {
filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
}
Status RuntimeState::CreateBlockMgr() {
DCHECK(block_mgr_.get() == NULL);
// Compute the max memory the block mgr will use.
int64_t block_mgr_limit = query_mem_tracker_->lowest_limit();
if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max();
block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION),
block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING);
if (block_mgr_limit < 0) block_mgr_limit = 0;
if (query_options().__isset.max_block_mgr_memory &&
query_options().max_block_mgr_memory > 0) {
block_mgr_limit = query_options().max_block_mgr_memory;
LOG(WARNING) << "Block mgr mem limit: "
<< PrettyPrinter::Print(block_mgr_limit, TUnit::BYTES);
}
RETURN_IF_ERROR(BufferedBlockMgr::Create(this, query_mem_tracker(),
runtime_profile(), exec_env()->tmp_file_mgr(), block_mgr_limit,
io_mgr()->max_read_buffer_size(), &block_mgr_));
return Status::OK();
}
Status RuntimeState::CreateCodegen() {
if (codegen_.get() != NULL) return Status::OK();
// TODO: add the fragment ID to the codegen ID as well
RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(obj_pool_.get(),
instance_mem_tracker_.get(), PrintId(fragment_instance_id()), &codegen_));
codegen_->EnableOptimizations(true);
profile_.AddChild(codegen_->runtime_profile());
return Status::OK();
}
Status RuntimeState::CodegenScalarFns() {
for (ScalarFnCall* scalar_fn : scalar_fns_to_codegen_) {
Function* fn;
RETURN_IF_ERROR(scalar_fn->GetCodegendComputeFn(codegen_.get(), &fn));
}
return Status::OK();
}
string RuntimeState::ErrorLog() {
lock_guard<SpinLock> l(error_log_lock_);
return PrintErrorMapToString(error_log_);
}
void RuntimeState::GetErrors(ErrorLogMap* errors) {
lock_guard<SpinLock> l(error_log_lock_);
*errors = error_log_;
}
bool RuntimeState::LogError(const ErrorMsg& message, int vlog_level) {
lock_guard<SpinLock> l(error_log_lock_);
// All errors go to the log, unreported_error_count_ is counted independently of the
// size of the error_log to account for errors that were already reported to the
// coordinator
VLOG(vlog_level) << "Error from query " << query_id() << ": " << message.msg();
if (ErrorCount(error_log_) < query_options().max_errors) {
AppendError(&error_log_, message);
return true;
}
return false;
}
void RuntimeState::GetUnreportedErrors(ErrorLogMap* new_errors) {
lock_guard<SpinLock> l(error_log_lock_);
*new_errors = error_log_;
// Reset all messages, but keep all already reported keys so that we do not report the
// same errors multiple times.
ClearErrorMap(error_log_);
}
Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
DCHECK_NE(message.error(), TErrorCode::OK);
// If either abort_on_error=true or the error necessitates execution stops
// immediately, return an error status.
if (abort_on_error() ||
message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
message.error() == TErrorCode::CANCELLED) {
return Status(message);
}
// Otherwise, add the error to the error log and continue.
LogError(message);
return Status::OK();
}
void RuntimeState::LogMemLimitExceeded(const MemTracker* tracker,
int64_t failed_allocation_size) {
DCHECK_GE(failed_allocation_size, 0);
DCHECK(query_mem_tracker_.get() != NULL);
stringstream ss;
ss << "Memory Limit Exceeded by fragment: " << fragment_instance_id() << endl;
if (failed_allocation_size != 0) {
DCHECK(tracker != NULL);
ss << " " << tracker->label() << " could not allocate "
<< PrettyPrinter::Print(failed_allocation_size, TUnit::BYTES)
<< " without exceeding limit." << endl;
}
if (exec_env_->process_mem_tracker()->LimitExceeded()) {
ss << exec_env_->process_mem_tracker()->LogUsage();
} else {
ss << query_mem_tracker_->LogUsage();
}
LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
}
Status RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
int64_t failed_allocation_size, const ErrorMsg* msg) {
{
lock_guard<SpinLock> l(query_status_lock_);
if (query_status_.ok()) {
query_status_ = Status::MemLimitExceeded();
if (msg != NULL) query_status_.MergeStatus(*msg);
} else {
return query_status_;
}
}
LogMemLimitExceeded(tracker, failed_allocation_size);
// Add warning about missing stats except for compute stats child queries.
if (!query_ctx().__isset.parent_query_id &&
query_ctx().__isset.tables_missing_stats &&
!query_ctx().tables_missing_stats.empty()) {
LogError(ErrorMsg(TErrorCode::GENERAL,
GetTablesMissingStatsWarning(query_ctx().tables_missing_stats)));
}
DCHECK(query_status_.IsMemLimitExceeded());
return query_status_;
}
Status RuntimeState::CheckQueryState() {
if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded())) return SetMemLimitExceeded();
return GetQueryStatus();
}
void RuntimeState::AcquireReaderContext(DiskIoRequestContext* reader_context) {
boost::lock_guard<SpinLock> l(reader_contexts_lock_);
reader_contexts_.push_back(reader_context);
}
void RuntimeState::UnregisterReaderContexts() {
boost::lock_guard<SpinLock> l(reader_contexts_lock_);
for (DiskIoRequestContext* context : reader_contexts_) {
io_mgr()->UnregisterContext(context);
}
reader_contexts_.clear();
}
void RuntimeState::ReleaseResources() {
UnregisterReaderContexts();
if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this);
if (filter_bank_ != nullptr) filter_bank_->Close();
if (resource_pool_ != nullptr) {
exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
}
}
const std::string& RuntimeState::GetEffectiveUser() const {
return impala::GetEffectiveUser(query_ctx().session);
}
ImpalaBackendClientCache* RuntimeState::impalad_client_cache() {
return exec_env_->impalad_client_cache();
}
CatalogServiceClientCache* RuntimeState::catalogd_client_cache() {
return exec_env_->catalogd_client_cache();
}
DiskIoMgr* RuntimeState::io_mgr() {
return exec_env_->disk_io_mgr();
}
DataStreamMgr* RuntimeState::stream_mgr() {
return exec_env_->stream_mgr();
}
HBaseTableFactory* RuntimeState::htable_factory() {
return exec_env_->htable_factory();
}
const TQueryCtx& RuntimeState::query_ctx() const {
return query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
}
const TQueryOptions& RuntimeState::query_options() const {
const TQueryCtx& query_ctx =
query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
return query_ctx.client_request.query_options;
}
}