blob: 1367e2e0483f152a4a83900d73b7a1384820c537 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "compute/VeloxBackend.h"
#include "config/VeloxConfig.h"
#include "memory/VeloxMemoryManager.h"
#include "velox/common/base/tests/GTestUtils.h"
namespace gluten {
using namespace facebook::velox;
// Test listener that keeps a running total of every delta reported through
// allocationChanged() together with the highest total seen so far.
class MockAllocationListener : public gluten::AllocationListener {
 public:
  // Applies `diff` (which may be negative) to the running total and raises the
  // recorded peak when the new total exceeds it.
  void allocationChanged(int64_t diff) override {
    currentBytes_ += diff;
    if (peakBytes_ < currentBytes_) {
      peakBytes_ = currentBytes_;
    }
  }

  // Running total of all deltas reported so far.
  int64_t currentBytes() override {
    return currentBytes_;
  }

  // Highest running total observed so far.
  int64_t peakBytes() override {
    return peakBytes_;
  }

  // The counters are stored unsigned and converted to int64_t by the accessors.
  uint64_t currentBytes_{0L};
  uint64_t peakBytes_{0L};
};
namespace {
// Number of bytes in one mebibyte (2^20).
constexpr uint64_t kMB = uint64_t{1} << 20;
} // namespace
// Fixture that builds a fresh VeloxMemoryManager, observed by a
// MockAllocationListener, before every test.
class MemoryManagerTest : public ::testing::Test {
 protected:
  // Runs once for the whole suite: creates the Velox backend with the default
  // reservation block size and the default initial pool capacity.
  static void SetUpTestCase() {
    std::unordered_map<std::string, std::string> conf = {
        {kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)},
        {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
    gluten::VeloxBackend::create(AllocationListener::noop(), conf);
  }

  // Runs before each test: creates the memory manager under test and caches
  // the listener and allocator it exposes.
  void SetUp() override {
    vmm_ = std::make_unique<VeloxMemoryManager>(
        gluten::kVeloxBackendKind, std::make_unique<MockAllocationListener>(), *VeloxBackend::get()->getBackendConf());
    listener_ = vmm_->getListener();
    allocator_ = vmm_->allocator();
  }

  std::unique_ptr<VeloxMemoryManager> vmm_;
  // Both pointers are obtained from vmm_ in SetUp(); they start out as nullptr
  // so that an accidental use before SetUp() is not a read of an
  // indeterminate pointer value.
  AllocationListener* listener_{nullptr};
  MemoryAllocator* allocator_{nullptr};
  std::shared_ptr<MemoryAllocator> stdAllocator_ = std::make_shared<StdMemoryAllocator>();

  // Book-keeping record for one buffer handed out during a test.
  struct Allocation {
    void* buffer;
    size_t size;
    memory::MemoryPool* pool;
  };
};
// Allocates two buffers, each slightly smaller than one reservation block,
// from the leaf pool and checks that the listener sees exactly two whole
// blocks. After the buffers are freed, shrink(0) must return everything the
// listener still accounts for.
// (The misspelled test name is kept so existing --gtest_filter patterns
// continue to match.)
TEST_F(MemoryManagerTest, memoryPoolWithBlockReseravtion) {
  auto leafPool = vmm_->getLeafMemoryPool();

  const std::vector<uint64_t> requestSizes{
      kMemoryReservationBlockSizeDefault - 1 * kMB, kMemoryReservationBlockSizeDefault - 2 * kMB};
  std::vector<Allocation> live;
  live.reserve(requestSizes.size());

  for (size_t idx = 0; idx < requestSizes.size(); ++idx) {
    const auto bytes = requestSizes[idx];
    auto buffer = leafPool->allocate(bytes);
    live.push_back({buffer, bytes, leafPool.get()});
  }

  EXPECT_EQ(listener_->currentBytes(), 2 * kMemoryReservationBlockSizeDefault);
  EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());

  for (size_t idx = 0; idx < live.size(); ++idx) {
    auto& entry = live[idx];
    entry.pool->free(entry.buffer, entry.size);
  }

