blob: e1c20cb3bba760283d7e36c7569a7803d760dd66 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/test-env.h"
#include <limits>
#include <memory>
#include "gutil/strings/substitute.h"
#include "rpc/rpc-mgr.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/query-state.h"
#include "runtime/tmp-file-mgr.h"
#include "service/control-service.h"
#include "util/disk-info.h"
#include "util/impalad-metrics.h"
#include "util/memory-metrics.h"
#include "common/names.h"
using boost::scoped_ptr;
using std::numeric_limits;
DECLARE_string(hostname);
namespace impala {
scoped_ptr<MetricGroup> TestEnv::static_metrics_;
TestEnv::TestEnv()
: have_tmp_file_mgr_args_(false),
buffer_pool_min_buffer_len_(64 * 1024),
buffer_pool_capacity_(0) {}
Status TestEnv::Init() {
if (static_metrics_ == NULL) {
static_metrics_.reset(new MetricGroup("test-env-static-metrics"));
ImpaladMetrics::CreateMetrics(static_metrics_.get());
RETURN_IF_ERROR(RegisterMemoryMetrics(static_metrics_.get(), true, nullptr, nullptr));
ImpaladMetrics::CreateMetrics(
static_metrics_->GetOrCreateChildGroup("impala-server"));
}
exec_env_.reset(new ExecEnv);
// Populate the ExecEnv state that the backend tests need.
RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
if (have_tmp_file_mgr_args_) {
RETURN_IF_ERROR(
tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_, metrics()));
} else {
RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
}
if (enable_buffer_pool_) {
exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_,
static_cast<int64_t>(0.1 * buffer_pool_capacity_));
}
if (process_mem_tracker_use_metrics_) {
exec_env_->InitMemTracker(process_mem_limit_);
} else {
exec_env_->mem_tracker_.reset(new MemTracker(process_mem_limit_, "Process"));
}
// Initialize RpcMgr and control service.
IpAddr ip_address;
RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address));
exec_env_->krpc_address_.__set_hostname(ip_address);
RETURN_IF_ERROR(exec_env_->rpc_mgr_->Init(exec_env_->krpc_address_));
exec_env_->control_svc_.reset(new ControlService(exec_env_->rpc_metrics_));
RETURN_IF_ERROR(exec_env_->control_svc_->Init());
return Status::OK();
}
void TestEnv::SetTmpFileMgrArgs(
const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) {
have_tmp_file_mgr_args_ = true;
tmp_dirs_ = tmp_dirs;
one_tmp_dir_per_device_ = one_dir_per_device;
}
void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) {
buffer_pool_min_buffer_len_ = min_buffer_len;
buffer_pool_capacity_ = capacity;
}
void TestEnv::SetProcessMemTrackerArgs(int64_t bytes_limit, bool use_metrics) {
process_mem_limit_ = bytes_limit;
process_mem_tracker_use_metrics_ = use_metrics;
}
TestEnv::~TestEnv() {
// Queries must be torn down first since they are dependent on global state.
TearDownQueries();
// tear down exec env state to avoid leaks
exec_env_.reset();
}
void TestEnv::TearDownQueries() {
for (RuntimeState* runtime_state : runtime_states_) runtime_state->ReleaseResources();
runtime_states_.clear();
for (QueryState* query_state : query_states_) {
query_state->ReleaseBackendResourceRefcount(); // Acquired by QueryState::Init()
exec_env_->query_exec_mgr()->ReleaseQueryState(query_state);
}
query_states_.clear();
}
int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
DCHECK_GE(max_buffers, -1);
if (max_buffers == -1) return -1;
return max_buffers * static_cast<int64_t>(block_size);
}
int64_t TestEnv::TotalQueryMemoryConsumption() {
int64_t total = 0;
for (QueryState* query_state : query_states_) {
total += query_state->query_mem_tracker()->consumption();
}
return total;
}
Status TestEnv::CreateQueryState(
int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state) {
TQueryCtx query_ctx;
if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
query_ctx.query_id.hi = 0;
query_ctx.query_id.lo = query_id;
query_ctx.request_pool = "test-pool";
query_ctx.coord_address = exec_env_->configured_backend_address_;
query_ctx.coord_krpc_address = exec_env_->krpc_address_;
TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
int64_t mem_limit =
query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ?
query_options_to_use->mem_limit :
-1;
// CreateQueryState() enforces the invariant that 'query_id' must be unique.
QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, mem_limit);
query_states_.push_back(qs);
// make sure to initialize data structures unrelated to the TExecQueryFInstancesParams
// param
ExecQueryFInstancesRequestPB rpc_params;
// create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
rpc_params.set_coord_state_idx(0);
TExecPlanFragmentInfo fragment_info;
fragment_info.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
fragment_info.__set_fragment_instance_ctxs(
vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(qs,
qs->fragment_info_.fragment_ctxs[0], qs->fragment_info_.fragment_instance_ctxs[0]));
RuntimeState* rs = qs->obj_pool()->Add(
new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
runtime_states_.push_back(rs);
*runtime_state = rs;
return Status::OK();
}
}