blob: 2139c6e9d724573a0f37b62cd8f7da4275088cad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "shuffle/Partitioning.h"
#include "utils/Compression.h"
#include "utils/Macros.h"
#include <arrow/ipc/options.h>
#include <arrow/util/compression.h>
#include <limits>
namespace gluten {
// Default values used as in-class initializers by the option structs declared later in this header.

// Default for ShuffleReaderOptions::batchSize.
static constexpr int16_t kDefaultBatchSize{4096};
// Default for the splitBufferSize fields of the writer options and for
// LocalPartitionWriterOptions::mergeBufferSize.
static constexpr int32_t kDefaultShuffleWriterBufferSize{4096};
// Default for the sortBufferMaxSize fields (67,108,864).
static constexpr int64_t kDefaultSortBufferThreshold{64LL * 1024 * 1024};
// Default for RssPartitionWriterOptions::pushBufferMaxSize.
static constexpr int64_t kDefaultPushMemoryThreshold{4096};
// Default for LocalPartitionWriterOptions::numSubDirs.
static constexpr int32_t kDefaultNumSubDirs{64};
// Default for LocalPartitionWriterOptions::compressionThreshold.
static constexpr int32_t kDefaultCompressionThreshold{100};
// Default for the compressionBufferSize fields (32,768).
static constexpr int32_t kDefaultCompressionBufferSize{32 << 10};
// Default for SortShuffleWriterOptions::diskWriteBufferSize (1,048,576).
static constexpr int32_t kDefaultDiskWriteBufferSize{1 << 20};
// Default for the splitBufferReallocThreshold fields.
static constexpr double kDefaultSplitBufferReallocThreshold{0.25};
// Default for LocalPartitionWriterOptions::mergeThreshold.
static constexpr double kDefaultMergeBufferThreshold{0.25};
// Default for SortShuffleWriterOptions::useRadixSort.
static constexpr bool kDefaultUseRadixSort{true};
// Default for SortShuffleWriterOptions::initialSortBufferSize.
static constexpr int32_t kDefaultSortBufferSize{4096};
// Default for ShuffleReaderOptions::readerBufferSize (1,048,576).
static constexpr int64_t kDefaultReadBufferSize{1024 * 1024};
// Default for ShuffleReaderOptions::deserializerBufferSize (1,048,576).
static constexpr int64_t kDefaultDeserializerBufferSize{1024 * 1024};
// Default for LocalPartitionWriterOptions::shuffleFileBufferSize (32,768).
static constexpr int64_t kDefaultShuffleFileBufferSize{32 * 1024};
// Default for LocalPartitionWriterOptions::enableDictionary.
static constexpr bool kDefaultEnableDictionary{false};
/// Tag identifying a shuffle writer flavour. Each writer option struct in this header stores one of
/// these values in ShuffleWriterOptions::shuffleWriterType.
enum class ShuffleWriterType {
  kHashShuffle, // Set by HashShuffleWriterOptions.
  kSortShuffle, // Set by SortShuffleWriterOptions.
  kRssSortShuffle, // Set by RssSortShuffleWriterOptions.
  kGpuHashShuffle // Set by GpuHashShuffleWriterOptions.
};
/// Tag identifying a partition writer flavour.
/// NOTE(review): presumably kLocal pairs with LocalPartitionWriterOptions and kRss with
/// RssPartitionWriterOptions; nothing in this header ties them together, so confirm at the use site.
enum class PartitionWriterType {
  kLocal,
  kRss
};
struct ShuffleReaderOptions {
ShuffleWriterType shuffleWriterType = ShuffleWriterType::kHashShuffle;
// Compression options.
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
CodecBackend codecBackend = CodecBackend::NONE;
// Output batch size.
int32_t batchSize = kDefaultBatchSize;
// Buffer size when reading data from the input stream.
int64_t readerBufferSize = kDefaultReadBufferSize;
// Buffer size when deserializing rows into columnar batches. Only used for sort-based shuffle.
int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
};
struct ShuffleWriterOptions {
ShuffleWriterType shuffleWriterType;
Partitioning partitioning = Partitioning::kRoundRobin;
int32_t startPartitionId = 0;
ShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : shuffleWriterType(shuffleWriterType) {}
ShuffleWriterOptions(ShuffleWriterType shuffleWriterType, Partitioning partitioning, int32_t startPartitionId)
: shuffleWriterType(shuffleWriterType), partitioning(partitioning), startPartitionId(startPartitionId) {}
virtual ~ShuffleWriterOptions() = default;
};
struct HashShuffleWriterOptions : ShuffleWriterOptions {
int32_t splitBufferSize = kDefaultShuffleWriterBufferSize;
double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold;
HashShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {}
HashShuffleWriterOptions(
Partitioning partitioning,
int32_t startPartitionId,
int32_t partitionBufferSize,
double partitionBufferReallocThreshold)
: ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, startPartitionId),
splitBufferSize(partitionBufferSize),
splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
protected:
HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : ShuffleWriterOptions(shuffleWriterType) {}
HashShuffleWriterOptions(
ShuffleWriterType shuffleWriterType,
Partitioning partitioning,
int32_t startPartitionId,
int32_t partitionBufferSize,
double partitionBufferReallocThreshold)
: ShuffleWriterOptions(shuffleWriterType, partitioning, startPartitionId),
splitBufferSize(partitionBufferSize),
splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
};
struct SortShuffleWriterOptions : ShuffleWriterOptions {
int32_t initialSortBufferSize = kDefaultSortBufferSize; // spark.shuffle.sort.initialBufferSize
int32_t diskWriteBufferSize = kDefaultDiskWriteBufferSize; // spark.shuffle.spill.diskWriteBufferSize
bool useRadixSort = kDefaultUseRadixSort; // spark.shuffle.sort.useRadixSort
SortShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kSortShuffle) {}
SortShuffleWriterOptions(
Partitioning partitioning,
int32_t startPartitionId,
int32_t initialSortBufferSize,
int32_t diskWriteBufferSize,
bool useRadixSort)
: ShuffleWriterOptions(ShuffleWriterType::kSortShuffle, partitioning, startPartitionId),
initialSortBufferSize(initialSortBufferSize),
diskWriteBufferSize(diskWriteBufferSize),
useRadixSort(useRadixSort) {}
};
struct RssSortShuffleWriterOptions : ShuffleWriterOptions {
int32_t splitBufferSize = kDefaultShuffleWriterBufferSize;
int64_t sortBufferMaxSize = kDefaultSortBufferThreshold;
arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
RssSortShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kRssSortShuffle) {}
RssSortShuffleWriterOptions(
Partitioning partitioning,
int32_t startPartitionId,
int32_t splitBufferSize,
int64_t sortBufferMaxSize,
arrow::Compression::type compressionType)
: ShuffleWriterOptions(ShuffleWriterType::kRssSortShuffle, partitioning, startPartitionId),
splitBufferSize(splitBufferSize),
sortBufferMaxSize(sortBufferMaxSize),
compressionType(compressionType) {}
};
/// Options for the GPU hash shuffle writer. Identical to HashShuffleWriterOptions except that the
/// writer type tag is kGpuHashShuffle.
///
/// splitBufferSize and splitBufferReallocThreshold are inherited from HashShuffleWriterOptions and
/// deliberately not re-declared here. A second pair of members with the same names would hide the
/// base members, and because the constructor below forwards its arguments to the base, reads
/// through a GpuHashShuffleWriterOptions object would see the untouched defaults instead of the
/// configured values.
struct GpuHashShuffleWriterOptions : HashShuffleWriterOptions {
  /// All defaults, tagged kGpuHashShuffle.
  GpuHashShuffleWriterOptions() : HashShuffleWriterOptions(ShuffleWriterType::kGpuHashShuffle) {}

