// blob: 06e13db773820c2bd262fa6328dd359f26fa1db5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <memory>
#include <string>
#include <vector>
#include "velox/common/time/CpuWallTimer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"
#include <arrow/array/util.h>
#include <arrow/ipc/writer.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include "VeloxShuffleWriter.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
#include "shuffle/Utils.h"
#include "utils/Print.h"
namespace gluten {
// Debug-print switch for the shuffle writer.
// Set VELOX_SHUFFLE_WRITER_PRINT to 1 to enable printing; leave it at 0 to compile all printing out.
#define VELOX_SHUFFLE_WRITER_PRINT 0
#if VELOX_SHUFFLE_WRITER_PRINT
// Printing enabled: each Vs*/VS_* name is an alias for the identically named helper without the
// Vs/VS_ prefix (presumably provided by utils/Print.h -- confirm).
#define VsPrint Print
#define VsPrintLF PrintLF
#define VsPrintSplit PrintSplit
#define VsPrintSplitLF PrintSplitLF
#define VsPrintVectorRange PrintVectorRange
#define VS_PRINT PRINT
#define VS_PRINTLF PRINTLF
#define VS_PRINT_FUNCTION_NAME PRINT_FUNCTION_NAME
#define VS_PRINT_FUNCTION_SPLIT_LINE PRINT_FUNCTION_SPLIT_LINE
#define VS_PRINT_CONTAINER PRINT_CONTAINER
#define VS_PRINT_CONTAINER_TO_STRING PRINT_CONTAINER_TO_STRING
#define VS_PRINT_CONTAINER_2_STRING PRINT_CONTAINER_2_STRING
#define VS_PRINT_VECTOR_TO_STRING PRINT_VECTOR_TO_STRING
#define VS_PRINT_VECTOR_2_STRING PRINT_VECTOR_2_STRING
#define VS_PRINT_VECTOR_MAPPING PRINT_VECTOR_MAPPING
#else // VELOX_SHUFFLE_WRITER_PRINT
// Printing disabled: each Vs*/VS_* macro is function-like with an empty replacement, so the call
// and its arguments are discarded by the preprocessor and never reach the compiler.
#define VsPrint(...) // NOLINT
#define VsPrintLF(...) // NOLINT
#define VsPrintSplit(...) // NOLINT
#define VsPrintSplitLF(...) // NOLINT
#define VsPrintVectorRange(...) // NOLINT
#define VS_PRINT(a)
#define VS_PRINTLF(a)
#define VS_PRINT_FUNCTION_NAME()
#define VS_PRINT_FUNCTION_SPLIT_LINE()
#define VS_PRINT_CONTAINER(c)
#define VS_PRINT_CONTAINER_TO_STRING(c)
#define VS_PRINT_CONTAINER_2_STRING(c)
#define VS_PRINT_VECTOR_TO_STRING(v)
#define VS_PRINT_VECTOR_2_STRING(v)
#define VS_PRINT_VECTOR_MAPPING(v)
#endif // end of VELOX_SHUFFLE_WRITER_PRINT
// State values stored in VeloxHashShuffleWriter::splitState_ (which starts at kInit).
// Kept as an unscoped enum so the enumerators remain usable without qualification.
enum SplitState {
  kInit = 0,
  kPreAlloc = 1,
  kSplit = 2,
  kStopEvict = 3,
  kStop = 4,
};
// Identifies a binary column buffer (by partition id and binary column index) together with an
// "in resize" flag.
//
// All fields carry default member initializers, so a default-constructed state never exposes
// indeterminate partitionId/binaryIdx values.
//
// NOTE(review): both constructors leave inResize == false. If the (partitionId, binaryIdx)
// overload is meant to mark a resize as in progress, it should probably set inResize to true --
// confirm against the call sites before changing it.
struct BinaryArrayResizeState {
  bool inResize{false};
  uint32_t partitionId{0};
  uint32_t binaryIdx{0};

  // "No resize" state: inResize is false and both indices are zero.
  BinaryArrayResizeState() = default;

