// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scheduling/query-resource-mgr.h"
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <gutil/strings/substitute.h>
#include <sstream>
#include "runtime/exec-env.h"
#include "resourcebroker/resource-broker.h"
#include "util/bit-util.h"
#include "util/cgroups-mgr.h"
#include "util/container-util.h"
#include "util/network-util.h"
#include "util/promise.h"
#include "util/time.h"
#include "common/names.h"
using boost::uuids::random_generator;
using boost::uuids::uuid;
using namespace impala;
using namespace strings;
DEFINE_int64(rm_mem_expansion_timeout_ms, 5000, "The amount of time to wait (ms) "
"for a memory expansion request.");
DEFINE_double(max_vcore_oversubscription_ratio, 2.5, "(Advanced) The maximum ratio "
    "allowed between running threads and acquired VCore resources for a query's "
    "fragments on a single node.");

// Resolves Impalad addresses to the addresses by which Llama knows each node's
// resources. In a pseudo-distributed Llama setup (typically a single-machine test
// configuration), a mapping from each Impalad host to a registered Llama node is
// built eagerly; otherwise resolution is by hostname alone.
ResourceResolver::ResourceResolver(const unordered_set<TNetworkAddress>& unique_hosts) {
  if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
    CreateLocalLlamaNodeMapping(unique_hosts);
  }
}
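
// Maps the address of an Impalad (src) to the address (*dest) to use when asking
// Llama for resources on that node: the mapped local datanode in pseudo-distributed
// mode, otherwise the Impalad's hostname with the port left as 0.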
void ResourceResolver::GetResourceHostport(const TNetworkAddress& src,
TNetworkAddress* dest) {
if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
*dest = impalad_to_dn_[src];
} else {
dest->hostname = src.hostname;
dest->port = 0;
}
}
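
// Builds the bidirectional Impalad <-> datanode mapping used in pseudo-distributed
// mode by assigning the registered Llama nodes to the unique hosts in round-robin
// order.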
void ResourceResolver::CreateLocalLlamaNodeMapping(
const unordered_set<TNetworkAddress>& unique_hosts) {
DCHECK(ExecEnv::GetInstance()->is_pseudo_distributed_llama());
const vector<string>& llama_nodes =
ExecEnv::GetInstance()->resource_broker()->llama_nodes();
DCHECK(!llama_nodes.empty());
int llama_node_ix = 0;
for (const TNetworkAddress& host: unique_hosts) {
TNetworkAddress dn_hostport = MakeNetworkAddress(llama_nodes[llama_node_ix]);
impalad_to_dn_[host] = dn_hostport;
dn_to_impalad_[dn_hostport] = host;
LOG(INFO) << "Mapping Datanode " << dn_hostport << " to Impalad: " << host;
// Round robin the registered Llama nodes.
llama_node_ix = (llama_node_ix + 1) % llama_nodes.size();
}
}
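
// Initialises bookkeeping state for one query's reservation on this node. No thread
// is started and no resources are acquired until InitVcoreAcquisition() is called.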
QueryResourceMgr::QueryResourceMgr(const TUniqueId& reservation_id,
const TNetworkAddress& local_resource_location, const TUniqueId& query_id)
: reservation_id_(reservation_id), query_id_(query_id),
local_resource_location_(local_resource_location), exit_(false), callback_count_(0),
threads_running_(0), vcores_(0) {
max_vcore_oversubscription_ratio_ = FLAGS_max_vcore_oversubscription_ratio;
}
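
// Starts the VCore acquisition thread, which expands the query's VCore allocation
// whenever the number of running threads oversubscribes the cores acquired so far.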
void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
LOG(INFO) << "Initialising vcore acquisition thread for query " << PrintId(query_id_)
<< " (" << init_vcores << " initial vcores)";
  DCHECK(acquire_vcore_thread_.get() == NULL)
      << "Double initialisation of QueryResourceMgr::InitVcoreAcquisition()";
vcores_ = init_vcores;
  // These shared pointers to atomic values are used to communicate between the vcore
  // acquisition thread and the class destructor. If the acquisition thread is in the
  // middle of an Expand() call, the destructor might otherwise have to wait for the
  // call to time out (5s by default) before returning, holding up query close
  // operations. So instead the destructor checks whether the thread is inside
  // Expand(); if it is, the destructor sets the synchronised flag early_exit_, which
  // the thread inspects immediately after Expand() returns. If the flag is set, the
  // thread exits without touching any class-wide state, since the destructor may
  // already have completed by that point.
thread_in_expand_.reset(new AtomicInt32());
early_exit_.reset(new AtomicInt32());
acquire_vcore_thread_.reset(
new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
thread_in_expand_, early_exit_)));
}
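
// Builds a Llama resource request for memory_mb MB and vcores VCPUs, pinned (MUST)
// to this node's resolved resource hostport.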
llama::TResource QueryResourceMgr::CreateResource(int64_t memory_mb, int64_t vcores) {
DCHECK(memory_mb > 0 || vcores > 0);
DCHECK(reservation_id_ != TUniqueId()) << "Expansion requires existing reservation";
unordered_set<TNetworkAddress> hosts;
hosts.insert(local_resource_location_);
ResourceResolver resolver(hosts);
llama::TResource res;
res.memory_mb = memory_mb;
res.v_cpu_cores = vcores;
TNetworkAddress res_address;
resolver.GetResourceHostport(local_resource_location_, &res_address);
res.__set_askedLocation(TNetworkAddressToString(res_address));
  // Tag the request with a fresh random 128-bit id, packed into the two 64-bit
  // halves of the Thrift client_resource_id.
  random_generator uuid_generator;
  uuid id = uuid_generator();
  res.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
  res.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
res.enforcement = llama::TLocationEnforcement::MUST;
return res;
}
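
// Returns true when the running-thread count exceeds 80% of the maximum allowed
// VCore oversubscription, so that acquisition begins before the hard limit is
// reached. Worked example with the default flag value: with
// max_vcore_oversubscription_ratio = 2.5 and vcores_ = 2, the threshold is
// 2 * 2.5 * 0.8 = 4, so a fifth concurrent thread triggers expansion.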
bool QueryResourceMgr::AboveVcoreSubscriptionThreshold() {
return threads_running_ > vcores_ * (max_vcore_oversubscription_ratio_ * 0.8);
}
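
// Records that delta threads have started (delta > 0) or finished (delta < 0) and
// wakes the acquisition thread if the query is now oversubscribed on VCores.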
void QueryResourceMgr::NotifyThreadUsageChange(int delta) {
lock_guard<mutex> l(threads_running_lock_);
threads_running_ += delta;
  DCHECK_GE(threads_running_, 0L);
if (AboveVcoreSubscriptionThreshold()) threads_changed_cv_.notify_all();
}
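
// Registers a callback to be invoked (with no arguments) when a new VCore is
// acquired; returns an id that can later be passed to RemoveVcoreAvailableCb().
// Exactly one callback is invoked per acquired core, in round-robin order.
// Hypothetical usage sketch (MyTokenPool is illustrative, not part of this codebase):
//   int32_t id = query_resource_mgr->AddVcoreAvailableCb(
//       bind<void>(mem_fn(&MyTokenPool::WakeOne), pool));
//   ...
//   query_resource_mgr->RemoveVcoreAvailableCb(id);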
int32_t QueryResourceMgr::AddVcoreAvailableCb(const VcoreAvailableCb& callback) {
lock_guard<mutex> l(callbacks_lock_);
callbacks_[callback_count_] = callback;
callbacks_it_ = callbacks_.begin();
return callback_count_++;
}
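
// Unregisters the callback previously returned by AddVcoreAvailableCb(); passing an
// id that is not registered is a programming error (DCHECK).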
void QueryResourceMgr::RemoveVcoreAvailableCb(int32_t callback_id) {
lock_guard<mutex> l(callbacks_lock_);
CallbackMap::iterator it = callbacks_.find(callback_id);
DCHECK(it != callbacks_.end()) << "Could not find callback with id: " << callback_id;
callbacks_.erase(it);
callbacks_it_ = callbacks_.begin();
}
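
// Synchronously asks Llama for requested_bytes more memory on this node, rounded up
// to a whole number of MB (minimum 1MB), waiting up to
// FLAGS_rm_mem_expansion_timeout_ms for a response. On success, *allocated_bytes is
// set to the size actually granted, which may differ from the request.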
Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes,
int64_t* allocated_bytes) {
DCHECK(allocated_bytes != NULL);
*allocated_bytes = 0;
  // Llama allocates memory in whole MB, so round the request up and ask for at
  // least 1MB.
  int64_t requested_mb = BitUtil::Ceil(requested_bytes, 1024L * 1024L);
  llama::TResource res = CreateResource(max<int64_t>(1, requested_mb), 0);
llama::TUniqueId expansion_id;
llama::TAllocatedResource resource;
RETURN_IF_ERROR(ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id_,
res, FLAGS_rm_mem_expansion_timeout_ms, &expansion_id, &resource));
DCHECK_EQ(resource.v_cpu_cores, 0L) << "Unexpected VCPUs returned by Llama";
*allocated_bytes = resource.memory_mb * 1024L * 1024L;
return Status::OK();
}
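
// Body of the VCore acquisition thread. Loops until Shutdown() is called: waits for
// the running-thread count to cross the oversubscription threshold, asks Llama for
// one more VCore, and on success updates the query's cgroup CPU shares and notifies
// one registered callback. thread_in_expand and early_exit are shared with the
// destructor so that query teardown never blocks on an in-flight Expand() call; see
// InitVcoreAcquisition() for the handshake.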
void QueryResourceMgr::AcquireVcoreResources(
shared_ptr<AtomicInt32> thread_in_expand,
shared_ptr<AtomicInt32> early_exit) {
  // Take a copy of the reservation id so it can still be logged after the destructor
  // has run, at which point reservation_id_ itself may no longer be valid.
TUniqueId reservation_id = reservation_id_;
VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;
while (!ShouldExit()) {
{
unique_lock<mutex> l(threads_running_lock_);
while (!AboveVcoreSubscriptionThreshold() && !ShouldExit()) {
threads_changed_cv_.wait(l);
}
}
if (ShouldExit()) break;
    llama::TResource res = CreateResource(0L, 1);
    VLOG_QUERY << "Expanding VCore allocation: " << reservation_id;
// First signal that we are about to enter a blocking Expand() call.
thread_in_expand->Add(1L);
// TODO: Could cause problems if called during or after a system-wide shutdown
llama::TAllocatedResource resource;
llama::TUniqueId expansion_id;
Status status = ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id,
res, -1, &expansion_id, &resource);
thread_in_expand->Add(-1L);
// If signalled to exit quickly by the destructor, exit the loop now. It's important
// to do so without accessing any class variables since they may no longer be valid.
// Need to check after setting thread_in_expand to avoid a race.
if (early_exit->Add(0L) != 0) {
VLOG_QUERY << "Fragment finished during Expand(): " << reservation_id;
break;
}
    if (!status.ok()) {
      VLOG_QUERY << "Could not expand CPU resources for query " << PrintId(query_id_)
          << ", reservation: " << PrintId(reservation_id) << ". Error was: "
          << status.GetDetail();
      // Sleep briefly to avoid flooding the resource broker, particularly if requests
      // are being rejected quickly (and we therefore remain oversubscribed).
      // TODO: configurable timeout
      SleepForMs(250);
      continue;
}
    DCHECK_EQ(resource.v_cpu_cores, 1)
        << "Asked for 1 core, got: " << resource.v_cpu_cores;
vcores_ += resource.v_cpu_cores;
ExecEnv* exec_env = ExecEnv::GetInstance();
const string& cgroup =
exec_env->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
int32_t num_shares = exec_env->cgroups_mgr()->VirtualCoresToCpuShares(vcores_);
exec_env->cgroups_mgr()->SetCpuShares(cgroup, num_shares);
    // Notify a single registered callback, in round-robin order, that a VCore is now
    // available.
    // TODO: Consider invoking all registered callbacks rather than just one.
    {
      lock_guard<mutex> l(callbacks_lock_);
      if (!callbacks_.empty()) {
        callbacks_it_->second();
        if (++callbacks_it_ == callbacks_.end()) callbacks_it_ = callbacks_.begin();
      }
    }
}
VLOG_QUERY << "Leaving VCore acquisition thread: " << reservation_id;
}
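
// Returns true once Shutdown() has been called.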
bool QueryResourceMgr::ShouldExit() {
lock_guard<mutex> l(exit_lock_);
return exit_;
}
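
// Signals the acquisition thread to exit and drops all registered callbacks.
// Idempotent: calls after the first are no-ops.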
void QueryResourceMgr::Shutdown() {
{
lock_guard<mutex> l(exit_lock_);
if (exit_) return;
exit_ = true;
}
{
lock_guard<mutex> l(callbacks_lock_);
callbacks_.clear();
}
threads_changed_cv_.notify_all();
}
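
// Stops the acquisition thread, if one was started. The thread is joined only when
// it is not blocked in Expand(); otherwise early_exit_ is set so that it exits on
// its own as soon as Expand() returns (see InitVcoreAcquisition()).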
QueryResourceMgr::~QueryResourceMgr() {
if (acquire_vcore_thread_.get() == NULL) return;
if (!ShouldExit()) Shutdown();
// First, set the early exit flag. Then check to see if the thread is in Expand(). If
// so, the acquisition thread is guaranteed to see early_exit_ == 1L once it finishes
// Expand(), and will exit immediately. It's therefore safe not to wait for it.
early_exit_->Add(1L);
if (thread_in_expand_->Add(0L) == 0L) {
acquire_vcore_thread_->Join();
}
}