// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/scanner-mem-limiter.h"
#include <boost/thread/locks.hpp>
#include "exec/scan-node.h"
#include "runtime/mem-tracker.h"
#include "common/names.h"
namespace impala {
struct ScannerMemLimiter::RegisteredScan {
RegisteredScan(int64_t estimated_initial_thread_mem)
: estimated_mem(estimated_initial_thread_mem), num_threads(1) {}
/// The estimated amount of memory to run all threads that have already been started.
/// Updated by ClaimMemoryForScannerThread().
AtomicInt64 estimated_mem;
/// The number of threads active in the scan node. Updated by
/// ClaimMemoryForScannerThread().
AtomicInt64 num_threads;
ScannerMemLimiter::ScannerMemLimiter() {}
ScannerMemLimiter::~ScannerMemLimiter() {
for (const auto& element : registered_scans_) {
const unique_ptr<RegisteredScan>& scan = element.second;
DCHECK_EQ(0, scan->estimated_mem.Load());
DCHECK_EQ(0, scan->num_threads.Load());
void ScannerMemLimiter::RegisterScan(
ScanNode* node, int64_t estimated_initial_thread_mem) {
unique_ptr<RegisteredScan> scan(new RegisteredScan(estimated_initial_thread_mem));
lock_guard<shared_mutex> write_lock(registered_scans_lock_);
bool added = registered_scans_.emplace(node, move(scan)).second;
DCHECK(added) << node->DebugString();
bool ScannerMemLimiter::ClaimMemoryForScannerThread(
ScanNode* node, int64_t estimated_thread_mem) {
shared_lock<shared_mutex> read_lock(registered_scans_lock_);
RegisteredScan* found_scan = nullptr;
// Calculate the memory consumption in excess of the current consumption that we expect
// from already-started threads plus the new thread. We need to compute the global
// total across all scans because multiple scans can compete for the same memory.
int64_t addtl_consumption = 0;
for (const auto& element : registered_scans_) {
const unique_ptr<RegisteredScan>& scan = element.second;
int64_t consumption = element.first->mem_tracker()->consumption();
int64_t num_threads = scan->num_threads.Load();
int64_t estimated_mem = scan->estimated_mem.Load();
if (consumption > estimated_mem) {
// Memory exceeded our estimate. Use a crude heuristic of guessing that the scan
// will use up to 50% more memory. This is carried over from old versions of the
// code pre-IMPALA-4835, which were initially added in the commit titled
// "Dynamically scale down mem usage in scanners and io mgr."
if (node == element.first) {
// Add consumption for the new thread.
addtl_consumption += static_cast<int64_t>((consumption * 1.5) / num_threads);
// We guess that consumption of existing threads will grow up to 50% above the
// current consumption.
addtl_consumption += static_cast<int64_t>(consumption * 0.5);
} else {
// The scan hasn't used all the estimated memory yet - make sure that that is
// accounted for.
addtl_consumption += estimated_mem - consumption;
if (node == element.first) addtl_consumption += estimated_thread_mem;
if (node == element.first) found_scan = scan.get();
DCHECK(found_scan != nullptr) << "Increase mem on unregistered scan";
// Check if we have capacity for the expected increase in consumption.
if (addtl_consumption >= node->mem_tracker()->SpareCapacity(MemLimit::SOFT)) {
return false;
// There is enough memory - update the estimated memory with the estimate.
return true;
void ScannerMemLimiter::ReleaseMemoryForScannerThread(
ScanNode* node, int64_t estimated_thread_mem) {
shared_lock<shared_mutex> read_lock(registered_scans_lock_);
auto it = registered_scans_.find(node);
DCHECK(it != registered_scans_.end()) << node->id() << " not found.";
RegisteredScan* scan = it->second.get();
int64_t mem = scan->estimated_mem.Add(-estimated_thread_mem);
DCHECK_GE(mem, 0);
int64_t num_threads = scan->num_threads.Add(-1);
DCHECK_GE(num_threads, 0);