  // State referring to binary column `binaryIdx` of partition `partitionId`.
  BinaryArrayResizeState(uint32_t partitionId, uint32_t binaryIdx)
      : inResize(false), partitionId(partitionId), binaryIdx(binaryIdx) {}
};
// VeloxShuffleWriter implementation that keeps per-partition buffers (see partitionBuffers_) and
// per-input row -> partition bookkeeping (see row2Partition_, partition2RowOffsetBase_ and
// rowOffset2RowId_).
//
// Only the declarations, the debug print helpers and splitFixedType are defined in this header;
// all other member functions are defined out of line.
class VeloxHashShuffleWriter : public VeloxShuffleWriter {
  // Buffer slot indices. kBinaryLengthBufferIndex intentionally shares the value of
  // kFixedWidthValueBufferIndex (both are 1).
  enum {
    kValidityBufferIndex = 0,
    kFixedWidthValueBufferIndex = 1,
    kBinaryValueBufferIndex = 2,
    kBinaryLengthBufferIndex = kFixedWidthValueBufferIndex
  };

 public:
  // Raw pointers and metadata describing the buffers of one binary column in one partition.
  // The struct does not own the memory it points to.
  struct BinaryBuf {
    BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacityIn, uint64_t valueOffsetIn)
        : valuePtr(value), lengthPtr(length), valueCapacity(valueCapacityIn), valueOffset(valueOffsetIn) {}

    // Same as above with valueOffset set to 0.
    BinaryBuf(uint8_t* value, uint8_t* length, uint64_t valueCapacity) : BinaryBuf(value, length, valueCapacity, 0) {}

    // Empty descriptor: null pointers, zero capacity, zero offset.
    BinaryBuf() : BinaryBuf(nullptr, nullptr, 0) {}

    uint8_t* valuePtr;
    uint8_t* lengthPtr;
    uint64_t valueCapacity;
    uint64_t valueOffset;
  };

  // Factory. The constructor is private, so this is the only way for outside code to obtain a writer.
  static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
      uint32_t numPartitions,
      const std::shared_ptr<PartitionWriter>& partitionWriter,
      const std::shared_ptr<ShuffleWriterOptions>& options,
      MemoryManager* memoryManager);

  arrow::Status write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) override;

  arrow::Status stop() override;

  arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;

  const uint64_t cachedPayloadSize() const override;

  arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) override;

  // For test only.
  void setPartitionBufferSize(uint32_t newSize) override;

  // Debugging helpers.
  // The VS_* / Vs* macros used below expand to nothing unless VELOX_SHUFFLE_WRITER_PRINT is 1, in
  // which case their arguments are compiled, so every argument must name a real member.

  // Prints the column layout computed for the schema.
  void printColumnsInfo() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINTLF(fixedWidthColumnCount_);

    VS_PRINT_CONTAINER(simpleColumnIndices_);
    VS_PRINT_CONTAINER(binaryColumnIndices_);
    VS_PRINT_CONTAINER(complexColumnIndices_);

    VS_PRINT_VECTOR_2_STRING(veloxColumnTypes_);
    VS_PRINT_VECTOR_TO_STRING(arrowColumnTypes_);
  }

  // Prints the partitioning of the current input RowVector.
  void printPartition() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    // row ID -> partition ID
    VS_PRINT_VECTOR_MAPPING(row2Partition_);

    // partition -> row count
    VS_PRINT_VECTOR_MAPPING(partition2RowCount_);
  }

  // Prints the size and the write position of every partition buffer.
  void printPartitionBuffer() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_VECTOR_MAPPING(partitionBufferSize_);
    VS_PRINT_VECTOR_MAPPING(partitionBufferBase_);
  }

  // Prints, for every partition, the source row ids assigned to it.
  void printPartition2Row() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_VECTOR_MAPPING(partition2RowOffsetBase_);

#if VELOX_SHUFFLE_WRITER_PRINT
    // Unsigned index to match the uint32_t partition ids used throughout this class.
    for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
      auto begin = partition2RowOffsetBase_[pid];
      auto end = partition2RowOffsetBase_[pid + 1];
      VsPrint("partition", pid);
      VsPrintVectorRange(rowOffset2RowId_, begin, end);
    }