  /// Fully specified options, tagged kGpuHashShuffle. `partitionBufferSize` ends up in the
  /// inherited splitBufferSize and `partitionBufferReallocThreshold` in the inherited
  /// splitBufferReallocThreshold.
  GpuHashShuffleWriterOptions(
      Partitioning partitioning,
      int32_t startPartitionId,
      int32_t partitionBufferSize,
      double partitionBufferReallocThreshold)
      : HashShuffleWriterOptions(
            ShuffleWriterType::kGpuHashShuffle,
            partitioning,
            startPartitionId,
            partitionBufferSize,
            partitionBufferReallocThreshold) {}
};
/// Settings for the local partition writer. Every field has an in-class default, so a
/// default-constructed instance is fully initialized.
struct LocalPartitionWriterOptions {
  int64_t shuffleFileBufferSize = kDefaultShuffleFileBufferSize; // spark.shuffle.file.buffer
  int32_t compressionBufferSize =
      kDefaultCompressionBufferSize; // spark.io.compression.lz4.blockSize,spark.io.compression.zstd.bufferSize
  int32_t compressionThreshold = kDefaultCompressionThreshold;
  int32_t mergeBufferSize = kDefaultShuffleWriterBufferSize;
  double mergeThreshold = kDefaultMergeBufferThreshold;
  int32_t numSubDirs = kDefaultNumSubDirs; // spark.diskStore.subDirectories
  bool enableDictionary = kDefaultEnableDictionary;

