blob: 0da2a2f187861b5188362f7e2b58449e43d23491 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <duckdb/common/enums/compression_type.hpp>
#include "shuffle/VeloxHashShuffleWriter.h"
#include "tests/VeloxShuffleWriterTestBase.h"
#include "tests/utils/TestUtils.h"
#include "utils/TestAllocationListener.h"
using namespace facebook::velox;
namespace gluten {
namespace {
void assertSpill(TestAllocationListener* listener, std::function<void()> block) {
const auto beforeSpill = listener->reclaimedBytes();
block();
const auto afterSpill = listener->reclaimedBytes();
ASSERT_GT(afterSpill, beforeSpill);
}
} // namespace
class VeloxHashShuffleWriterSpillTest : public VeloxShuffleWriterTestBase, public testing::Test {
protected:
static void SetUpTestSuite() {
setUpVeloxBackend();
}
static void TearDownTestSuite() {
tearDownVeloxBackend();
}
void SetUp() override {
setUpTestData();
}
std::shared_ptr<VeloxShuffleWriter> createHashShuffleWriter(
uint32_t numPartitions,
const std::shared_ptr<HashShuffleWriterOptions>& shuffleWriterOptions,
std::shared_ptr<LocalPartitionWriterOptions> partitionWriterOptions = nullptr,
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME) {
if (partitionWriterOptions == nullptr) {
partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>();
}
std::unique_ptr<arrow::util::Codec> codec;
if (compressionType == arrow::Compression::type::UNCOMPRESSED) {
codec = nullptr;
} else {
GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressionType));
}
auto partitionWriter = std::make_shared<LocalPartitionWriter>(
numPartitions, std::move(codec), getDefaultMemoryManager(), partitionWriterOptions, dataFile_, localDirs_);
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxHashShuffleWriter::create(
numPartitions, partitionWriter, shuffleWriterOptions, getDefaultMemoryManager()));
return shuffleWriter;
}
int64_t splitRowVectorAndSpill(
VeloxShuffleWriter& shuffleWriter,
std::vector<facebook::velox::RowVectorPtr> vectors,
bool shrink) {
for (auto vector : vectors) {
ASSERT_NOT_OK(splitRowVector(shuffleWriter, vector));
}
auto targetEvicted = shuffleWriter.cachedPayloadSize();
if (shrink) {
targetEvicted += shuffleWriter.partitionBufferSize();
}
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter.reclaimFixedSize(targetEvicted, &evicted));
return evicted;
};
};
TEST_F(VeloxHashShuffleWriterSpillTest, memoryLeak) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions);
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(shuffleWriter->stop());
const auto* arrowPool = getDefaultMemoryManager()->defaultArrowMemoryPool();
ASSERT_EQ(arrowPool->bytes_allocated(), 0);
shuffleWriter.reset();
ASSERT_EQ(arrowPool->bytes_allocated(), 0);
}
TEST_F(VeloxHashShuffleWriterSpillTest, spillFailWithOutOfMemory) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions);
listener_->updateLimit(0L);
listener_->setShuffleWriter(shuffleWriter.get());
listener_->setThrowIfOOM(true);
ASSERT_THROW([&] { auto status = splitRowVector(*shuffleWriter, inputVector1_); }(), GlutenException);
// Should return OOM status because there's no partition buffer to spill.
listener_->reset();
}
TEST_F(VeloxHashShuffleWriterSpillTest, kInit) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions);
// Test spill all partition buffers.
{
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
auto bufferSize = shuffleWriter->partitionBufferSize();
auto payloadSize = shuffleWriter->cachedPayloadSize();
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + bufferSize, &evicted));
ASSERT_EQ(evicted, payloadSize + bufferSize);
// No cached payload after evict.
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
// All partition buffers should be evicted.
ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
}
// Test spill minimum-size partition buffers.
{
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
auto bufferSize = shuffleWriter->partitionBufferSize();
auto payloadSize = shuffleWriter->cachedPayloadSize();
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
ASSERT_GT(evicted, payloadSize);
// No cached payload after evict.
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
ASSERT_LE(shuffleWriter->partitionBufferSize(), bufferSize);
// Not all partition buffers was evicted.
ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
}
// Test spill empty partition buffers.
{
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
// Clear buffers then the size after shrink will be 0.
for (auto pid = 0; pid < shuffleWriter->numPartitions(); ++pid) {
ASSERT_NOT_OK(shuffleWriter->evictPartitionBuffers(pid, true));
}
auto bufferSize = shuffleWriter->partitionBufferSize();
auto payloadSize = shuffleWriter->cachedPayloadSize();
int64_t evicted;
// Evict payload and shrink min-size buffer.
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize + 1, &evicted));
ASSERT_GT(evicted, payloadSize);
ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
// Evict empty partition buffers.
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(bufferSize, &evicted));
ASSERT_GT(evicted, 0);
ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
// Evict again. No reclaimable space.
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
ASSERT_EQ(evicted, 0);
}
ASSERT_NOT_OK(shuffleWriter->stop());
}
TEST_F(VeloxHashShuffleWriterSpillTest, kInitSingle) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->partitioning = Partitioning::kSingle;
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(1, shuffleWriterOptions);
{
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
auto payloadSize = shuffleWriter->partitionBufferSize() + shuffleWriter->cachedPayloadSize();
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
ASSERT_GE(evicted, payloadSize);
// No cached payload after evict.
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
}
ASSERT_NOT_OK(shuffleWriter->stop());
}
TEST_F(VeloxHashShuffleWriterSpillTest, kSplit) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions);
listener_->setShuffleWriter(shuffleWriter.get());
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
listener_->updateLimit(listener_->currentBytes());
assertSpill(listener_, [&]() {
// Trigger spill for the next split.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
});
ASSERT_NOT_OK(shuffleWriter->stop());
listener_->reset();
}
TEST_F(VeloxHashShuffleWriterSpillTest, kSplitSingle) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->partitioning = Partitioning::kSingle;
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(1, shuffleWriterOptions);
listener_->setShuffleWriter(shuffleWriter.get());
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
// Trigger spill for the next split.
listener_->updateLimit(listener_->currentBytes());
assertSpill(listener_, [&]() { ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_)); });
ASSERT_NOT_OK(shuffleWriter->stop());
listener_->reset();
}
TEST_F(VeloxHashShuffleWriterSpillTest, kStop) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
// Force compression.
auto partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>();
partitionWriterOptions->compressionThreshold = 0;
partitionWriterOptions->mergeThreshold = 0;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions, partitionWriterOptions);
for (int i = 0; i < 10; ++i) {
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
}
// Reclaim bytes to shrink partition buffer.
int64_t reclaimed = 0;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
ASSERT_GE(reclaimed, 2000);
listener_->setShuffleWriter(shuffleWriter.get());
listener_->updateLimit(listener_->currentBytes());
// Trigger spill during stop.
assertSpill(listener_, [&] { ASSERT_NOT_OK(shuffleWriter->stop()); });
listener_->reset();
}
TEST_F(VeloxHashShuffleWriterSpillTest, kStopComplex) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
// Force compression.
auto partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>();
partitionWriterOptions->compressionThreshold = 0;
partitionWriterOptions->mergeThreshold = 0;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions, partitionWriterOptions);
for (int i = 0; i < 3; ++i) {
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
}
// Reclaim bytes to shrink partition buffer.
int64_t reclaimed = 0;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
ASSERT_GE(reclaimed, 2000);
// Reclaim from PartitionWriter to free cached bytes.
auto payloadSize = shuffleWriter->cachedPayloadSize();
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(payloadSize, &evicted));
ASSERT_EQ(evicted, payloadSize);
listener_->setShuffleWriter(shuffleWriter.get());
listener_->updateLimit(listener_->currentBytes());
// When evicting partitioning buffers in stop, spill will be triggered by complex types allocating extra memory.
assertSpill(listener_, [&] { ASSERT_NOT_OK(shuffleWriter->stop()); });
listener_->reset();
}
TEST_F(VeloxHashShuffleWriterSpillTest, evictPartitionBuffers) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->splitBufferSize = 4;
auto shuffleWriter = createHashShuffleWriter(2, shuffleWriterOptions);
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
// First evict cached payloads.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->cachedPayloadSize(), &evicted));
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
// Evict again. Because no cached payload to evict, it will try to evict all partition buffers.
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(shuffleWriter->partitionBufferSize(), &evicted));
ASSERT_EQ(shuffleWriter->partitionBufferSize(), 0);
}
TEST_F(VeloxHashShuffleWriterSpillTest, kUnevictableSingle) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->partitioning = Partitioning::kSingle;
auto shuffleWriter = createHashShuffleWriter(1, shuffleWriterOptions);
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
// First evict cached payloads.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(
shuffleWriter->partitionBufferSize() + shuffleWriter->cachedPayloadSize(), &evicted));
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
ASSERT_GE(evicted, 0);
// Evict again. The evicted size should be 0.
ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(1, &evicted));
ASSERT_EQ(evicted, 0);
}
TEST_F(VeloxHashShuffleWriterSpillTest, resizeBinaryBufferTriggerSpill) {
auto shuffleWriterOptions = std::make_shared<HashShuffleWriterOptions>();
shuffleWriterOptions->splitBufferReallocThreshold = 1;
auto partitionWriterOptions = std::make_shared<LocalPartitionWriterOptions>();
auto shuffleWriter =
createHashShuffleWriter(1, shuffleWriterOptions, partitionWriterOptions, arrow::Compression::type::UNCOMPRESSED);
// Split first input vector. Large average string length.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary1_));
listener_->setShuffleWriter(shuffleWriter.get());
listener_->updateLimit(listener_->currentBytes());
assertSpill(listener_, [&] {
// Split second input vector. Large average string length.
ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorLargeBinary2_));
});
ASSERT_NOT_OK(shuffleWriter->stop());
listener_->reset();
}
} // namespace gluten