#endif
  }

  // Prints the per-column "input has null" flags.
  void printInputHasNull() const {
    VS_PRINT_FUNCTION_SPLIT_LINE();
    VS_PRINT_CONTAINER(inputHasNull_);
  }

 private:
  // Forwards all arguments to VeloxShuffleWriter and caches the two split-buffer options.
  VeloxHashShuffleWriter(
      uint32_t numPartitions,
      const std::shared_ptr<PartitionWriter>& partitionWriter,
      const std::shared_ptr<HashShuffleWriterOptions>& options,
      MemoryManager* memoryManager)
      : VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager),
        splitBufferSize_(options->splitBufferSize),
        splitBufferReallocThreshold_(options->splitBufferReallocThreshold) {}

  arrow::Status init();

  arrow::Status initPartitions();

  arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);

  arrow::Status splitRowVector(const facebook::velox::RowVector& rv);

  arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);

  arrow::Status buildPartition2Row(uint32_t rowNum);

  arrow::Status updateInputHasNull(const facebook::velox::RowVector& rv);

  void setSplitState(SplitState state);

  arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t memLimit);

  bool beyondThreshold(uint32_t partitionId, uint32_t newSize);

  uint32_t calculatePartitionBufferSize(const facebook::velox::RowVector& rv, int64_t memLimit);

  arrow::Status preAllocPartitionBuffers(uint32_t preAllocBufferSize);

  arrow::Status updateValidityBuffers(uint32_t partitionId, uint32_t newSize);

  arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
  allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize);

  arrow::Status allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize);

  arrow::Status splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv);

  arrow::Status splitBoolType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs);

  arrow::Status splitValidityBuffer(const facebook::velox::RowVector& rv);

  arrow::Status splitBinaryArray(const facebook::velox::RowVector& rv);

  arrow::Status splitComplexType(const facebook::velox::RowVector& rv);

  arrow::Status evictBuffers(
      uint32_t partitionId,
      uint32_t numRows,
      std::vector<std::shared_ptr<arrow::Buffer>> buffers,
      bool reuseBuffers);

  arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> assembleBuffers(uint32_t partitionId, bool reuseBuffers);

  // Scatters the values of one fixed-width column into the per-partition destination buffers.
  //
  // srcAddr is read as an array of T indexed by source row id. For every partition id `pid` in
  // partitionUsed_, the rows rowOffset2RowId_[partition2RowOffsetBase_[pid] ..
  // partition2RowOffsetBase_[pid + 1]) are copied, in that order, to dstAddrs[pid] starting at
  // element partitionBufferBase_[pid].
  //
  // No bounds are checked and partitionBufferBase_ is not advanced here; the caller is responsible
  // for buffer capacity and for updating the write position. Always returns OK.
  template <typename T>
  arrow::Status splitFixedType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) {
    const auto* typedSrc = reinterpret_cast<const T*>(srcAddr);
    for (const auto pid : partitionUsed_) {
      auto* dst = reinterpret_cast<T*>(dstAddrs[pid] + partitionBufferBase_[pid] * sizeof(T));
      const auto end = partition2RowOffsetBase_[pid + 1];
      for (auto pos = partition2RowOffsetBase_[pid]; pos < end; ++pos) {
        *dst++ = typedSrc[rowOffset2RowId_[pos]]; // copy
      }
    }
    return arrow::Status::OK();
  }

  arrow::Status splitBinaryType(
      uint32_t binaryIdx,
      const facebook::velox::FlatVector<facebook::velox::StringView>& src,
      std::vector<BinaryBuf>& dst);

  arrow::Result<int64_t> evictCachedPayload(int64_t size);

  arrow::Result<std::shared_ptr<arrow::Buffer>> generateComplexTypeBuffers(facebook::velox::RowVectorPtr vector);

  arrow::Status resetValidityBuffer(uint32_t partitionId);

  arrow::Result<int64_t> shrinkPartitionBuffersMinSize(int64_t size);

  arrow::Result<int64_t> evictPartitionBuffersMinSize(int64_t size);

  arrow::Status shrinkPartitionBuffer(uint32_t partitionId);

  arrow::Status resetPartitionBuffer(uint32_t partitionId);

  // Resize the partition buffer to newSize. If preserveData is true, it will keep the data in buffer.
  // Note when preserveData is false, and newSize is larger, this function can introduce unnecessary memory copy.
  // In this case, use allocatePartitionBuffer to free current buffers and allocate new buffers instead.
  arrow::Status resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool preserveData);

  uint64_t valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize);

  uint64_t valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, uint32_t newSize);

  void calculateSimpleColumnBytes();

  void stat() const;

  bool shrinkPartitionBuffersAfterSpill() const;

  bool evictPartitionBuffersAfterSpill() const;

  arrow::Result<uint32_t> partitionBufferSizeAfterShrink(uint32_t partitionId) const;

  bool isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const;

  arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit);

  // Copied from the options object in the constructor.
  int32_t splitBufferSize_;
  double splitBufferReallocThreshold_;

  std::shared_ptr<arrow::Schema> schema_;

  // Column index, partition id, buffers.
  std::vector<std::vector<std::vector<std::shared_ptr<arrow::ResizableBuffer>>>> partitionBuffers_;

  BinaryArrayResizeState binaryArrayResizeState_{};

  bool hasComplexType_ = false;
  std::vector<bool> isValidityBuffer_;

  // Store arrow column types. Calculated once.
  std::vector<std::shared_ptr<arrow::DataType>> arrowColumnTypes_;

  // Store velox column types. Calculated once.
  std::vector<std::shared_ptr<const facebook::velox::Type>> veloxColumnTypes_;

  // How many fixed-width columns in the schema. Calculated once.
  uint32_t fixedWidthColumnCount_ = 0;

  // The column indices of all binary types in the schema.
  std::vector<uint32_t> binaryColumnIndices_;

  // The column indices of all fixed-width and binary columns in the schema.
  std::vector<uint32_t> simpleColumnIndices_;

  // The column indices of all complex types in the schema, including Struct, Map, List columns.
  std::vector<uint32_t> complexColumnIndices_;

  // Total bytes of fixed-width buffers of all simple columns. Including validity buffers, value buffers of
  // fixed-width types and length buffers of binary types.
  // Used for estimating pre-allocated partition buffer size. Calculated once.
  uint32_t fixedWidthBufferBytes_ = 0;

  // Used for calculating the average binary length.
  // Updated for each input RowVector.
  uint64_t totalInputNumRows_ = 0;
  std::vector<uint64_t> binaryArrayTotalSizeBytes_;

  size_t complexTotalSizeBytes_ = 0;

  // True if input column has null in any processed input RowVector.
  // In the order of fixed-width columns + binary columns.
  std::vector<bool> inputHasNull_;

  // Records which partitions actually occur in the current input RowVector.
  // Most of the loops can loop on this array to avoid visiting unused partition ids.
  std::vector<uint32_t> partitionUsed_;

  // Row ID -> Partition ID
  //  subscript: The index of row in the current input RowVector
  //  value: Partition ID
  // Updated for each input RowVector.
  std::vector<uint32_t> row2Partition_;

  // Partition ID -> Row Count
  //  subscript: Partition ID
  //  value: How many rows does this partition have in the current input RowVector
  // Updated for each input RowVector.
  std::vector<uint32_t> partition2RowCount_;

  // Note: partition2RowOffsetBase_ and rowOffset2RowId_ are the optimization of flattening the 2-dimensional vector
  // into single dimension.
  // The first dimension is the partition id. The second dimension is the ith occurrence of this partition in the
  // input RowVector. The value is the index of the row in the input RowVector.
  // partition2RowOffsetBase_ records the offset of the first dimension.
  //
  // The index of the ith occurrence of a given partition `pid` in the input RowVector can be calculated via
  // rowOffset2RowId_[partition2RowOffsetBase_[pid] + i]
  // i is in the range of [0, partition2RowCount_[pid])

  // Partition ID -> Row offset, elements num: Partition num + 1
  //  subscript: Partition ID
  //  value: The base row offset of this Partition
  // Updated for each input RowVector.
  std::vector<uint32_t> partition2RowOffsetBase_;

  // Row offset -> Source row ID, elements num: input RowVector row num
  //  subscript: Row offset
  //  value: The index of row in the current input RowVector
  // Updated for each input RowVector.
  std::vector<uint32_t> rowOffset2RowId_;

  // Partition buffers are used for holding the intermediate data during split.
  // Partition ID -> Partition buffer size(unit is row)
  std::vector<uint32_t> partitionBufferSize_;

  // The write position of partition buffer. Updated after split. Reset when partition buffers are reallocated.
  std::vector<uint32_t> partitionBufferBase_;

  // Used by all simple types. Stores raw pointers of partition buffers.
  std::vector<std::vector<uint8_t*>> partitionValidityAddrs_;

  // Used by fixed-width types. Stores raw pointers of partition buffers.
  std::vector<std::vector<uint8_t*>> partitionFixedWidthValueAddrs_;

  // Used by binary types. Stores raw pointers and metadata of partition buffers.
  std::vector<std::vector<BinaryBuf>> partitionBinaryAddrs_;

  // Used by complex types.
  // Partition id -> Serialized complex data.
  std::vector<std::unique_ptr<facebook::velox::IterativeVectorSerializer>> complexTypeData_;
  std::vector<std::shared_ptr<arrow::ResizableBuffer>> complexTypeFlushBuffer_;
  std::shared_ptr<const facebook::velox::RowType> complexWriteType_;

  facebook::velox::serializer::presto::PrestoVectorSerde serde_;

  SplitState splitState_{kInit};

  std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
}; // class VeloxHashShuffleWriter
} // namespace gluten