blob: 9741c2b67299d48a5e7df87271122b0053d24ff1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <arrow/util/compression.h>
#include "config/GlutenConfig.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/VeloxRssSortShuffleWriter.h"
#include "tests/VeloxShuffleWriterTestBase.h"
#include "tests/utils/TestUtils.h"
#include "velox/buffer/Buffer.h"
#include "velox/type/Type.h"
#include "velox/vector/tests/VectorTestUtils.h"
#include "velox/vector/tests/utils/VectorTestBase.h"
using namespace facebook::velox;
using namespace facebook::velox::test;
namespace gluten {
class VeloxRssSortShuffleWriterTest : public VeloxShuffleWriterTestBase, public testing::Test {
protected:
static void SetUpTestSuite() {
setUpVeloxBackend();
}
static void TearDownTestSuite() {
tearDownVeloxBackend();
}
void SetUp() override {
setUpTestData();
}
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) {
auto options = std::make_shared<RssPartitionWriterOptions>();
auto writerOptions = std::make_shared<RssSortShuffleWriterOptions>();
auto rssClient = std::make_unique<LocalRssClient>(dataFile_);
std::unique_ptr<arrow::util::Codec> codec;
if (writerOptions->compressionType == arrow::Compression::type::UNCOMPRESSED) {
codec = nullptr;
} else {
GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(writerOptions->compressionType));
}
auto partitionWriter = std::make_shared<RssPartitionWriter>(
numPartitions, std::move(codec), getDefaultMemoryManager(), options, std::move(rssClient));
GLUTEN_ASSIGN_OR_THROW(
auto shuffleWriter,
VeloxRssSortShuffleWriter::create(
numPartitions, std::move(partitionWriter), std::move(writerOptions), getDefaultMemoryManager()));
return shuffleWriter;
}
};
// Verifies that getInputColumnBytes() counts a string buffer shared by several vectors only once.
// Covers sharing across flat columns, array elements, map keys/values and nested rows.
// Dictionary-wrapped (non-flat) input expects no such deduction.
TEST_F(VeloxRssSortShuffleWriterTest, calculateBatchesSize) {
  auto shuffleWriter = std::dynamic_pointer_cast<VeloxRssSortShuffleWriter>(createShuffleWriter(10));
  // createShuffleWriter() returns the base type. Fail the test cleanly instead of dereferencing a
  // null pointer if the concrete writer type is ever different.
  ASSERT_NE(shuffleWriter, nullptr);
  // Unbounded memory limit so the writer never calls resetBatches() on its own; the test resets
  // explicitly between scenarios.
  const int64_t memLimit = INT64_MAX;

  // Scenario 1: shared string buffer in FlatVector<StringView> columns spread over two batches.
  // strBuffer is referenced by vector1, vector2 and vector3 (three times in total), so two copies
  // are subtracted from the summed retained sizes.
  BufferPtr strBuffer = AlignedBuffer::allocate<char>(200, pool());
  auto vector1 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
  vector1->setStringBuffers({strBuffer});
  auto vector2 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
  vector2->setStringBuffers({strBuffer});
  auto vector3 = BaseVector::create<FlatVector<StringView>>(VARCHAR(), 100, pool());
  vector3->setStringBuffers({strBuffer});
  // BIGINT is the 64-bit type that matches FlatVector<int64_t>. INTEGER is 32-bit and would not
  // match the requested C++ vector type.
  auto vector4 = BaseVector::create<FlatVector<int64_t>>(BIGINT(), 100, pool());
  auto rowVector1 = makeRowVector({vector1, vector2});
  auto rowVector2 = makeRowVector({vector3, vector4});
  std::shared_ptr<ColumnarBatch> cb1 = std::make_shared<VeloxColumnarBatch>(rowVector1);
  std::shared_ptr<ColumnarBatch> cb2 = std::make_shared<VeloxColumnarBatch>(rowVector2);
  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
  ASSERT_NOT_OK(shuffleWriter->write(cb2, memLimit));
  auto expectedSize = rowVector1->retainedSize() + rowVector2->retainedSize() - strBuffer->capacity() * 2;
  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
  shuffleWriter->resetBatches();

  // Scenario 2: shared string buffer reached through an ArrayVector.
  // A single array holds all of vector1 as its elements. vector1 is also a top-level column, so
  // strBuffer is referenced twice and one copy is subtracted.
  BufferPtr offsets = allocateOffsets(1, vector1->pool());
  BufferPtr sizes = allocateSizes(1, vector1->pool());
  sizes->asMutable<vector_size_t>()[0] = vector1->size();
  auto arrayVector =
      std::make_shared<facebook::velox::ArrayVector>(pool(), ARRAY(VARCHAR()), nullptr, 1, offsets, sizes, vector1);
  auto rowVector3 = makeRowVector({arrayVector, vector1});
  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector3);
  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
  expectedSize = rowVector3->retainedSize() - strBuffer->capacity();
  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
  shuffleWriter->resetBatches();

  // Scenario 3: shared string buffer reached through a MapVector.
  // Map keys (vector1), map values (vector2) and the sibling column vector3 all reference
  // strBuffer, so two copies are subtracted.
  auto keys = vector1;
  auto values = vector2;
  auto mapVector = makeMapVector({0, 10, 20, 50}, keys, values);
  auto rowVector4 = makeRowVector({mapVector, vector3});
  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector4);
  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
  expectedSize = rowVector4->retainedSize() - strBuffer->capacity() * 2;
  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
  shuffleWriter->resetBatches();

  // Scenario 4: shared string buffer reached through a nested RowVector.
  // rowVector1 (vector1, vector2) plus vector3 gives three references, so two copies are subtracted.
  auto rowVector5 = makeRowVector({rowVector1, vector3});
  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector5);
  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
  expectedSize = rowVector5->retainedSize() - strBuffer->capacity() * 2;
  EXPECT_EQ(expectedSize, shuffleWriter->getInputColumnBytes());
  shuffleWriter->resetBatches();

  // Scenario 5: vector is not flat (identity dictionary over vector1).
  // The reported size is expected to equal the plain retained size, with no deduction.
  auto dictionaryVector = BaseVector::wrapInDictionary(
      BufferPtr(nullptr),
      makeIndices(vector1->size(), [](vector_size_t row) { return row; }),
      vector1->size(),
      vector1);
  auto rowVector6 = makeRowVector({dictionaryVector});
  cb1 = std::make_shared<VeloxColumnarBatch>(rowVector6);
  ASSERT_NOT_OK(shuffleWriter->write(cb1, memLimit));
  EXPECT_EQ(rowVector6->retainedSize(), shuffleWriter->getInputColumnBytes());
}
} // namespace gluten