blob: e5435aee52dd898f0638850a056baab71549f085 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "shuffle/VeloxShuffleReader.h"
#include <arrow/array/array_binary.h>
#include <arrow/io/buffered.h>
#include <velox/common/caching/AsyncDataCache.h>
#include "memory/VeloxColumnarBatch.h"
#include "shuffle/GlutenByteStream.h"
#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "utils/Common.h"
#include "utils/Macros.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/row/CompactRow.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/arrow/Bridge.h"
#include <algorithm>
#ifdef GLUTEN_ENABLE_GPU
#include "VeloxGpuShuffleReader.h"
#endif
using namespace facebook::velox;
namespace gluten {
namespace {
/// Reads the next `BlockType` marker from `inputStream`.
///
/// @return `BlockType::kEndOfStream` when no bytes are left, otherwise the marker that was read. An IOError is
///         returned when only part of the marker could be read, so a truncated stream is reported instead of being
///         decoded from a partially-initialized value.
arrow::Result<BlockType> readBlockType(arrow::io::InputStream* inputStream) {
  BlockType type;
  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream->Read(sizeof(BlockType), &type));
  if (bytes == 0) {
    // Reach EOS.
    return BlockType::kEndOfStream;
  }
  if (bytes != static_cast<int64_t>(sizeof(BlockType))) {
    return arrow::Status::IOError(
        fmt::format("Truncated block type: read {} of {} bytes", bytes, sizeof(BlockType)));
  }
  return type;
}
struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}
BufferViewReleaser(std::shared_ptr<arrow::Buffer> arrowBuffer) : bufferReleaser_(std::move(arrowBuffer)) {}
void addRef() const {}
void release() const {}
private:
const std::shared_ptr<arrow::Buffer> bufferReleaser_;
};
/// Creates a Velox buffer view over `length` bytes starting at `buffer`.
/// The view co-owns `bufferReleaser`, which keeps the viewed memory valid for the view's lifetime.
BufferPtr wrapInBufferViewAsOwner(const void* buffer, size_t length, std::shared_ptr<arrow::Buffer> bufferReleaser) {
  const auto* data = static_cast<const uint8_t*>(buffer);
  return BufferView<BufferViewReleaser>::create(data, length, BufferViewReleaser{std::move(bufferReleaser)});
}
/// Zero-copy conversion of an arrow buffer to a Velox `BufferPtr` that shares ownership of the arrow memory.
/// A null input maps to a null output.
BufferPtr convertToVeloxBuffer(std::shared_ptr<arrow::Buffer> buffer) {
  if (!buffer) {
    return nullptr;
  }
  // Capture the view bounds before handing ownership to the releaser.
  const auto* bytes = buffer->data();
  const auto byteCount = buffer->size();
  return wrapInBufferViewAsOwner(bytes, byteCount, std::move(buffer));
}
/// Builds a fixed-width column of `Kind` from the next two entries of `buffers`.
///
/// The two consumed slots are [validity, values-or-dictionary-indices], and `bufferIdx` is advanced past both.
/// A missing or empty validity buffer means the column has no nulls. If `dictionary` is set, the second buffer
/// holds indices into it and the result is a dictionary vector. Otherwise it holds the flat values.
template <TypeKind Kind, typename T = typename TypeTraits<Kind>::NativeType>
VectorPtr readFlatVector(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    const VectorPtr& dictionary,
    memory::MemoryPool* pool) {
  BufferPtr nulls = buffers[bufferIdx];
  BufferPtr valuesOrIndices = buffers[bufferIdx + 1];
  bufferIdx += 2;
  if (nulls != nullptr && nulls->size() == 0) {
    nulls = nullptr;
  }
  if (dictionary) {
    return BaseVector::wrapInDictionary(nulls, valuesOrIndices, length, dictionary);
  }
  return std::make_shared<FlatVector<T>>(
      pool, type, nulls, length, std::move(valuesOrIndices), std::vector<BufferPtr>{});
}
/// UNKNOWN-typed columns consume no buffers (`bufferIdx` is left untouched).
/// Every row is materialized as a null constant of `type`.
template <>
VectorPtr readFlatVector<TypeKind::UNKNOWN>(
    std::vector<BufferPtr>& /*buffers*/,
    int32_t& /*bufferIdx*/,
    uint32_t length,
    std::shared_ptr<const Type> type,
    const VectorPtr& /*dictionary*/,
    memory::MemoryPool* pool) {
  auto allNulls = BaseVector::createNullConstant(type, length, pool);
  return allNulls;
}
/// HUGEINT (int128) variant of `readFlatVector`. It consumes [validity, values-or-dictionary-indices].
///
/// Uncompressed buffers may come straight from netty, so their address is not guaranteed to be 16-byte aligned.
/// Loading an int128_t from a misaligned address can fault (e.g. via `movdqa`), so the values are copied into a
/// freshly allocated aligned buffer first. Dictionary indices are not int128 values and need no such copy.
template <>
VectorPtr readFlatVector<TypeKind::HUGEINT>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    const VectorPtr& dictionary,
    memory::MemoryPool* pool) {
  constexpr uintptr_t kInt128AlignmentMask = 0xf;
  auto nulls = buffers[bufferIdx++];
  auto valuesOrIndices = buffers[bufferIdx++];
  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
  if (dictionary != nullptr) {
    // The buffer holds dictionary indices, not int128 values: no realignment needed.
    return BaseVector::wrapInDictionary(nulls, valuesOrIndices, length, dictionary);
  }
  // Guard against a missing values buffer before inspecting its address.
  if (valuesOrIndices != nullptr &&
      (reinterpret_cast<uintptr_t>(valuesOrIndices->as<facebook::velox::int128_t>()) & kInt128AlignmentMask) != 0) {
    auto alignedBuffer = AlignedBuffer::allocate<char>(valuesOrIndices->size(), pool);
    fastCopy(alignedBuffer->asMutable<char>(), valuesOrIndices->as<char>(), valuesOrIndices->size());
    valuesOrIndices = alignedBuffer;
  }
  return std::make_shared<FlatVector<int128_t>>(
      pool, type, nulls, length, std::move(valuesOrIndices), std::vector<BufferPtr>{});
}
/// Builds a VARCHAR/VARBINARY column from the payload buffers.
///
/// Non-dictionary columns consume three slots: [validity, per-row lengths, concatenated bytes]. Dictionary-encoded
/// columns consume only two, [validity, indices], and have no value buffer of their own. The resulting StringViews
/// point into the value buffer, which is attached to the vector as a string buffer to keep it alive.
VectorPtr readFlatVectorStringView(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    const VectorPtr& dictionary,
    memory::MemoryPool* pool) {
  auto nulls = buffers[bufferIdx++];
  auto lengthOrIndices = buffers[bufferIdx++];
  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
  if (dictionary != nullptr) {
    return BaseVector::wrapInDictionary(nulls, lengthOrIndices, length, dictionary);
  }
  auto valueBuffer = buffers[bufferIdx++];
  const auto* rawLength = lengthOrIndices->as<StringLengthType>();
  const auto* valueBufferPtr = valueBuffer->as<char>();
  auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length, pool);
  auto* rawValues = values->asMutable<StringView>();
  uint64_t offset = 0;
  // Unsigned index to match `length` (avoids a signed/unsigned comparison and int32 overflow).
  for (uint32_t i = 0; i < length; ++i) {
    rawValues[i] = StringView(valueBufferPtr + offset, rawLength[i]);
    offset += rawLength[i];
  }
  std::vector<BufferPtr> stringBuffers;
  stringBuffers.emplace_back(valueBuffer);
  return std::make_shared<FlatVector<StringView>>(
      pool, type, nulls, length, std::move(values), std::move(stringBuffers));
}
// VARCHAR and VARBINARY share one buffer layout, so both specializations delegate to the StringView reader.
template <>
VectorPtr readFlatVector<TypeKind::VARCHAR>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    const VectorPtr& dictionary,
    memory::MemoryPool* pool) {
  return readFlatVectorStringView(buffers, bufferIdx, length, std::move(type), dictionary, pool);
}