  LocalPartitionWriterOptions() = default;

  /// Fully specified options; each argument initializes the field of the same name.
  ///
  /// `compressionThreshold` is accepted as int64_t (kept for source compatibility) but stored in
  /// an int32_t field. Values outside the int32_t range are saturated to the nearest bound rather
  /// than silently wrapped, so an oversized threshold cannot turn into a small or negative one.
  LocalPartitionWriterOptions(
      int64_t shuffleFileBufferSize,
      int32_t compressionBufferSize,
      int64_t compressionThreshold,
      int32_t mergeBufferSize,
      double mergeThreshold,
      int32_t numSubDirs,
      bool enableDictionary)
      : shuffleFileBufferSize(shuffleFileBufferSize),
        compressionBufferSize(compressionBufferSize),
        compressionThreshold(saturateToInt32(compressionThreshold)),
        mergeBufferSize(mergeBufferSize),
        mergeThreshold(mergeThreshold),
        numSubDirs(numSubDirs),
        enableDictionary(enableDictionary) {}

 private:
  // Converts `value` to int32_t, clamping it to [INT32_MIN, INT32_MAX] first.
  static constexpr int32_t saturateToInt32(int64_t value) {
    return value > static_cast<int64_t>(std::numeric_limits<int32_t>::max())
        ? std::numeric_limits<int32_t>::max()
        : (value < static_cast<int64_t>(std::numeric_limits<int32_t>::min())
               ? std::numeric_limits<int32_t>::min()
               : static_cast<int32_t>(value));
  }
};
struct RssPartitionWriterOptions {
int32_t compressionBufferSize =
kDefaultCompressionBufferSize; // spark.io.compression.lz4.blockSize,spark.io.compression.zstd.bufferSize
int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold;
int64_t sortBufferMaxSize = kDefaultSortBufferThreshold;
RssPartitionWriterOptions() = default;
RssPartitionWriterOptions(int32_t compressionBufferSize, int64_t pushBufferMaxSize, int64_t sortBufferMaxSize)
: compressionBufferSize(compressionBufferSize),
pushBufferMaxSize(pushBufferMaxSize),
sortBufferMaxSize(sortBufferMaxSize) {}
};
/// Plain aggregate of shuffle writer counters. All scalar fields start at zero and both vectors
/// start empty.
/// NOTE(review): the units of the byte and time counters are not stated in this header; confirm
/// at the place where they are updated.
struct ShuffleWriterMetrics {
  int64_t totalBytesWritten = 0;
  int64_t totalBytesEvicted = 0;
  int64_t totalBytesToEvict = 0;
  int64_t totalWriteTime = 0;
  int64_t totalEvictTime = 0;
  int64_t totalCompressTime = 0;
  double avgDictionaryFields = 0.0;
  int64_t dictionarySize = 0;
  std::vector<int64_t> partitionLengths;
  // Uncompressed size.
  std::vector<int64_t> rawPartitionLengths;
};
} // namespace gluten