// blob: d5c71d78e6253ededdc7c8598cb62228e494b17e
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "VeloxMemoryManager.h"
#ifdef ENABLE_JEMALLOC_STATS
#include <jemalloc/jemalloc.h>
#endif
#include "compute/VeloxBackend.h"
#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/process/StackTrace.h"
#include "velox/exec/MemoryReclaimer.h"
#include "config/VeloxConfig.h"
#include "memory/ArrowMemoryPool.h"
#include "utils/Exception.h"
DECLARE_int32(gluten_velox_async_timeout_on_task_stopping);
namespace gluten {
using namespace facebook;
/// Builds the "extra" arbitrator configs handed to Velox from Gluten backend settings.
/// Capacities are rendered with a "B" (bytes) suffix and the reclaim wait time with an
/// "ms" suffix, which is the form parsed back by the arbitrator constructor.
std::unordered_map<std::string, std::string> getExtraArbitratorConfigs(
    const facebook::velox::config::ConfigBase& backendConf) {
  // Read order matches the original so any config error surfaces identically.
  const auto transferCapacityBytes =
      backendConf.get<uint64_t>(kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
  const auto initialCapacityBytes = backendConf.get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault);
  const auto reclaimMaxWaitMs = backendConf.get<uint64_t>(kVeloxMemReclaimMaxWaitMs, kVeloxMemReclaimMaxWaitMsDefault);

  std::unordered_map<std::string, std::string> result;
  const std::string bytesSuffix = "B";
  result[std::string(kMemoryPoolInitialCapacity)] = folly::to<std::string>(initialCapacityBytes) + bytesSuffix;
  result[std::string(kMemoryPoolTransferCapacity)] = folly::to<std::string>(transferCapacityBytes) + bytesSuffix;
  result[std::string(kMemoryReclaimMaxWaitMs)] = folly::to<std::string>(reclaimMaxWaitMs) + "ms";
  return result;
}
namespace {
/// Looks up `key` in `configs` and converts the stored string to T with folly::to.
/// Returns `defaultValue` when the key is absent. A value that cannot be converted
/// is reported through VELOX_USER_FAIL.
template <typename T>
T getConfig(
    const std::unordered_map<std::string, std::string>& configs,
    const std::string_view& key,
    const T& defaultValue) {
  const std::string lookupKey(key);
  const auto match = configs.find(lookupKey);
  if (match == configs.end()) {
    return defaultValue;
  }
  try {
    return folly::to<T>(match->second);
  } catch (const std::exception& e) {
    VELOX_USER_FAIL("Failed while parsing SharedArbitrator configs: {}", e.what());
  }
  // Only reachable if VELOX_USER_FAIL returned, which mirrors the original fall-through.
  return defaultValue;
}
/// Memory arbitrator that serves a single Velox root pool. It reports every capacity
/// change of that pool to a Gluten AllocationListener as a signed byte delta.
///
/// We assume in a single Spark task. No thread-safety should be guaranteed; the mutex
/// below only protects the candidate-pool map.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
 public:
  /// @param config   Velox arbitrator config. Its extraConfigs supply the initial pool
  ///                 capacity, the capacity-transfer granularity and the reclaim wait time.
  /// @param listener Receives capacity deltas. Not owned by this arbitrator.
  ListenableArbitrator(const Config& config, AllocationListener* listener)
      : MemoryArbitrator(config),
        listener_(listener),
        memoryPoolInitialCapacity_(velox::config::toCapacity(
            getConfig<std::string>(
                config.extraConfigs,
                kMemoryPoolInitialCapacity,
                std::to_string(kDefaultMemoryPoolInitialCapacity)),
            velox::config::CapacityUnit::BYTE)),
        memoryPoolTransferCapacity_(velox::config::toCapacity(
            getConfig<std::string>(
                config.extraConfigs,
                kMemoryPoolTransferCapacity,
                std::to_string(kDefaultMemoryPoolTransferCapacity)),
            velox::config::CapacityUnit::BYTE)),
        memoryReclaimMaxWaitMs_(
            std::chrono::duration_cast<std::chrono::milliseconds>(velox::config::toDuration(getConfig<std::string>(
                config.extraConfigs,
                kMemoryReclaimMaxWaitMs,
                std::string(kDefaultMemoryReclaimMaxWaitMs))))
                .count()) {}