  // Whatever is still reserved after the frees must be handed back by shrink().
  auto currentBytes = listener_->currentBytes();
  ASSERT_EQ(vmm_->shrink(0), currentBytes);
  ASSERT_EQ(listener_->currentBytes(), 0);
}
// Allocates two buffers, each slightly smaller than one reservation block,
// directly from the manager's allocator. After every allocation the allocator
// must report the exact byte count while the listener must have been charged
// one whole block per buffer. The buffers are then freed in reverse order and
// the same accounting is verified on the way down.
TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
  auto initBytes = listener_->currentBytes();

  std::vector<Allocation> allocations;
  std::vector<uint64_t> sizes{
      kMemoryReservationBlockSizeDefault - 1 * kMB, kMemoryReservationBlockSizeDefault - 2 * kMB};
  allocations.reserve(sizes.size());

  // size_t index: avoids comparing a signed int against sizes.size().
  for (size_t i = 0; i < sizes.size(); i++) {
    auto size = sizes[i];
    auto currentBytes = allocator_->getBytes();
    // Positional aggregate initialization: buffer and pool start as nullptr,
    // buffer is filled in by allocate() below.
    Allocation allocation{nullptr, size, nullptr};
    allocator_->allocate(size, &allocation.buffer);
    allocations.push_back(allocation);

    EXPECT_EQ(allocator_->getBytes(), currentBytes + size);
    EXPECT_EQ(allocator_->peakBytes(), allocator_->getBytes());
    EXPECT_EQ(listener_->currentBytes(), (i + 1) * kMemoryReservationBlockSizeDefault + initBytes);
    EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
  }

  // Free the most recent buffer first. After each free the listener must hold
  // exactly one block per buffer that is still outstanding.
  while (!allocations.empty()) {
    auto currentBytes = allocator_->getBytes();
    auto allocation = allocations.back();
    allocations.pop_back();
    allocator_->free(allocation.buffer, allocation.size);

    EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
    EXPECT_EQ(listener_->currentBytes(), allocations.size() * kMemoryReservationBlockSizeDefault + initBytes);
  }

  ASSERT_EQ(allocator_->getBytes(), 0);
}
namespace {
// Listener that forwards every call to a delegate installed after
// construction. The delegate may be installed only once; calling any of the
// forwarding methods before set() dereferences a null pointer.
class AllocationListenerWrapper : public AllocationListener {
 public:
  explicit AllocationListenerWrapper() = default;

  // Installs the delegate. Throws std::runtime_error if one is already set.
  void set(AllocationListener* const delegate) {
    if (delegate_ == nullptr) {
      delegate_ = delegate;
      return;
    }
    throw std::runtime_error("Invalid state");
  }

  void allocationChanged(int64_t diff) override {
    delegate_->allocationChanged(diff);
  }

  int64_t currentBytes() override {
    return delegate_->currentBytes();
  }

  int64_t peakBytes() override {
    return delegate_->peakBytes();
  }

 private:
  AllocationListener* delegate_{nullptr};
};
// AllocationListener that can additionally be asked to give memory back.
// MockSparkTaskMemoryManager::acquire() calls shrink() on every listener first
// and falls back to spill() only if that was not enough; in both cases it
// treats the return value as the number of bytes that were freed.
class SpillableAllocationListener : public AllocationListener {
public:
// Asks the listener to free up to `bytes`; returns the amount reported as freed.
virtual uint64_t shrink(uint64_t bytes) = 0;
// Same contract as shrink(), but only invoked after shrinking fell short.
virtual uint64_t spill(uint64_t bytes) = 0;
};
// Minimal stand-in for a per-task memory manager with a fixed byte budget.
// Listeners created through newListener() charge their allocations to this
// manager via acquire() / release(); when the budget is exhausted acquire()
// asks the registered listeners to shrink and then to spill.
class MockSparkTaskMemoryManager {
 public:
  // `maxBytes` is the budget acquire() measures free space against.
  explicit MockSparkTaskMemoryManager(const uint64_t maxBytes);

  // Registers a new listener that uses the given shrink / spill callbacks.
  // The returned pointer refers to an object stored in listeners_.
  AllocationListener* newListener(std::function<uint64_t(uint64_t)> shrink, std::function<uint64_t(uint64_t)> spill);

  // Tries to reserve `bytes`; returns the number of bytes actually granted.
  uint64_t acquire(uint64_t bytes);

  // Returns `bytes` to the budget.
  void release(uint64_t bytes);