template <>
VectorPtr readFlatVector<TypeKind::VARBINARY>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    const VectorPtr& dictionary,
    memory::MemoryPool* pool) {
  return readFlatVectorStringView(buffers, bufferIdx, length, std::move(type), dictionary, pool);
}
std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
std::vector<ByteRange> byteRanges;
byteRanges.push_back(ByteRange{data, size, 0});
auto byteStream = std::make_unique<BufferInputStream>(byteRanges);
return byteStream;
}
/// Decodes `buffer` as one Presto-serialized RowVector of `rowType` (lossless timestamps enabled).
/// It is used for the columns of complex type (ROW/MAP/ARRAY), which are serialized together.
RowVectorPtr readComplexType(BufferPtr buffer, RowTypePtr& rowType, memory::MemoryPool* pool) {
  auto* bytes = const_cast<uint8_t*>(buffer->as<uint8_t>());
  auto stream = toByteStream(bytes, buffer->size());

  serializer::presto::PrestoVectorSerde::PrestoOptions options;
  options.useLosslessTimestamp = true;

  serializer::presto::PrestoVectorSerde serde;
  RowVectorPtr decoded;
  serde.deserialize(stream.get(), pool, rowType, &decoded, &options);
  return decoded;
}
/// Returns a RowType containing only the ROW/MAP/ARRAY entries of `types`, in their original order.
/// Each entry is named after its type's name.
RowTypePtr getComplexWriteType(const std::vector<TypePtr>& types) {
  std::vector<std::string> names;
  std::vector<TypePtr> children;
  for (const auto& columnType : types) {
    const auto kind = columnType->kind();
    const bool isComplex = kind == TypeKind::ROW || kind == TypeKind::MAP || kind == TypeKind::ARRAY;
    if (!isComplex) {
      continue;
    }
    names.emplace_back(columnType->name());
    children.emplace_back(columnType);
  }
  return std::make_shared<const RowType>(std::move(names), std::move(children));
}
/// Rebuilds a RowVector of `type` with `numRows` rows from the payload buffers.
///
/// Scalar columns consume buffers from the front of `buffers`, in column order, through the `readFlatVector`
/// specializations. All ROW/MAP/ARRAY columns are decoded together from the last buffer with the Presto serde.
/// `dictionaryFields[k]` is the column index whose dictionary is `dictionaries[k]`. The indices are matched in
/// increasing column order.
RowVectorPtr deserialize(
    RowTypePtr type,
    uint32_t numRows,
    std::vector<BufferPtr>& buffers,
    const std::vector<int32_t>& dictionaryFields,
    const std::vector<VectorPtr>& dictionaries,
    memory::MemoryPool* pool) {
  // `type` keeps the child list alive, so bind by reference instead of copying the vector.
  const auto& types = type->as<TypeKind::ROW>().children();
  std::vector<VectorPtr> complexChildren;
  auto complexRowType = getComplexWriteType(types);
  if (!complexRowType->children().empty()) {
    GLUTEN_CHECK(!buffers.empty(), "Missing serialized buffer for complex-typed columns");
    complexChildren = readComplexType(buffers.back(), complexRowType, pool)->children();
  }
  std::vector<VectorPtr> children;
  children.reserve(types.size());
  int32_t bufferIdx = 0;
  size_t complexIdx = 0;
  size_t dictionaryIdx = 0;
  for (size_t i = 0; i < types.size(); ++i) {
    const auto kind = types[i]->kind();
    switch (kind) {
      case TypeKind::ROW:
      case TypeKind::MAP:
      case TypeKind::ARRAY: {
        children.emplace_back(std::move(complexChildren[complexIdx]));
        complexIdx++;
      } break;
      default: {
        VectorPtr dictionary{nullptr};
        if (dictionaryIdx < dictionaryFields.size() && static_cast<size_t>(dictionaryFields[dictionaryIdx]) == i) {
          dictionary = dictionaries[dictionaryIdx++];
        }
        auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
            readFlatVector, kind, buffers, bufferIdx, numRows, types[i], dictionary, pool);
        children.emplace_back(std::move(res));
      } break;
    }
  }
  // Move the children in; the RowVector constructor takes them by value.
  return std::make_shared<RowVector>(pool, type, BufferPtr(nullptr), numRows, std::move(children));
}
/// Converts the arrow payload buffers to Velox buffers (zero-copy) and deserializes them into a columnar batch.
/// The whole conversion is timed into `deserializeTime`.
std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
    RowTypePtr type,
    uint32_t numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers,
    const std::vector<int32_t>& dictionaryFields,
    const std::vector<VectorPtr>& dictionaries,
    memory::MemoryPool* pool,
    int64_t& deserializeTime) {
  ScopedTimer timer(&deserializeTime);
  std::vector<BufferPtr> converted;
  converted.reserve(arrowBuffers.size());
  for (auto& arrowBuffer : arrowBuffers) {
    converted.emplace_back(convertToVeloxBuffer(std::move(arrowBuffer)));
  }
  return std::make_shared<VeloxColumnarBatch>(
      deserialize(std::move(type), numRows, converted, dictionaryFields, dictionaries, pool));
}
/// Reads one dictionary buffer from `in`.
///
/// Layout: [size_t bufferSize]. When `codec` is set, this is followed by [size_t compressedSize][compressed bytes];
/// otherwise by [bufferSize raw bytes]. A zero `bufferSize` yields an empty buffer and nothing more is read.
///
/// @return the decoded buffer of exactly `bufferSize` bytes, or an IOError if the stream is truncated or the
///         decompressed size does not match.
arrow::Result<BufferPtr>
readDictionaryBuffer(arrow::io::InputStream* in, facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) {
  // InputStream::Read may return fewer bytes than requested at end of stream; treat that as truncation instead of
  // continuing with uninitialized sizes or data.
  auto readExactly = [in](int64_t nbytes, void* out) -> arrow::Status {
    ARROW_ASSIGN_OR_RAISE(auto bytesRead, in->Read(nbytes, out));
    if (bytesRead != nbytes) {
      return arrow::Status::IOError(
          fmt::format("Truncated dictionary buffer: expected {} bytes, read {}", nbytes, bytesRead));
    }
    return arrow::Status::OK();
  };

  size_t bufferSize = 0;
  ARROW_RETURN_NOT_OK(readExactly(sizeof(bufferSize), &bufferSize));
  auto buffer = facebook::velox::AlignedBuffer::allocate<char>(bufferSize, pool, std::nullopt, true);
  if (bufferSize == 0) {
    return buffer;
  }
  if (codec != nullptr) {
    size_t compressedSize = 0;
    ARROW_RETURN_NOT_OK(readExactly(sizeof(compressedSize), &compressedSize));
    auto compressedBuffer = facebook::velox::AlignedBuffer::allocate<char>(compressedSize, pool, std::nullopt, true);
    ARROW_RETURN_NOT_OK(readExactly(static_cast<int64_t>(compressedSize), compressedBuffer->asMutable<void>()));
    ARROW_ASSIGN_OR_RAISE(
        auto decompressedSize,
        codec->Decompress(compressedSize, compressedBuffer->as<uint8_t>(), bufferSize, buffer->asMutable<uint8_t>()));
    ARROW_RETURN_IF(
        static_cast<size_t>(decompressedSize) != bufferSize,
        arrow::Status::IOError(
            fmt::format("Decompressed size doesn't equal to original size: ({} vs {})", decompressedSize, bufferSize)));
  } else {
    ARROW_RETURN_NOT_OK(readExactly(static_cast<int64_t>(bufferSize), buffer->asMutable<void>()));
  }
  return buffer;
}
/// Reads a VARCHAR/VARBINARY dictionary made of a length buffer followed by a concatenated value buffer.
/// It returns a FlatVector of StringViews that point into the value buffer, which the vector keeps alive.
///
/// @return IOError if a length would make a StringView extend past the value buffer (corrupt or truncated data).
arrow::Result<VectorPtr> readDictionaryForBinary(
    arrow::io::InputStream* in,
    const TypePtr& type,
    facebook::velox::memory::MemoryPool* pool,
    arrow::util::Codec* codec) {
  // Read length buffer.
  ARROW_ASSIGN_OR_RAISE(auto lengthBuffer, readDictionaryBuffer(in, pool, codec));
  const auto* lengthBufferPtr = lengthBuffer->as<StringLengthType>();
  // Read value buffer.
  ARROW_ASSIGN_OR_RAISE(auto valueBuffer, readDictionaryBuffer(in, pool, codec));
  const auto* valueBufferPtr = valueBuffer->as<char>();
  const uint64_t valueBufferSize = valueBuffer->size();
  // Build StringViews.
  const auto numElements = lengthBuffer->size() / sizeof(StringLengthType);
  auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * numElements, pool, std::nullopt, true);
  auto* rawValues = values->asMutable<StringView>();
  uint64_t offset = 0;
  for (size_t i = 0; i < numElements; ++i) {
    const uint64_t elementLength = lengthBufferPtr[i];
    // Lengths come from the shuffle stream; never build a view that reads past the value buffer.
    ARROW_RETURN_IF(
        offset + elementLength > valueBufferSize,
        arrow::Status::IOError(fmt::format(
            "Dictionary string {} (offset {}, length {}) exceeds value buffer of {} bytes",
            i,
            offset,
            elementLength,
            valueBufferSize)));
    rawValues[i] = StringView(valueBufferPtr + offset, lengthBufferPtr[i]);
    offset += elementLength;
  }
  std::vector<BufferPtr> stringBuffers;
  stringBuffers.emplace_back(valueBuffer);
  return std::make_shared<FlatVector<StringView>>(
      pool, type, BufferPtr(nullptr), numElements, std::move(values), std::move(stringBuffers));
}
/// Reads a fixed-width dictionary: one raw buffer of `NativeType` values with no nulls.
/// The element count is the buffer size divided by `sizeof(NativeType)`.
template <TypeKind Kind, typename NativeType = typename TypeTraits<Kind>::NativeType>
arrow::Result<VectorPtr> readDictionary(
    arrow::io::InputStream* in,
    const TypePtr& type,
    facebook::velox::memory::MemoryPool* pool,
    arrow::util::Codec* codec) {
  ARROW_ASSIGN_OR_RAISE(BufferPtr rawValues, readDictionaryBuffer(in, pool, codec));
  const size_t count = rawValues->size() / sizeof(NativeType);
  VectorPtr dictionary = std::make_shared<FlatVector<NativeType>>(
      pool, type, BufferPtr(nullptr), count, std::move(rawValues), std::vector<BufferPtr>{});
  return dictionary;
}
// String-like dictionaries (VARCHAR and VARBINARY) share the length + value buffer layout handled by
// readDictionaryForBinary.
template <>
arrow::Result<VectorPtr> readDictionary<TypeKind::VARCHAR>(
    arrow::io::InputStream* in,
    const TypePtr& type,
    facebook::velox::memory::MemoryPool* pool,
    arrow::util::Codec* codec) {
  auto dictionary = readDictionaryForBinary(in, type, pool, codec);
  return dictionary;
}

