blob: 205c6f67b7b86611602e03dd013e0a2ad007ad0b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/bufferpool/reservation-tracker.h"
#include <algorithm>
#include <cstdlib>
#include "common/object-pool.h"
#include "gutil/atomicops.h"
#include "gutil/strings/substitute.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "util/dummy-runtime-profile.h"
#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
namespace impala {
ReservationTracker::ReservationTracker() {}
ReservationTracker::~ReservationTracker() {
DCHECK(!initialized_);
}
void ReservationTracker::InitRootTracker(
RuntimeProfile* profile, int64_t reservation_limit) {
lock_guard<SpinLock> l(lock_);
DCHECK(!initialized_);
parent_ = nullptr;
mem_tracker_ = nullptr;
reservation_limit_ = reservation_limit;
reservation_ = 0;
used_reservation_ = 0;
child_reservations_ = 0;
initialized_ = true;
InitCounters(profile, reservation_limit_);
COUNTER_SET(counters_.peak_reservation, reservation_);
CheckConsistency();
}
void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit,
MemLimit mem_limit_mode) {
DCHECK(parent != nullptr);
DCHECK_GE(reservation_limit, 0);
lock_guard<SpinLock> l(lock_);
DCHECK(!initialized_);
parent_ = parent;
mem_tracker_ = mem_tracker;
mem_limit_mode_ = mem_limit_mode;
reservation_limit_ = reservation_limit;
reservation_ = 0;
used_reservation_ = 0;
child_reservations_ = 0;
initialized_ = true;
if (mem_tracker_ != nullptr) {
MemTracker* parent_mem_tracker = GetParentMemTracker();
if (parent_mem_tracker != nullptr) {
// Make sure the parent links of the MemTrackers correspond to our parent links.
DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
// Make sure we don't have a lower limit than the ancestor, since we don't enforce
// limits at lower links.
DCHECK_EQ(mem_tracker_->GetLowestLimit(mem_limit_mode_),
parent_mem_tracker->GetLowestLimit(mem_limit_mode_));
} else {
// Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
// shouldn't have a MemTracker.
ReservationTracker* ancestor = parent_;
while (ancestor != nullptr) {
DCHECK(ancestor->mem_tracker_ == nullptr);
ancestor = ancestor->parent_;
}
}
}
InitCounters(profile, reservation_limit_);
CheckConsistency();
}
void ReservationTracker::InitCounters(
RuntimeProfile* profile, int64_t reservation_limit) {
if (profile == nullptr) {
dummy_profile_.reset(new DummyProfile);
profile = dummy_profile_->profile();
}
// Check that another tracker's counters aren't already registered in the profile.
DCHECK(profile->GetCounter("PeakReservation") == nullptr);
counters_.peak_reservation =
profile->AddHighWaterMarkCounter("PeakReservation", TUnit::BYTES);
counters_.peak_used_reservation =
profile->AddHighWaterMarkCounter("PeakUsedReservation", TUnit::BYTES);
// Only show the limit if set.
counters_.reservation_limit = nullptr;
if (reservation_limit != numeric_limits<int64_t>::max()) {
counters_.reservation_limit = ADD_COUNTER(profile, "ReservationLimit", TUnit::BYTES);
COUNTER_SET(counters_.reservation_limit, reservation_limit);
}
if (mem_tracker_ != nullptr) mem_tracker_->EnableReservationReporting(counters_);
}
void ReservationTracker::Close() {
lock_guard<SpinLock> l(lock_);
if (!initialized_) return;
CheckConsistency();
DCHECK_EQ(used_reservation_, 0);
DCHECK_EQ(child_reservations_, 0);
// Release any reservation to parent.
if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
mem_tracker_ = nullptr;
parent_ = nullptr;
initialized_ = false;
}
bool ReservationTracker::IncreaseReservation(int64_t bytes, Status* error_status) {
lock_guard<SpinLock> l(lock_);
return IncreaseReservationInternalLocked(bytes, false, false, error_status);
}
bool ReservationTracker::IncreaseReservationToFit(int64_t bytes, Status* error_status) {
lock_guard<SpinLock> l(lock_);
return IncreaseReservationInternalLocked(bytes, true, false, error_status);
}
bool ReservationTracker::IncreaseReservationToFitAndAllocate(
int64_t bytes, Status* error_status) {
lock_guard<SpinLock> l(lock_);
if (!IncreaseReservationInternalLocked(bytes, true, false, error_status)) return false;
AllocateFromLocked(bytes);
return true;
}
bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
bool use_existing_reservation, bool is_child_reservation, Status* error_status) {
DCHECK(initialized_);
int64_t reservation_increase =
use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes;
DCHECK_GE(reservation_increase, 0);
bool granted;
// Check if the increase is allowed, starting at the bottom of hierarchy.
if (reservation_increase == 0) {
granted = true;
} else if (increase_deny_probability_ != 0.0
&& rand() < increase_deny_probability_ * (RAND_MAX + 1L)) {
// Randomly deny reservation if requested. Use rand() to avoid needing to set up a RNG.
// Should be good enough. If the probability is 0.0, this never triggers. If it is 1.0
// it always triggers.
granted = false;
if (error_status != nullptr) {
*error_status = Status::Expected(
Substitute("Debug random failure mode is turned on: Reservation of $0 denied.",
PrettyPrinter::Print(bytes, TUnit::BYTES)));
}
} else if (reservation_ + reservation_increase > reservation_limit_) {
granted = false;
if (error_status != nullptr) {
MemTracker* mem_tracker = mem_tracker_;
if (mem_tracker == nullptr) {
// The ReservationTracker at the root does not have a reference to the top
// level(process) MemTracker.
mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
}
string error_msg = Substitute(
"Failed to increase reservation by $0 because it would exceed the applicable "
"reservation limit for the \"$1\" ReservationTracker: reservation_limit=$2 "
"reservation=$3 used_reservation=$4 child_reservations=$5",
PrettyPrinter::Print(bytes, TUnit::BYTES),
mem_tracker == nullptr ? "Process" : mem_tracker->label(),
PrettyPrinter::Print(reservation_limit_, TUnit::BYTES),
PrettyPrinter::Print(reservation_, TUnit::BYTES),
PrettyPrinter::Print(used_reservation_, TUnit::BYTES),
PrettyPrinter::Print(child_reservations_, TUnit::BYTES));
string top_n_queries = mem_tracker->LogTopNQueries(5);
if (!top_n_queries.empty()) {
error_msg = Substitute(
"$0\nThe top 5 queries that allocated memory under this tracker are:\n$1",
error_msg, top_n_queries);
}
*error_status = Status::Expected(error_msg);
}
} else if (parent_ == nullptr) {
// No parent and no linked MemTracker - increase can be granted.
DCHECK(mem_tracker_ == nullptr) << "Root cannot have linked MemTracker";
granted = true;
} else {
{
lock_guard<SpinLock> l(parent_->lock_);
granted = parent_->IncreaseReservationInternalLocked(
reservation_increase, true, true, error_status);
}
if (granted
&& !TryConsumeFromMemTracker(reservation_increase, mem_limit_mode_)) {
granted = false;
if (error_status != nullptr) {
*error_status = mem_tracker_->MemLimitExceeded(nullptr,
"Could not allocate memory while trying to increase reservation.",
reservation_increase);
}
// Roll back changes to ancestors if MemTracker update fails.
parent_->DecreaseReservation(reservation_increase, true);
}
}
if (granted) {
// The reservation was granted and state updated in all ancestors: we can modify
// this tracker's state now.
UpdateReservation(reservation_increase);
if (is_child_reservation) child_reservations_ += bytes;
}
CheckConsistency();
return granted;
}
bool ReservationTracker::TryConsumeFromMemTracker(
int64_t reservation_increase, MemLimit mem_limit_mode) {
DCHECK_GE(reservation_increase, 0);
if (mem_tracker_ == nullptr) return true;
if (GetParentMemTracker() == nullptr) {
// At the topmost link, which may be a MemTracker with a limit, we need to use
// TryConsume() to check the limit.
return mem_tracker_->TryConsume(reservation_increase, mem_limit_mode);
} else {
// For lower links, there shouldn't be a limit to enforce, so we just need to
// update the consumption of the linked MemTracker since the reservation is
// already reflected in its parent.
mem_tracker_->ConsumeLocal(reservation_increase, GetParentMemTracker());
return true;
}
}
void ReservationTracker::ReleaseToMemTracker(int64_t reservation_decrease) {
DCHECK_GE(reservation_decrease, 0);
if (mem_tracker_ == nullptr) return;
if (GetParentMemTracker() == nullptr) {
mem_tracker_->Release(reservation_decrease);
} else {
mem_tracker_->ReleaseLocal(reservation_decrease, GetParentMemTracker());
}
}
void ReservationTracker::DecreaseReservation(int64_t bytes, bool is_child_reservation) {
lock_guard<SpinLock> l(lock_);
DecreaseReservationLocked(bytes, is_child_reservation);
}
void ReservationTracker::DecreaseReservationLocked(
int64_t bytes, bool is_child_reservation) {
DCHECK(initialized_);
DCHECK_GE(reservation_, bytes);
if (bytes == 0) return;
if (is_child_reservation) child_reservations_ -= bytes;
UpdateReservation(-bytes);
ReleaseToMemTracker(bytes);
// The reservation should be returned up the tree.
if (parent_ != nullptr) parent_->DecreaseReservation(bytes, true);
CheckConsistency();
}
bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_t bytes) {
if (other == this) return true;
// Find the path to the root from both. The root is guaranteed to be a common ancestor.
vector<ReservationTracker*> path_to_common = FindPathToRoot();
vector<ReservationTracker*> other_path_to_common = other->FindPathToRoot();
DCHECK_EQ(path_to_common.back(), other_path_to_common.back());
ReservationTracker* common_ancestor = path_to_common.back();
// Remove any common ancestors - they do not need to be updated for this transfer.
while (!path_to_common.empty() && !other_path_to_common.empty()
&& path_to_common.back() == other_path_to_common.back()) {
common_ancestor = path_to_common.back();
path_to_common.pop_back();
other_path_to_common.pop_back();
}
// At this point, we have three cases:
// 1. 'common_ancestor' == 'other'. 'other_path_to_common' is empty because 'other' is
// the lowest common ancestor. To transfer, we decrease the reservation on the
// trackers under 'other', down to 'this'.
// 2. 'common_ancestor' == 'this'. 'path_to_common' is empty because 'this' is the
// lowest common ancestor. To transfer, we increase the reservation on the trackers
// under 'this', down to 'other'.
// 3. Neither is an ancestor of the other. Both 'other_path_to_common' and
// 'path_to_common' are non-empty. We increase the reservation on trackers from
// 'other' up to one below the common ancestor (checking limits as needed) and if
// successful, decrease reservations on trackers from 'this' up to one below the
// common ancestor.
// Lock all of the trackers so we can do the update atomically. Need to be careful to
// lock subtrees in the correct order.
vector<unique_lock<SpinLock>> locks;
bool lock_first = path_to_common.empty() || other_path_to_common.empty()
|| lock_sibling_subtree_first(path_to_common.back(), other_path_to_common.back());
if (lock_first) {
for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_);
}
for (ReservationTracker* tracker : other_path_to_common) {
locks.emplace_back(tracker->lock_);
}
if (!lock_first) {
for (ReservationTracker* tracker : path_to_common) locks.emplace_back(tracker->lock_);
}
// Check reservation limits will not be violated before applying any updates.
for (ReservationTracker* tracker : other_path_to_common) {
if (tracker->reservation_ + bytes > tracker->reservation_limit_) return false;
}
// Do the updates now that we have checked the limits. We're holding all the locks
// so this is all atomic.
for (ReservationTracker* tracker : other_path_to_common) {
tracker->UpdateReservation(bytes);
// We don't handle MemTrackers with limit in this function - this should always
// succeed.
DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
DCHECK(success);
if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
}
for (ReservationTracker* tracker : path_to_common) {
if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes;
tracker->UpdateReservation(-bytes);
tracker->ReleaseToMemTracker(bytes);
}
// Update the 'child_reservations_' on the common ancestor if needed.
// Case 1: reservation was pushed up to 'other'.
if (common_ancestor == other) {
lock_guard<SpinLock> l(other->lock_);
other->child_reservations_ -= bytes;
other->CheckConsistency();
}
// Case 2: reservation was pushed down below 'this'.
if (common_ancestor == this) {
lock_guard<SpinLock> l(lock_);
child_reservations_ += bytes;
CheckConsistency();
}
return true;
}
vector<ReservationTracker*> ReservationTracker::FindPathToRoot() {
vector<ReservationTracker*> path_to_root;
ReservationTracker* curr = this;
do {
path_to_root.push_back(curr);
curr = curr->parent_;
} while (curr != nullptr);
return path_to_root;
}
void ReservationTracker::AllocateFrom(int64_t bytes) {
lock_guard<SpinLock> l(lock_);
AllocateFromLocked(bytes);
}
void ReservationTracker::AllocateFromLocked(int64_t bytes) {
DCHECK(initialized_);
DCHECK_GE(bytes, 0);
DCHECK_LE(bytes, unused_reservation());
UpdateUsedReservation(bytes);
CheckConsistency();
}
void ReservationTracker::ReleaseTo(int64_t bytes) {
lock_guard<SpinLock> l(lock_);
DCHECK(initialized_);
DCHECK_GE(bytes, 0);
DCHECK_LE(bytes, used_reservation_);
UpdateUsedReservation(-bytes);
CheckConsistency();
}
int64_t ReservationTracker::GetReservation() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
return base::subtle::Acquire_Load(&reservation_);
}
int64_t ReservationTracker::GetUsedReservation() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
return base::subtle::Acquire_Load(&used_reservation_);
}
int64_t ReservationTracker::GetUnusedReservation() {
lock_guard<SpinLock> l(lock_);
DCHECK(initialized_);
return unused_reservation();
}
int64_t ReservationTracker::GetChildReservations() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
return base::subtle::Acquire_Load(&child_reservations_);
}
void ReservationTracker::CheckConsistency() const {
// Check internal invariants.
DCHECK_GE(reservation_, 0);
DCHECK_LE(reservation_, reservation_limit_);
DCHECK_GE(child_reservations_, 0);
DCHECK_GE(used_reservation_, 0);
DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
DCHECK_LE(reservation_, counters_.peak_reservation->value());
DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
if (counters_.reservation_limit != nullptr) {
DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
}
}
void ReservationTracker::UpdateUsedReservation(int64_t delta) {
used_reservation_ += delta;
COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
CheckConsistency();
}
void ReservationTracker::UpdateReservation(int64_t delta) {
reservation_ += delta;
COUNTER_SET(counters_.peak_reservation, reservation_);
CheckConsistency();
}
string ReservationTracker::DebugString() {
lock_guard<SpinLock> l(lock_);
if (!initialized_) return "<ReservationTracker>: uninitialized";
string parent_debug_string = parent_ == nullptr ? "NULL" : parent_->DebugString();
return Substitute(
"<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
"child_reservations $3 parent:\n$4",
reservation_limit_, reservation_, used_reservation_, child_reservations_,
parent_debug_string);
}
}