blob: 8675ae9bba4671b4e51a0ca52e78c4065d3efdc6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/test-env.h"
#include "util/disk-info.h"
#include "util/impalad-metrics.h"
#include "gutil/strings/substitute.h"
#include <memory>
#include "common/names.h"
using boost::scoped_ptr;
namespace impala {
// Definition of the static member TestEnv::static_metrics_. It starts out empty and is
// populated by the TestEnv constructor the first time it finds it unset.
scoped_ptr<MetricGroup> TestEnv::static_metrics_;
// Builds the test environment: the shared static metric group (created only once),
// an ExecEnv, the MemTracker handed to the disk I/O manager, this environment's own
// metric group and a TmpFileMgr initialised against that group.
TestEnv::TestEnv() {
  // 'static_metrics_' is a static member, so only the first TestEnv that finds it
  // unset allocates it and runs ImpaladMetrics::CreateMetrics() on it.
  const bool static_metrics_missing = (static_metrics_.get() == nullptr);
  if (static_metrics_missing) {
    static_metrics_.reset(new MetricGroup("test-env-static-metrics"));
    ImpaladMetrics::CreateMetrics(static_metrics_.get());
  }

  exec_env_.reset(new ExecEnv);
  exec_env_->InitForFeTests();

  // NOTE(review): -1 presumably means "no memory limit" for MemTracker -- confirm.
  io_mgr_tracker_.reset(new MemTracker(-1));
  // NOTE(review): any result returned by Init() is discarded here -- confirm intended.
  exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get());

  // 'metrics_' has to exist before it is passed to the TmpFileMgr below.
  InitMetrics();
  tmp_file_mgr_.reset(new TmpFileMgr);
  // NOTE(review): any result returned by Init() is discarded here -- confirm intended.
  tmp_file_mgr_->Init(metrics_.get());
}
// Replaces 'metrics_' with a newly allocated MetricGroup named "test-env-metrics".
// Whatever group 'metrics_' held before is released by the reset.
void TestEnv::InitMetrics() {
metrics_.reset(new MetricGroup("test-env-metrics"));
}
// Replaces the current TmpFileMgr with a new one initialised via InitCustom().
//
// 'tmp_dirs' and 'one_dir_per_device' are forwarded unchanged to
// TmpFileMgr::InitCustom(), together with a freshly created 'metrics_' group.
void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_device) {
  // Destroy the old TmpFileMgr before the MetricGroup it was initialised with is
  // replaced below, so it never outlives that group. This matches the order in which
  // ~TestEnv() resets the two members.
  tmp_file_mgr_.reset();
  // Need to recreate metrics to avoid error when registering metric twice.
  InitMetrics();
  tmp_file_mgr_.reset(new TmpFileMgr);
  // NOTE(review): any result returned by InitCustom() is discarded here -- confirm
  // that ignoring it is intended.
  tmp_file_mgr_->InitCustom(tmp_dirs, one_dir_per_device, metrics_.get());
}
// Tears the environment down. Runtime states are released first, then the owned
// members are reset explicitly in the order written below.
TestEnv::~TestEnv() {
// Queries must be torn down first since they are dependent on global state.
TearDownRuntimeStates();
exec_env_.reset();
io_mgr_tracker_.reset();
// The TmpFileMgr was initialised with a pointer to 'metrics_', so it is reset before
// the metric group itself.
tmp_file_mgr_.reset();
metrics_.reset();
}
// Calls ReleaseResources() on every RuntimeState held in 'runtime_states_' and then
// empties the container, dropping this environment's references to them.
void TestEnv::TearDownRuntimeStates() {
  for (auto entry = runtime_states_.begin(); entry != runtime_states_.end(); ++entry) {
    entry->second->ReleaseResources();
  }
  runtime_states_.clear();
}
// Turns a buffer count into a byte count: returns 'max_buffers' * 'block_size',
// computed in 64 bits. A 'max_buffers' of -1 is passed through as -1 without
// multiplying. Values below -1 trip the DCHECK.
int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
  DCHECK_GE(max_buffers, -1);
  if (max_buffers != -1) {
    // Widen before multiplying so the product is computed as int64_t.
    const int64_t bytes_per_block = static_cast<int64_t>(block_size);
    return max_buffers * bytes_per_block;
  }
  return -1;
}
int64_t TestEnv::TotalQueryMemoryConsumption() {
int64_t total = 0;
for (const auto& runtime_state : runtime_states_) {
total += runtime_state.second->query_mem_tracker()->consumption();
}
return total;
}
// Creates a RuntimeState for 'query_id', attaches a BufferedBlockMgr to it and
// registers it in 'runtime_states_', which shares ownership of it from then on.
//
// 'max_buffers' and 'block_size' are combined by CalculateMemLimit() into the limit
// passed to BufferedBlockMgr::Create(). 'query_options', if non-null, is copied into
// the query context.
//
// On success returns Status::OK() and stores the new state in '*runtime_state'; the
// pointer stays owned by this TestEnv. Returns an error without touching
// '*runtime_state' if 'query_id' is already registered. If the block manager cannot
// be created, the new state is released and destroyed, '*runtime_state' is set to
// nullptr and the error from Create() is returned.
Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
    const TQueryOptions* query_options, RuntimeState** runtime_state) {
  // Enforce invariant that each query ID can be registered at most once.
  if (runtime_states_.find(query_id) != runtime_states_.end()) {
    return Status(Substitute("Duplicate query id found: $0", query_id));
  }
  TQueryCtx query_ctx;
  if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
  query_ctx.query_id.hi = 0;
  query_ctx.query_id.lo = query_id;

  // Own the state from the moment it is allocated so that an early return cannot
  // leak it.
  shared_ptr<RuntimeState> state(new RuntimeState(query_ctx, exec_env_.get()));
  state->InitMemTrackers(nullptr, -1);

  shared_ptr<BufferedBlockMgr> mgr;
  Status status = BufferedBlockMgr::Create(state.get(), state->query_mem_tracker(),
      state->runtime_profile(), tmp_file_mgr_.get(),
      CalculateMemLimit(max_buffers, block_size), block_size, &mgr);
  if (!status.ok()) {
    // Apply the same step TearDownRuntimeStates() performs before a state is dropped,
    // and do not hand the caller a pointer to a state that is about to be destroyed.
    state->ReleaseResources();
    *runtime_state = nullptr;
    return status;
  }

  state->set_block_mgr(mgr);
  runtime_states_[query_id] = state;
  *runtime_state = state.get();
  return Status::OK();
}
}