template <>
arrow::Result<VectorPtr> readDictionary<TypeKind::VARBINARY>(
    arrow::io::InputStream* in,
    const TypePtr& type,
    facebook::velox::memory::MemoryPool* pool,
    arrow::util::Codec* codec) {
  auto dictionary = readDictionaryForBinary(in, type, pool, codec);
  return dictionary;
}
} // namespace
/// Reads the dictionary section of a hash-shuffle dictionary block for the columns of `rowType`.
class VeloxDictionaryReader {
 public:
  VeloxDictionaryReader(
      const facebook::velox::RowTypePtr& rowType,
      facebook::velox::memory::MemoryPool* veloxPool,
      arrow::util::Codec* codec)
      : rowType_(rowType), veloxPool_(veloxPool), codec_(codec) {}

  /// Reads the column bitmap and returns, in ascending order, the indices of the columns whose bit is set.
  /// Returns IOError if the bitmap is truncated.
  arrow::Result<std::vector<int32_t>> readFields(arrow::io::InputStream* in) const {
    const auto numFields = rowType_->size();
    // NOTE(review): the byte count is RoundUpToMultipleOf8(numFields), i.e. the field count rounded up to a multiple
    // of 8, not converted to bytes. Presumably the writer emits the same count (wire format) — confirm against the
    // writer before changing it.
    const auto bitMapSize = arrow::bit_util::RoundUpToMultipleOf8(numFields);
    std::vector<uint8_t> bitMap(bitMapSize);
    ARROW_ASSIGN_OR_RAISE(auto bytesRead, in->Read(bitMapSize, bitMap.data()));
    ARROW_RETURN_IF(
        bytesRead != bitMapSize,
        arrow::Status::IOError(
            fmt::format("Truncated dictionary bitmap: expected {} bytes, read {}", bitMapSize, bytesRead)));
    std::vector<int32_t> fields;
    for (uint32_t i = 0; i < numFields; ++i) {
      if (arrow::bit_util::GetBit(bitMap.data(), i)) {
        fields.push_back(static_cast<int32_t>(i));
      }
    }
    return fields;
  }