  std::string kind() const override {
    return kind_;
  }

  void shutdown() override {}

  /// Registers a new root pool. The pool must start with zero capacity.
  void addPool(const std::shared_ptr<velox::memory::MemoryPool>& pool) override {
    VELOX_CHECK_EQ(pool->capacity(), 0);
    std::unique_lock guard{mutex_};
    VELOX_CHECK_EQ(candidates_.count(pool.get()), 0);
    candidates_.emplace(pool.get(), pool->weak_from_this());
  }

  /// Gives back the pool's whole capacity (reported to the listener), then forgets the pool.
  void removePool(velox::memory::MemoryPool* pool) override {
    VELOX_CHECK_EQ(pool->reservedBytes(), 0);
    shrinkCapacity(pool, pool->capacity());
    std::unique_lock guard{mutex_};
    const auto ret = candidates_.erase(pool);
    VELOX_CHECK_EQ(ret, 1);
  }

  void growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
    // Set arbitration context to allow memory over-use during recursive arbitration.
    // See MemoryPoolImpl::maybeIncrementReservation.
    velox::memory::ScopedMemoryArbitrationContext ctx{};
    velox::memory::MemoryPool* candidate;
    {
      std::unique_lock guard{mutex_};
      VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool");
      candidate = candidates_.begin()->first;
    }
    VELOX_CHECK(pool->root() == candidate, "Illegal state in ListenableArbitrator");
    growCapacityInternal(pool->root(), targetBytes);
  }

  /// Asks the single root pool's reclaimer to free up to `targetBytes`. It then releases
  /// all of the pool's free capacity to the listener and returns the released byte count.
  /// `allowSpill` and `allowAbort` are not consulted.
  uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override {
    velox::memory::ScopedMemoryArbitrationContext ctx{};
    facebook::velox::exec::MemoryReclaimer::Stats status;
    velox::memory::MemoryPool* pool = nullptr;
    {
      std::unique_lock guard{mutex_};
      VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool");
      pool = candidates_.begin()->first;
    }
    pool->reclaim(targetBytes, memoryReclaimMaxWaitMs_, status); // ignore the output
    return shrinkCapacityInternal(pool, 0);
  }

  uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
    return shrinkCapacityInternal(pool, targetBytes);
  }

  Stats stats() const override {
    Stats stats; // no-op
    return stats;
  }

  std::string toString() const override {
    return fmt::format("ARBITRATOR[{}] CAPACITY {} {}", kind_, velox::succinctBytes(capacity()), stats().toString());
  }

 private:
  /// Grows `pool` so that `bytes` more can be reserved. New memory beyond the pool's
  /// current free capacity is requested from the listener, rounded up to
  /// memoryPoolTransferCapacity_.
  void growCapacityInternal(velox::memory::MemoryPool* pool, uint64_t bytes) {
    // Since
    // https://github.com/facebookincubator/velox/pull/9557/files#diff-436e44b7374032f8f5d7eb45869602add6f955162daa2798d01cc82f8725724dL812-L820,
    // We should pass bytes as parameter "reservationBytes" when calling ::grow.
    const auto freeBytes = pool->freeBytes();
    if (freeBytes > bytes) {
      // Fast path: the reservation fits in capacity the pool already holds.
      if (growPool(pool, 0, bytes)) {
        return;
      }
    }
    // Take back all free capacity. The listener still counts these bytes (see the
    // rollback in the catch block), so they are re-added to the pool below together
    // with the newly requested bytes.
    const uint64_t reclaimedFreeBytes = shrinkPool(pool, 0);
    // The reclaimed free capacity can already cover the request. This happens when the
    // fast path was skipped or refused, for example when freeBytes == bytes. Avoid
    // unsigned wrap-around, which would ask the listener for ~2^64 bytes.
    const uint64_t missingBytes = bytes > reclaimedFreeBytes ? bytes - reclaimedFreeBytes : 0;
    const uint64_t neededBytes = velox::bits::roundUp(missingBytes, memoryPoolTransferCapacity_);
    try {
      listener_->allocationChanged(neededBytes);
    } catch (const std::exception&) {
      VLOG(2) << "ListenableArbitrator growCapacityInternal failed, stacktrace: "
              << velox::process::StackTrace().toString();
      // if allocationChanged failed, we need to free the reclaimed bytes
      listener_->allocationChanged(-static_cast<int64_t>(reclaimedFreeBytes));
      throw;
    }
    const bool ret = growPool(pool, reclaimedFreeBytes + neededBytes, bytes);
    if (!ret) {
      // The pool did not take the capacity. Undo the listener accounting for both the
      // reclaimed and the newly granted bytes so the listener's total does not leak.
      listener_->allocationChanged(-static_cast<int64_t>(reclaimedFreeBytes + neededBytes));
    }
    VELOX_CHECK(
        ret,
        "{} failed to grow {} bytes, current state {}",
        pool->name(),
        velox::succinctBytes(bytes),
        pool->toString());
  }

  /// Shrinks `pool` by up to `bytes` (0 releases all of its free capacity), reports the
  /// freed amount to the listener as a negative delta, and returns it.
  uint64_t shrinkCapacityInternal(velox::memory::MemoryPool* pool, uint64_t bytes) {
    const uint64_t freeBytes = shrinkPool(pool, bytes);
    listener_->allocationChanged(-static_cast<int64_t>(freeBytes));
    return freeBytes;
  }

  gluten::AllocationListener* listener_ = nullptr;
  const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
  const uint64_t memoryPoolTransferCapacity_;
  const uint64_t memoryReclaimMaxWaitMs_;
  mutable std::mutex mutex_;
  inline static std::string kind_ = "GLUTEN";
  std::unordered_map<velox::memory::MemoryPool*, std::weak_ptr<velox::memory::MemoryPool>> candidates_;
};
} // namespace
/// Registers a Velox arbitrator factory under a kind name that is unique per instance
/// within this process. The factory builds ListenableArbitrators bound to `listener`.
ArbitratorFactoryRegister::ArbitratorFactoryRegister(gluten::AllocationListener* listener) : listener_(listener) {
  static std::atomic_uint32_t nextId{0UL};
  const uint32_t assignedId = nextId++;
  kind_ = "GLUTEN_ARBITRATOR_FACTORY_" + std::to_string(assignedId);
  // The factory captures `this`, so it must not be invoked after this object is gone;
  // the destructor unregisters it for that reason.
  auto factory = [this](const velox::memory::MemoryArbitrator::Config& config)
      -> std::unique_ptr<velox::memory::MemoryArbitrator> {
    return std::make_unique<ListenableArbitrator>(config, listener_);
  };
  velox::memory::MemoryArbitrator::registerFactory(kind_, factory);
}