  // Number of bytes currently granted. Takes mutex_ like acquire() and
  // release() do, so the read cannot race with a concurrent update; the mutex
  // is recursive, so calling this while the same thread already holds it is
  // fine.
  uint64_t currentBytes() const {
    std::unique_lock l(mutex_);
    return currentBytes_;
  }

 private:
  // Recursive because acquire() calls listener callbacks that may re-enter
  // release() on the same thread.
  mutable std::recursive_mutex mutex_;
  std::vector<std::unique_ptr<SpillableAllocationListener>> listeners_{};
  const uint64_t maxBytes_;
  uint64_t currentBytes_{0L};
};
// Listener handed out by MockSparkTaskMemoryManager::newListener(). Every
// positive delta is acquired from the manager and every negative delta is
// released back to it. shrink() and spill() simply invoke the callbacks
// supplied at construction.
class MockSparkAllocationListener : public SpillableAllocationListener {
 public:
  explicit MockSparkAllocationListener(
      MockSparkTaskMemoryManager* const manager,
      std::function<uint64_t(uint64_t)> shrink,
      std::function<uint64_t(uint64_t)> spill)
      : manager_(manager), shrink_(std::move(shrink)), spill_(std::move(spill)) {}

  // Charges (diff > 0) or refunds (diff < 0) the manager.
  // Throws std::runtime_error("OOM") when the manager grants less than was
  // requested.
  void allocationChanged(int64_t diff) override {
    if (diff == 0) {
      return;
    }
    if (diff > 0) {
      const auto requested = static_cast<uint64_t>(diff);
      const auto granted = manager_->acquire(requested);
      if (granted < requested) {
        // Hand the partial grant back before failing. Otherwise the manager
        // would keep counting bytes that no listener accounts for.
        if (granted != 0) {
          manager_->release(granted);
        }
        throw std::runtime_error("OOM");
      }
      currentBytes_ += granted;
      return;
    }
    manager_->release(-diff);
    currentBytes_ -= (-diff);
  }

  uint64_t shrink(uint64_t bytes) override {
    return shrink_(bytes);
  }

  uint64_t spill(uint64_t bytes) override {
    return spill_(bytes);
  }

  // Bytes this listener currently holds from the manager.
  int64_t currentBytes() override {
    return currentBytes_;
  }

 private:
  MockSparkTaskMemoryManager* const manager_;
  std::function<uint64_t(uint64_t)> shrink_;
  std::function<uint64_t(uint64_t)> spill_;
  std::atomic<uint64_t> currentBytes_{0L};
};
MockSparkTaskMemoryManager::MockSparkTaskMemoryManager(const uint64_t maxBytes) : maxBytes_(maxBytes) {}
// Creates a listener bound to this manager, stores it in listeners_ and
// returns a pointer to it. The manager keeps ownership of the listener.
AllocationListener* MockSparkTaskMemoryManager::newListener(
    std::function<uint64_t(uint64_t)> shrink,
    std::function<uint64_t(uint64_t)> spill) {
  // acquire() iterates listeners_ while holding mutex_. Registering under the
  // same mutex keeps a push_back (which may reallocate the vector) from
  // running concurrently with that iteration.
  std::unique_lock l(mutex_);
  listeners_.push_back(std::make_unique<MockSparkAllocationListener>(this, std::move(shrink), std::move(spill)));
  return listeners_.back().get();
}
// Tries to reserve `bytes` from the budget and returns the amount granted.
//
// If the request fits into the free part of the budget it is granted
// directly. Otherwise the shortfall is requested from the registered
// listeners: first via shrink(), then, if still short, via spill(). Each call's
// return value is subtracted from the shortfall. If a shortfall remains, only
// the covered part of the request is granted.
uint64_t MockSparkTaskMemoryManager::acquire(uint64_t bytes) {
  std::unique_lock l(mutex_);
  auto freeBytes = maxBytes_ - currentBytes_;
  if (bytes <= freeBytes) {
    currentBytes_ += bytes;
    return bytes;
  }

  // Shrink listeners.
  // Signed on purpose: a listener may free more than was asked for.
  int64_t bytesNeeded = bytes - freeBytes;
  for (const auto& listener : listeners_) {
    bytesNeeded -= listener->shrink(bytesNeeded);
    // Stop as soon as the shortfall is covered. Testing for "< 0" only would
    // keep going when it is covered exactly and would call the remaining
    // listeners with a request of 0 bytes.
    if (bytesNeeded <= 0) {
      break;
    }
  }

  // Spill listeners if shrinking alone was not enough.
  if (bytesNeeded > 0) {
    for (const auto& listener : listeners_) {
      bytesNeeded -= listener->spill(bytesNeeded);
      if (bytesNeeded <= 0) {
        break;
      }
    }
  }

  if (bytesNeeded > 0) {
    // Still short: grant only what could be covered.
    uint64_t granted = bytes - bytesNeeded;
    currentBytes_ += granted;
    return granted;
  }

  currentBytes_ += bytes;
  return bytes;
}
// Gives `bytes` back to the budget. The update is made while holding mutex_.
void MockSparkTaskMemoryManager::release(uint64_t bytes) {
  std::lock_guard<std::recursive_mutex> guard(mutex_);
  currentBytes_ = currentBytes_ - bytes;
}
// Reclaimer for tests: it is given a reference to the list of buffers a test
// allocated (all of the same size) and frees every buffer that is still live
// whenever reclaim() is called.
class MockMemoryReclaimer : public facebook::velox::memory::MemoryReclaimer {
 public:
  // `buffs` is kept by reference and must outlive the reclaimer; `size` is the
  // byte size of every buffer in it.
  explicit MockMemoryReclaimer(std::vector<void*>& buffs, int32_t size)
      : facebook::velox::memory::MemoryReclaimer(0), buffs_(buffs), size_(size) {}