  /// Reads one dictionary per entry of `fields`, in order, dispatching on each column's scalar type.
  arrow::Result<std::vector<VectorPtr>> readDictionaries(arrow::io::InputStream* in, const std::vector<int32_t>& fields)
      const {
    std::vector<VectorPtr> dictionaries;
    dictionaries.reserve(fields.size());
    for (const auto i : fields) {
      const auto& fieldType = rowType_->childAt(i);
      auto dictionary =
          VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(readDictionary, fieldType->kind(), in, fieldType, veloxPool_, codec_);
      ARROW_ASSIGN_OR_RAISE(auto vector, std::move(dictionary));
      dictionaries.push_back(std::move(vector));
    }
    return dictionaries;
  }

 private:
  facebook::velox::RowTypePtr rowType_;
  facebook::velox::memory::MemoryPool* veloxPool_;
  arrow::util::Codec* codec_;
};
/// Deserializer for streams written by the hash shuffle writer (see the factory's kHashShuffle case).
///
/// `readerBufferSize` > 0 enables an arrow BufferedInputStream of that size around each stream (see loadNextStream).
/// `deserializeTime` / `decompressTime` are taken by reference and forwarded to members. Presumably those members
/// are references as well (declared in the header — confirm), so the caller's counters accumulate this reader's time.
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
const std::shared_ptr<StreamReader>& streamReader,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
int64_t readerBufferSize,
VeloxMemoryManager* memoryManager,
int64_t& deserializeTime,
int64_t& decompressTime)
: streamReader_(streamReader),
schema_(schema),
codec_(codec),
rowType_(rowType),
readerBufferSize_(readerBufferSize),
memoryManager_(memoryManager),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime) {}
bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
switch (blockType) {
case BlockType::kEndOfStream:
return false;
case BlockType::kDictionary: {
VeloxDictionaryReader reader(rowType_, memoryManager_->getLeafMemoryPool().get(), codec_.get());
GLUTEN_ASSIGN_OR_THROW(dictionaryFields_, reader.readFields(in_.get()));
GLUTEN_ASSIGN_OR_THROW(dictionaries_, reader.readDictionaries(in_.get(), dictionaryFields_));
GLUTEN_ASSIGN_OR_THROW(blockType, readBlockType(in_.get()));
GLUTEN_CHECK(blockType == BlockType::kDictionaryPayload, "Invalid block type for dictionary payload");
} break;
case BlockType::kDictionaryPayload: {
GLUTEN_CHECK(
!dictionaryFields_.empty() && !dictionaries_.empty(),
"Dictionaries cannot be empty when reading dictionary payload");
} break;
case BlockType::kPlainPayload: {
if (!dictionaryFields_.empty()) {
// Clear previous dictionaries if the next block is a plain payload.
dictionaryFields_.clear();
dictionaries_.clear();
}
} break;
default:
throw GlutenException(fmt::format("Unsupported block type: {}", static_cast<int32_t>(blockType)));
}
return true;
}
/// Moves to the next stream from `streamReader_`, setting `reachedEos_` once none remain.
/// A positive `readerBufferSize_` wraps the stream in an arrow BufferedInputStream of that size.
void VeloxHashShuffleReaderDeserializer::loadNextStream() {
  if (reachedEos_) {
    return;
  }
  auto stream = streamReader_->readNextStream(memoryManager_->defaultArrowMemoryPool());
  if (stream == nullptr) {
    reachedEos_ = true;
    return;
  }
  if (readerBufferSize_ <= 0) {
    in_ = std::move(stream);
    return;
  }
  GLUTEN_ASSIGN_OR_THROW(
      in_,
      arrow::io::BufferedInputStream::Create(
          readerBufferSize_, memoryManager_->defaultArrowMemoryPool(), std::move(stream)));
}
std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
if (in_ == nullptr) {
loadNextStream();
if (reachedEos_) {
return nullptr;
}
}
while (!resolveNextBlockType()) {
loadNextStream();
if (reachedEos_) {
return nullptr;
}
}
uint32_t numRows = 0;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers,
BlockPayload::deserialize(
in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), numRows, deserializeTime_, decompressTime_));
return makeColumnarBatch(
rowType_,
numRows,
std::move(arrowBuffers),
dictionaryFields_,
dictionaries_,
memoryManager_->getLeafMemoryPool().get(),
deserializeTime_);
}
/// Deserializer for sort-shuffle streams, where each record is [RowSizeType size][CompactRow bytes].
/// Records are batched into at most `batchSize` rows per call to next().
///
/// `deserializerBufferSize` is the initial size of the row buffer allocated on the first next().
/// `readerBufferSize` is used only for uncompressed streams (see loadNextStream). The time counters are taken by
/// reference; presumably the members are references too (declared in the header — confirm).
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
const std::shared_ptr<StreamReader>& streamReader,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
int64_t& deserializeTime,
int64_t& decompressTime)
: streamReader_(streamReader),
schema_(schema),
codec_(codec),
rowType_(rowType),
batchSize_(batchSize),
readerBufferSize_(readerBufferSize),
deserializerBufferSize_(deserializerBufferSize),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime),
memoryManager_(memoryManager) {}
/// Adds the decompression time of the stream currently held in `in_` (if it is compressed) to `decompressTime_`.
VeloxSortShuffleReaderDeserializer::~VeloxSortShuffleReaderDeserializer() {
  const auto compressed = std::dynamic_pointer_cast<CompressedInputStream>(in_);
  if (compressed != nullptr) {
    decompressTime_ += compressed->decompressTime();
  }
}
std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
if (in_ == nullptr) {
loadNextStream();
}
if (reachedEos_) {
return nullptr;
}
if (rowBuffer_ == nullptr) {
rowBuffer_ = AlignedBuffer::allocate<char>(
deserializerBufferSize_, memoryManager_->getLeafMemoryPool().get(), std::nullopt, true /*allocateExact*/);
rowBufferPtr_ = rowBuffer_->asMutable<char>();
data_.reserve(batchSize_);
}
if (lastRowSize_ != 0) {
if (lastRowSize_ > rowBuffer_->size()) {
reallocateRowBuffer();
}
readNextRow();
}
while (cachedRows_ < batchSize_) {
GLUTEN_ASSIGN_OR_THROW(auto bytes, in_->Read(sizeof(RowSizeType), &lastRowSize_));
while (bytes == 0) {
// Current stream has no more data. Try to load the next stream.
loadNextStream();
if (reachedEos_) {
if (bytesRead_ > 0) {
return deserializeToBatch();
}
// If we reached EOS and have no rows, return nullptr.
return nullptr;
}
GLUTEN_ASSIGN_OR_THROW(bytes, in_->Read(sizeof(RowSizeType), &lastRowSize_));
}
if (lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
if (bytesRead_ > 0) {
// If we have already read some rows, return the current batch.
return deserializeToBatch();
}
reallocateRowBuffer();
}
readNextRow();
}
return deserializeToBatch();
}
/// Turns the rows collected in `data_` into a Velox batch (timed into `deserializeTime_`).
/// It then resets the per-batch bookkeeping so the row buffer can be reused.
std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeToBatch() {
  ScopedTimer timer(&deserializeTime_);
  auto* veloxPool = memoryManager_->getLeafMemoryPool().get();
  auto batchRows = facebook::velox::row::CompactRow::deserialize(data_, rowType_, veloxPool);
  data_.clear();
  bytesRead_ = 0;
  cachedRows_ = 0;
  return std::make_shared<VeloxColumnarBatch>(std::move(batchRows));
}
void VeloxSortShuffleReaderDeserializer::reallocateRowBuffer() {
auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds current buffer size " << rowBuffer_->size()
<< ". Resizing buffer to " << newSize;
rowBuffer_ = AlignedBuffer::allocate<char>(
newSize, memoryManager_->getLeafMemoryPool().get(), std::nullopt, true /*allocateExact*/);
rowBufferPtr_ = rowBuffer_->asMutable<char>();
}
void VeloxSortShuffleReaderDeserializer::loadNextStream() {
if (reachedEos_) {
return;
}
auto in = streamReader_->readNextStream(memoryManager_->defaultArrowMemoryPool());
if (in == nullptr) {
reachedEos_ = true;
return;
}
if (codec_ != nullptr) {
GLUTEN_ASSIGN_OR_THROW(
in_, CompressedInputStream::Make(codec_.get(), std::move(in), memoryManager_->defaultArrowMemoryPool()));
} else {
if (readerBufferSize_ > 0) {
GLUTEN_ASSIGN_OR_THROW(
in_,
arrow::io::BufferedInputStream::Create(
readerBufferSize_, memoryManager_->defaultArrowMemoryPool(), std::move(in)));
} else {
in_ = std::move(in);
}
}
}
void VeloxSortShuffleReaderDeserializer::readNextRow() {
GLUTEN_THROW_NOT_OK(in_->Read(lastRowSize_, rowBufferPtr_ + bytesRead_));
data_.push_back(std::string_view(rowBufferPtr_ + bytesRead_, lastRowSize_));
bytesRead_ += lastRowSize_;
lastRowSize_ = 0;
++cachedRows_;
}
/// Adapts an arrow InputStream to Velox's byte-input-stream interface by repeatedly refilling one reusable buffer.
class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::GlutenByteInputStream {
 public:
  VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, facebook::velox::BufferPtr buffer);

  /// True while unread bytes remain; refills the buffer once the current chunk is consumed.
  bool hasNext();

  /// Refills `buffer_` from `in_`.
  void next(bool throwIfPastEnd) override;

  size_t remainingSize() const override;

  // Source stream.
  std::shared_ptr<arrow::io::InputStream> in_;
  // Reusable read buffer.
  const facebook::velox::BufferPtr buffer_;
  // Byte count of the most recent refill; 0 once the source is exhausted. Starts at an all-ones sentinel.
  uint64_t offset_ = ~uint64_t{0};
};
VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::VeloxInputStream(
std::shared_ptr<arrow::io::InputStream> input,
facebook::velox::BufferPtr buffer)
: in_(std::move(input)), buffer_(std::move(buffer)) {
next(true);
}
/// Returns true if unread bytes remain, refilling the buffer when the current range is fully consumed.
bool VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::hasNext() {
  // The last refill returned nothing: the source is exhausted.
  if (offset_ == 0) {
    return false;
  }
  const auto current = ranges()[0];
  if (current.position < current.size) {
    return true;
  }
  next(true);
  return offset_ != 0;
}
void VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::next(bool throwIfPastEnd) {
const uint32_t readBytes = buffer_->capacity();
offset_ = in_->Read(readBytes, buffer_->asMutable<char>()).ValueOr(0);
if (offset_ > 0) {
int32_t realBytes = offset_;
VELOX_CHECK_LT(0, realBytes, "Reading past end of file.");
setRange({buffer_->asMutable<uint8_t>(), realBytes, 0});
}
}
/// Deserializer for RSS sort-shuffle streams, which hold Presto-serialized row vectors read via the "Presto" serde.
/// Batches are concatenated until `batchSize` rows (see next()).
VeloxRssSortShuffleReaderDeserializer::VeloxRssSortShuffleReaderDeserializer(
const std::shared_ptr<StreamReader>& streamReader,
VeloxMemoryManager* memoryManager,
const RowTypePtr& rowType,
int32_t batchSize,
facebook::velox::common::CompressionKind veloxCompressionType,
int64_t& deserializeTime)
: streamReader_(streamReader),
memoryManager_(memoryManager),
rowType_(rowType),
batchSize_(batchSize),
veloxCompressionType_(veloxCompressionType),
serde_(getNamedVectorSerde("Presto")),
deserializeTime_(deserializeTime) {
// Presumably (useLosslessTimestamp = false, compressionKind) for the Presto serde options — confirm field order.
serdeOptions_ = {false, veloxCompressionType_};
}
std::shared_ptr<ColumnarBatch> VeloxRssSortShuffleReaderDeserializer::next() {
if (in_ == nullptr || !in_->hasNext()) {
do {
loadNextStream();
if (reachedEos_) {
return nullptr;
}
} while (!in_->hasNext());
}
ScopedTimer timer(&deserializeTime_);
RowVectorPtr rowVector;
VectorStreamGroup::read(
in_.get(), memoryManager_->getLeafMemoryPool().get(), rowType_, serde_, &rowVector, &serdeOptions_);
if (rowVector->size() >= batchSize_) {
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
while (rowVector->size() < batchSize_ && in_->hasNext()) {
RowVectorPtr rowVectorTemp;
VectorStreamGroup::read(
in_.get(), memoryManager_->getLeafMemoryPool().get(), rowType_, serde_, &rowVectorTemp, &serdeOptions_);
rowVector->append(rowVectorTemp.get());
}
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
/// Moves to the next stream from `streamReader_`, setting `reachedEos_` once none remain.
/// Each stream gets a fresh VeloxInputStream with its own read buffer.
void VeloxRssSortShuffleReaderDeserializer::loadNextStream() {
  if (reachedEos_) {
    return;
  }
  arrowIn_ = streamReader_->readNextStream(memoryManager_->defaultArrowMemoryPool());
  if (arrowIn_ == nullptr) {
    reachedEos_ = true;
    return;
  }
  // 1 MiB minus AlignedBuffer's padding — presumably so the padded allocation stays within 1 MiB.
  constexpr uint64_t kReadBufferBytes = (1 << 20) - AlignedBuffer::kPaddedSize;
  auto readBuffer = AlignedBuffer::allocate<char>(kReadBufferBytes, memoryManager_->getLeafMemoryPool().get());
  in_ = std::make_unique<VeloxInputStream>(std::move(arrowIn_), std::move(readBuffer));
}
/// Reports an unbounded remaining size. The total length of the underlying arrow stream is presumably not known
/// up front.
size_t VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::remainingSize() const {
  // Use the return type's own maximum: `unsigned long` is only 32 bits on LLP64 platforms.
  return std::numeric_limits<size_t>::max();
}
/// Stores the configuration shared by every deserializer this factory creates.
/// It then derives schema-dependent state (per-column buffer flags, complex-type presence) via initFromSchema().
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
facebook::velox::common::CompressionKind veloxCompressionType,
const RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
ShuffleWriterType shuffleWriterType)
: schema_(schema),
codec_(codec),
veloxCompressionType_(veloxCompressionType),
rowType_(rowType),
batchSize_(batchSize),
readerBufferSize_(readerBufferSize),
deserializerBufferSize_(deserializerBufferSize),
memoryManager_(memoryManager),
shuffleWriterType_(shuffleWriterType) {
initFromSchema();
}
/// Creates the deserializer matching the shuffle writer type, reading from `streamReader`.
/// The factory's deserializeTime_/decompressTime_ counters are passed to it by reference.
std::unique_ptr<ColumnarBatchIterator> VeloxShuffleReaderDeserializerFactory::createDeserializer(
const std::shared_ptr<StreamReader>& streamReader) {
switch (shuffleWriterType_) {
case ShuffleWriterType::kGpuHashShuffle:
// NOTE: without GLUTEN_ENABLE_GPU this label has no body and falls through to the kHashShuffle case below,
// so GPU hash shuffle data is then read by the CPU hash deserializer.
#ifdef GLUTEN_ENABLE_GPU
VELOX_CHECK(!hasComplexType_);
return std::make_unique<VeloxGpuHashShuffleReaderDeserializer>(
streamReader,
schema_,
codec_,
rowType_,
readerBufferSize_,
memoryManager_,
deserializeTime_,
decompressTime_);
#endif
case ShuffleWriterType::kHashShuffle:
return std::make_unique<VeloxHashShuffleReaderDeserializer>(
streamReader,
schema_,
codec_,
rowType_,
readerBufferSize_,
memoryManager_,
deserializeTime_,
decompressTime_);
case ShuffleWriterType::kSortShuffle:
return std::make_unique<VeloxSortShuffleReaderDeserializer>(
streamReader,
schema_,
codec_,
rowType_,
batchSize_,
readerBufferSize_,
deserializerBufferSize_,
memoryManager_,
deserializeTime_,
decompressTime_);
case ShuffleWriterType::kRssSortShuffle:
// Uses the Velox compression kind (handled by the Presto serde) rather than the arrow codec.
return std::make_unique<VeloxRssSortShuffleReaderDeserializer>(
streamReader, memoryManager_, rowType_, batchSize_, veloxCompressionType_, deserializeTime_);
}
GLUTEN_UNREACHABLE();
}
int64_t VeloxShuffleReaderDeserializerFactory::getDecompressTime() {
return decompressTime_;
}
int64_t VeloxShuffleReaderDeserializerFactory::getDeserializeTime() {
return deserializeTime_;
}
/// Derives per-column buffer flags and complex-type presence from `schema_`.
///
/// For each column, one flag is appended per payload buffer:
/// - binary/string: three flags;
/// - boolean: two `true` flags;
/// - null: none;
/// - other scalars: two flags.
/// Presumably `true` marks bitmap buffers (validity, bit-packed booleans) — confirm against the header.
/// Struct/map/list columns add no flags and set `hasComplexType_`.
void VeloxShuffleReaderDeserializerFactory::initFromSchema() {
  GLUTEN_ASSIGN_OR_THROW(auto arrowColumnTypes, toShuffleTypeId(schema_->fields()));
  // At most three flags per column, so reserve for the worst case rather than one per column.
  isValidityBuffer_.reserve(arrowColumnTypes.size() * 3);
  for (const auto& columnType : arrowColumnTypes) {
    switch (columnType->id()) {
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(false);
        isValidityBuffer_.push_back(false);
      } break;
      case arrow::StructType::type_id:
      case arrow::MapType::type_id:
      case arrow::ListType::type_id: {
        hasComplexType_ = true;
      } break;
      case arrow::BooleanType::type_id: {
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(true);
      } break;
      case arrow::NullType::type_id:
        break;
      default: {
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(false);
      } break;
    }
  }
}
VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr<VeloxShuffleReaderDeserializerFactory> factory)
: factory_(std::move(factory)) {}
std::shared_ptr<ResultIterator> VeloxShuffleReader::read(const std::shared_ptr<StreamReader>& streamReader) {
return std::make_shared<ResultIterator>(factory_->createDeserializer(streamReader));
}
int64_t VeloxShuffleReader::getDecompressTime() const {
return factory_->getDecompressTime();
}
int64_t VeloxShuffleReader::getDeserializeTime() const {
return factory_->getDeserializeTime();
}
} // namespace gluten