/// Removes the factory registered by the constructor.
ArbitratorFactoryRegister::~ArbitratorFactoryRegister() {
  const std::string& registeredKind = kind_;
  velox::memory::MemoryArbitrator::unregisterFactory(registeredKind);
}
/// Creates the Arrow-side pools (through a block-granular listener wrapper) and a
/// dedicated Velox memory manager whose arbitrator reports to `listener`.
VeloxMemoryManager::VeloxMemoryManager(
    const std::string& kind,
    std::unique_ptr<AllocationListener> listener,
    const facebook::velox::config::ConfigBase& backendConf)
    : MemoryManager(kind), listener_(std::move(listener)) {
  const auto blockSize = backendConf.get<uint64_t>(kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
  blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), blockSize);
  defaultArrowPool_ = std::make_shared<ArrowMemoryPool>(blockListener_.get());
  arrowPools_.emplace("default", defaultArrowPool_);

  const auto leakCheckEnabled = backendConf.get<bool>(kCheckUsageLeak, kCheckUsageLeakDefault);

  // The factory only has to be registered while the Velox MemoryManager below is
  // constructed; it is expected to create its arbitrator there. The registration is
  // dropped when `arbitratorRegistration` goes out of scope at the end of this constructor.
  ArbitratorFactoryRegister arbitratorRegistration(listener_.get());

  velox::memory::MemoryManager::Options options;
  options.alignment = velox::memory::MemoryAllocator::kMaxAlignment;
  options.trackDefaultUsage = true; // memory usage tracking
  options.checkUsageLeak = leakCheckEnabled;
  options.coreOnAllocationFailureEnabled = false;
  options.allocatorCapacity = velox::memory::kMaxMemory;
  options.arbitratorKind = arbitratorRegistration.getKind();
  options.extraArbitratorConfigs = getExtraArbitratorConfigs(backendConf);
  veloxMemoryManager_ = std::make_unique<velox::memory::MemoryManager>(options);

  // Root pool with unlimited maximum capacity (second argument), plus its default leaf.
  veloxAggregatePool_ = veloxMemoryManager_->addRootPool(
      "root", velox::memory::kMaxMemory, facebook::velox::memory::MemoryReclaimer::create());
  veloxLeafPool_ = veloxAggregatePool_->addLeafChild("default_leaf");
}
namespace {
/// Snapshots `pool`'s used and peak bytes, with one nested entry per child pool (keyed by
/// the child's name), collected recursively.
MemoryUsageStats collectVeloxMemoryUsageStats(const velox::memory::MemoryPool* pool) {
  MemoryUsageStats node;
  node.set_current(pool->usedBytes());
  node.set_peak(pool->peakBytes());
  auto addChild = [&node](velox::memory::MemoryPool* child) -> bool {
    node.mutable_children()->emplace(child->name(), collectVeloxMemoryUsageStats(child));
    return true; // visit every child
  };
  pool->visitChildren(addChild);
  return node;
}
/// Summarizes all still-alive Arrow pools. Each live pool becomes a child entry. The
/// aggregate's current value is the sum of allocated bytes; its peak is the largest
/// single-pool peak, not the peak of the sum. Expired pools are skipped.
MemoryUsageStats collectGlutenAllocatorMemoryUsageStats(
    const std::unordered_map<std::string, std::weak_ptr<ArrowMemoryPool>>& arrowPools) {
  int64_t sumAllocated = 0;
  int64_t maxPeak = 0;
  MemoryUsageStats aggregate;
  for (const auto& entry : arrowPools) {
    const std::shared_ptr<ArrowMemoryPool> live = entry.second.lock();
    if (!live) {
      continue;
    }
    const auto current = live->bytes_allocated();
    const auto peak = live->max_memory();
    MemoryUsageStats child;
    child.set_current(current);
    child.set_peak(peak);
    aggregate.mutable_children()->emplace(entry.first, child);
    sumAllocated += current;
    if (peak > maxPeak) {
      maxPeak = peak;
    }
  }
  aggregate.set_current(sumAllocated);
  aggregate.set_peak(maxPeak);
  return aggregate;
}
void logMemoryUsageStats(MemoryUsageStats stats, const std::string& name, const std::string& logPrefix, std::stringstream& ss) {
ss << logPrefix << "+- " << name
<< " (used: " << velox::succinctBytes(stats.current())
<< ", peak: " << velox::succinctBytes(stats.peak()) << ")\n";
if (stats.children_size() > 0) {
for (auto it = stats.children().begin(); it != stats.children().end(); ++it) {
logMemoryUsageStats(it->second, it->first, logPrefix + " ", ss);
}
}
}
/// Asks `mm`'s arbitrator to shrink `pool` with a target of 0 and returns the number of
/// bytes it released. `size` is only logged; it does not limit the shrink.
int64_t shrinkVeloxMemoryPool(velox::memory::MemoryManager* mm, velox::memory::MemoryPool* pool, int64_t size) {
  auto* root = pool->root();
  const std::string poolName = root->name() + "/" + pool->name();
  const std::string logPrefix = "Shrink[" + poolName + "]: ";
  VLOG(2) << logPrefix << "Trying to shrink " << size << " bytes of data...";
  VLOG(2) << logPrefix << "Pool has reserved " << pool->usedBytes() << "/" << root->reservedBytes() << "/"
          << root->capacity() << "/" << root->maxCapacity() << " bytes.";
  if (VLOG_IS_ON(2)) {
    std::stringstream usage;
    usage << logPrefix << "Velox memory usage stats:\n";
    logMemoryUsageStats(collectVeloxMemoryUsageStats(pool), poolName, logPrefix, usage);
    VLOG(2) << usage.str();
  }
  VLOG(2) << logPrefix << "Shrinking...";
  const auto released = mm->arbitrator()->shrinkCapacity(pool, 0);
  VLOG(2) << logPrefix << released << " bytes released from shrinking.";
  return released;
}
} // namespace
/// Returns the Arrow pool registered under `name`, creating and registering it on first
/// use. Fails if the registry still holds `name` but the pool has already expired.
std::shared_ptr<arrow::MemoryPool> VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
  std::lock_guard<std::mutex> lock(mutex_);
  auto found = arrowPools_.find(name);
  if (found == arrowPools_.end()) {
    // The callback removes the registry entry; it is presumably invoked by
    // ArrowMemoryPool when the pool is released (TODO confirm in ArrowMemoryPool).
    auto created = std::make_shared<ArrowMemoryPool>(
        blockListener_.get(), [this, name](arrow::MemoryPool* /*released*/) { this->dropMemoryPool(name); });
    arrowPools_.emplace(name, created);
    return created;
  }
  auto existing = found->second.lock();
  VELOX_CHECK_NOT_NULL(existing, "Arrow memory pool {} has been destructed", name);
  return existing;
}
/// Removes `name` from the Arrow pool registry; fails if it was not registered.
void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
  std::lock_guard<std::mutex> lock(mutex_);
  const size_t erasedCount = arrowPools_.erase(name);
  VELOX_CHECK_EQ(erasedCount, 1, "Child memory pool {} doesn't exist", name);
}
/// Builds a usage snapshot. The top level holds the listener's current and peak bytes.
/// It has two children: the Arrow pools ("gluten::MemoryAllocator") and the Velox root
/// pool tree.
///
/// NOTE(review): arrowPools_ is read here without holding mutex_, while
/// getOrCreateArrowMemoryPool()/dropMemoryPool() modify it under that lock. Confirm
/// callers cannot race.
const MemoryUsageStats VeloxMemoryManager::collectMemoryUsageStats() const {
  MemoryUsageStats snapshot;
  snapshot.set_current(listener_->currentBytes());
  snapshot.set_peak(listener_->peakBytes());
  auto* children = snapshot.mutable_children();
  children->emplace("gluten::MemoryAllocator", collectGlutenAllocatorMemoryUsageStats(arrowPools_));
  children->emplace(veloxAggregatePool_->name(), collectVeloxMemoryUsageStats(veloxAggregatePool_.get()));
  return snapshot;
}
/// Shrinks the Velox root pool through the arbitrator; returns the bytes released.
const int64_t VeloxMemoryManager::shrink(int64_t size) {
  auto* manager = veloxMemoryManager_.get();
  auto* rootPool = veloxAggregatePool_.get();
  return shrinkVeloxMemoryPool(manager, rootPool, size);
}
namespace {
/// Appends a shared_ptr to every descendant of `pool` to `heldVeloxPools`. The walk is
/// depth-first and pre-order: each child is appended before its own descendants.
void holdInternal(
    std::vector<std::shared_ptr<facebook::velox::memory::MemoryPool>>& heldVeloxPools,
    const velox::memory::MemoryPool* pool) {
  pool->visitChildren([&heldVeloxPools](velox::memory::MemoryPool* child) -> bool {
    heldVeloxPools.push_back(child->shared_from_this());
    holdInternal(heldVeloxPools, child);
    return true;
  });
}
} // namespace
/// Keeps every descendant pool of the Velox root pool alive by storing shared_ptrs to
/// them in heldVeloxPools_.
void VeloxMemoryManager::hold() {
  auto* rootPool = veloxAggregatePool_.get();
  holdInternal(heldVeloxPools_, rootPool);
}
/// Attempts to release the memory resources owned by this manager, in stages:
///   1. held Velox pools, the default leaf pool and the root pool;
///   2. the Velox memory manager itself;
///   3. the registry of Arrow memory pools.
///
/// Returns false as soon as a stage finds memory still in use, leaving that stage's
/// resources in place. Stages that already completed are not undone, so a later retry
/// continues from the stage that failed. Returns true once everything is released.
/// Throws (via GLUTEN_CHECK) if the Velox manager reports an unexpected set of
/// remaining pools.
///
/// NOTE(review): arrowPools_ is read and cleared without holding mutex_, while
/// getOrCreateArrowMemoryPool()/dropMemoryPool() modify it under that lock. Confirm no
/// concurrent access is possible during destruction.
bool VeloxMemoryManager::tryDestructSafe() {
  // Velox memory pools are considered safe to destruct when they have no live allocations.
  for (const auto& pool : heldVeloxPools_) {
    if (pool && pool->usedBytes() != 0) {
      return false;
    }
  }
  if (veloxLeafPool_ && veloxLeafPool_->usedBytes() != 0) {
    return false;
  }
  if (veloxAggregatePool_ && veloxAggregatePool_->usedBytes() != 0) {
    return false;
  }
  heldVeloxPools_.clear();
  veloxLeafPool_.reset();
  veloxAggregatePool_.reset();
  // The Velox memory manager is considered safe to destruct when no user pools remain.
  // Three remaining pools are tolerated; they are checked below to be the manager's own
  // spill, cache and trace pools.
  if (veloxMemoryManager_) {
    if (veloxMemoryManager_->numPools() > 3) {
      VLOG(2) << "Attempt to destruct VeloxMemoryManager failed because there are " << veloxMemoryManager_->numPools()
              << " outstanding memory pools.";
      return false;
    }
    if (veloxMemoryManager_->numPools() == 3) {
      // Assert the three remaining pools are exactly the spill, cache and trace pools,
      // each appearing once under the system root pool.
      // See https://github.com/facebookincubator/velox/commit/e6f84e8ac9ef6721f527a2d552a13f7e79bdf72e
      // https://github.com/facebookincubator/velox/commit/ac134400b5356c5ba3f19facee37884aa020afdc
      int32_t spillPoolCount = 0;
      int32_t cachePoolCount = 0;
      int32_t tracePoolCount = 0;
      veloxMemoryManager_->deprecatedSysRootPool().visitChildren([&](velox::memory::MemoryPool* child) -> bool {
        if (child == veloxMemoryManager_->spillPool()) {
          spillPoolCount++;
        }
        if (child == veloxMemoryManager_->cachePool()) {
          cachePoolCount++;
        }
        if (child == veloxMemoryManager_->tracePool()) {
          tracePoolCount++;
        }
        return true;
      });
      GLUTEN_CHECK(spillPoolCount == 1, "Illegal pool count state: spillPoolCount: " + std::to_string(spillPoolCount));
      GLUTEN_CHECK(cachePoolCount == 1, "Illegal pool count state: cachePoolCount: " + std::to_string(cachePoolCount));
      GLUTEN_CHECK(tracePoolCount == 1, "Illegal pool count state: tracePoolCount: " + std::to_string(tracePoolCount));
    }
    // Fewer than three pools means one of the built-in pools is missing; treated as an
    // invariant violation.
    if (veloxMemoryManager_->numPools() < 3) {
      GLUTEN_CHECK(false, "Unreachable code");
    }
  }
  veloxMemoryManager_.reset();
  // Apply a similar rule to the Arrow memory pools: expired entries are ignored, and any
  // live pool with allocated bytes blocks destruction.
  if (!arrowPools_.empty() && std::any_of(arrowPools_.begin(), arrowPools_.end(), [&](const auto& entry) {
        auto pool = entry.second.lock();
        if (pool == nullptr) {
          return false;
        }
        return pool->bytes_allocated() != 0;
      })) {
    VLOG(2) << "Attempt to destruct VeloxMemoryManager failed because there are still outstanding Arrow memory pools.";
    return false;
  }
  arrowPools_.clear();
  // Successfully destructed.
  return true;
}
/// Releases the Velox and Arrow memory resources owned by this manager.
///
/// Allocations may still be outstanding (for example, held by asynchronous work that is
/// still stopping), so tryDestructSafe() is retried with exponential back-off
/// (50ms, 75ms, 112ms, ...). The total wait is capped at
/// FLAGS_gluten_velox_async_timeout_on_task_stopping milliseconds, read once per process.
/// One attempt is always made, and one more after every wait, including the last.
/// If resources remain afterwards, an error is logged and they are left in place.
VeloxMemoryManager::~VeloxMemoryManager() {
  static const uint32_t kWaitTimeoutMs = FLAGS_gluten_velox_async_timeout_on_task_stopping; // 30s by default
  uint32_t accumulatedWaitMs = 0UL;
  // Always try at least once, even when the configured timeout is 0.
  bool destructed = tryDestructSafe();
  for (int32_t tryCount = 0; !destructed && accumulatedWaitMs < kWaitTimeoutMs; tryCount++) {
    // Scale before truncating; truncating pow() first would yield 50, 50, 100, 150, ...
    uint32_t waitMs = static_cast<uint32_t>(50 * pow(1.5, tryCount)); // 50ms, 75ms, 112.5ms ...
    // Never sleep past the configured timeout.
    const uint32_t remainingMs = kWaitTimeoutMs - accumulatedWaitMs;
    if (waitMs > remainingMs) {
      waitMs = remainingMs;
    }
    LOG(INFO) << "There are still outstanding Velox memory allocations. Waiting for " << waitMs
              << " ms to let possible async tasks done... ";
    usleep(waitMs * 1000);
    accumulatedWaitMs += waitMs;
    // Retry after every wait so the final sleep is not wasted.
    destructed = tryDestructSafe();
    if (destructed) {
      LOG(INFO) << "All the outstanding memory resources successfully released. ";
    }
  }
  if (!destructed) {
    LOG(ERROR) << "Failed to release Velox memory manager after " << accumulatedWaitMs
               << "ms as there are still outstanding memory resources. ";
  }
#ifdef ENABLE_JEMALLOC_STATS
  malloc_stats_print(nullptr, nullptr, nullptr);
#endif
}
VeloxMemoryManager* getDefaultMemoryManager() {
return VeloxBackend::get()->getGlobalMemoryManager();
}
std::shared_ptr<velox::memory::MemoryPool> defaultLeafVeloxMemoryPool() {
return getDefaultMemoryManager()->getLeafMemoryPool();
}
} // namespace gluten