  // Reports `size_` bytes for every non-null entry of the buffer list.
  // Returns false, leaving `reclaimableBytes` untouched, when that total is 0.
  bool reclaimableBytes(const memory::MemoryPool& pool, uint64_t& reclaimableBytes) const override {
    uint64_t liveBytes = 0;
    for (auto it = buffs_.begin(); it != buffs_.end(); ++it) {
      if (*it != nullptr) {
        liveBytes += size_;
      }
    }
    if (liveBytes != 0) {
      reclaimableBytes = liveBytes;
      return true;
    }
    return false;
  }

  // Frees every live buffer back to `pool`, nulls its slot and returns the
  // number of bytes freed. `targetBytes` is not consulted.
  uint64_t reclaim(memory::MemoryPool* pool, uint64_t targetBytes, uint64_t maxWaitMs, Stats& stats) override {
    uint64_t freedBytes = 0;
    for (auto it = buffs_.begin(), last = buffs_.end(); it != last; ++it) {
      // A slot is null when
      // 1. reclaim was triggered by an allocation from this very pool, so the
      //    buffer has not been allocated yet, or
      // 2. the slot was already reclaimed by an earlier call.
      if (*it != nullptr) {
        pool->free(*it, size_);
        *it = nullptr;
        freedBytes += size_;
      }
    }
    return freedBytes;
  }

 private:
  std::vector<void*>& buffs_;
  int32_t size_;
};
// Verifies that the bytes granted by `tmm` equal the summed capacity of the
// aggregate pools of all managers in `vmms`. Null entries (managers that have
// not been created yet) are skipped.
void assertCapacitiesMatch(MockSparkTaskMemoryManager& tmm, std::vector<std::unique_ptr<VeloxMemoryManager>>& vmms) {
  uint64_t sum = 0;
  for (size_t idx = 0; idx < vmms.size(); ++idx) {
    if (vmms[idx] != nullptr) {
      sum += vmms[idx]->getAggregateMemoryPool()->capacity();
    }
  }
  if (tmm.currentBytes() == sum) {
    return;
  }
  // Only reached on a mismatch, which makes this line a handy breakpoint.
  ASSERT_EQ(tmm.currentBytes(), sum);
}
} // namespace
// Fixture for tests that run several VeloxMemoryManager instances at once.
class MultiMemoryManagerTest : public ::testing::Test {
 protected:
  // Runs once for the whole suite: creates the Velox backend with the default
  // reservation block size and the default initial pool capacity.
  static void SetUpTestCase() {
    std::unordered_map<std::string, std::string> conf = {
        {kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)},
        {kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
    gluten::VeloxBackend::create(AllocationListener::noop(), conf);
  }

  // Builds a memory manager for the Velox backend that reports to `listener`
  // and is configured from the backend's configuration.
  std::unique_ptr<VeloxMemoryManager> newVeloxMemoryManager(std::unique_ptr<AllocationListener> listener) {
    const auto backend = VeloxBackend::get();
    return std::make_unique<VeloxMemoryManager>(
        gluten::kVeloxBackendKind, std::move(listener), *backend->getBackendConf());
  }
};
// Stress test: numThreads threads each create their own VeloxMemoryManager,
// all charging one shared MockSparkTaskMemoryManager, and each allocates
// numAllocations buffers of allocateSize bytes from its leaf pool. The total
// requested is far more than maxBytes, so the allocations can presumably only
// succeed because earlier buffers are reclaimed through the spill callback.
// Before and after every allocation the task manager's byte count must equal
// the summed capacity of all aggregate pools.
TEST_F(MultiMemoryManagerTest, spill) {
// 200 MiB budget shared by all managers.
const uint64_t maxBytes = 200 << 20;
const uint32_t numThreads = 100;
const uint32_t numAllocations = 200;
// 10 MiB per buffer.
const int32_t allocateSize = 10 << 20;
MockSparkTaskMemoryManager tmm{maxBytes};
std::vector<std::unique_ptr<VeloxMemoryManager>> vmms{};
std::vector<std::thread> threads{};
std::vector<std::vector<void*>> buffs{};
// Both outer vectors get their final size before any thread starts. Each
// MockMemoryReclaimer keeps a reference to its buffs[i], and the threads index
// into vmms, so the outer vectors must not be resized afterwards.
buffs.reserve(numThreads);
for (size_t i = 0; i < numThreads; ++i) {
buffs.push_back({});
vmms.emplace_back(nullptr);
}
// Emulate a shared lock to avoid ABBA deadlock.
// It is recursive because the spill callback below locks it again and can be
// invoked on a thread that already holds it inside the allocation loop.
std::recursive_mutex mutex;
for (size_t i = 0; i < numThreads; ++i) {
threads.emplace_back([this, i, allocateSize, &tmm, &vmms, &mutex, &buffs]() -> void {
auto wrapper = std::make_unique<AllocationListenerWrapper>(); // Set later.
// Raw pointer kept so the delegate can still be installed after ownership of
// the wrapper has moved into the memory manager.
auto* listener = wrapper.get();
facebook::velox::memory::MemoryPool* pool; // Set later.
{
// Setup phase: create this thread's manager, attach the reclaimer to its leaf
// pool and register the real listener with the task memory manager.
std::unique_lock<std::recursive_mutex> l(mutex);
vmms[i] = newVeloxMemoryManager(std::move(wrapper));
pool = vmms[i]->getLeafMemoryPool().get();
pool->setReclaimer(std::make_unique<MockMemoryReclaimer>(buffs[i], allocateSize));
listener->set(tmm.newListener(
// shrink callback: never frees anything.
[](uint64_t bytes) -> uint64_t { return 0; },
// spill callback: shrinks the capacity of manager i through its arbitrator.
[i, &vmms, &mutex](uint64_t bytes) -> uint64_t {
std::unique_lock<std::recursive_mutex> l(mutex);
return vmms[i]->getMemoryManager()->arbitrator()->shrinkCapacity(bytes);
}));
}
{
// Allocation phase: the lock is held for the whole loop, so the allocation
// loops of different threads do not interleave.
std::unique_lock<std::recursive_mutex> l(mutex);
for (size_t j = 0; j < numAllocations; ++j) {
assertCapacitiesMatch(tmm, vmms);
buffs[i].push_back(pool->allocate(allocateSize));
assertCapacitiesMatch(tmm, vmms);
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
// Tear-down: ask every manager to shrink by the total amount one thread
// allocated, then expect the task memory manager to be back at zero.
for (auto& vmm : vmms) {
assertCapacitiesMatch(tmm, vmms);
vmm->getMemoryManager()->arbitrator()->shrinkCapacity(allocateSize * numAllocations);
assertCapacitiesMatch(tmm, vmms);
}
ASSERT_EQ(tmm.currentBytes(), 0);
}
} // namespace gluten