blob: 26a1634f4d6dcd207c99b762cccf77a05dc1a116 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "shuffle/Payload.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/VeloxSortShuffleWriter.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
namespace gluten {
class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator {
public:
VeloxHashShuffleReaderDeserializer(
const std::shared_ptr<StreamReader>& streamReader,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
VeloxMemoryManager* memoryManager,
std::vector<bool>* isValidityBuffer,
bool hasComplexType,
int64_t& deserializeTime,
int64_t& decompressTime);
std::shared_ptr<ColumnarBatch> next() override;
private:
bool resolveNextBlockType();
void loadNextStream();
std::shared_ptr<StreamReader> streamReader_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::RowTypePtr rowType_;
int32_t batchSize_;
int64_t readerBufferSize_;
VeloxMemoryManager* memoryManager_;
std::vector<bool>* isValidityBuffer_;
bool hasComplexType_;
int64_t& deserializeTime_;
int64_t& decompressTime_;
std::shared_ptr<arrow::io::InputStream> in_{nullptr};
bool reachedEos_{false};
bool blockTypeResolved_{false};
std::vector<int32_t> dictionaryFields_{};
std::vector<facebook::velox::VectorPtr> dictionaries_{};
};
class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
public:
using RowSizeType = VeloxSortShuffleWriter::RowSizeType;
VeloxSortShuffleReaderDeserializer(
const std::shared_ptr<StreamReader>& streamReader,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
int64_t& deserializeTime,
int64_t& decompressTime);
~VeloxSortShuffleReaderDeserializer() override;
std::shared_ptr<ColumnarBatch> next() override;
private:
std::shared_ptr<ColumnarBatch> deserializeToBatch();
void readNextRow();
void reallocateRowBuffer();
void loadNextStream();
std::shared_ptr<StreamReader> streamReader_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::RowTypePtr rowType_;
uint32_t batchSize_;
int64_t readerBufferSize_;
int64_t deserializerBufferSize_;
int64_t& deserializeTime_;
int64_t& decompressTime_;
VeloxMemoryManager* memoryManager_;
facebook::velox::BufferPtr rowBuffer_{nullptr};
char* rowBufferPtr_{nullptr};
uint32_t bytesRead_{0};
uint32_t lastRowSize_{0};
std::vector<std::string_view> data_;
std::shared_ptr<arrow::io::InputStream> in_{nullptr};
uint32_t cachedRows_{0};
bool reachedEos_{false};
};
class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator {
public:
VeloxRssSortShuffleReaderDeserializer(
const std::shared_ptr<StreamReader>& streamReader,
VeloxMemoryManager* memoryManager,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
facebook::velox::common::CompressionKind veloxCompressionType,
int64_t& deserializeTime);
std::shared_ptr<ColumnarBatch> next();
private:
class VeloxInputStream;
void loadNextStream();
std::shared_ptr<StreamReader> streamReader_;
VeloxMemoryManager* memoryManager_;
facebook::velox::RowTypePtr rowType_;
std::vector<facebook::velox::RowVectorPtr> batches_;
int32_t batchSize_;
facebook::velox::common::CompressionKind veloxCompressionType_;
facebook::velox::VectorSerde* const serde_;
facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;
int64_t& deserializeTime_;
std::shared_ptr<VeloxInputStream> in_{nullptr};
std::shared_ptr<arrow::io::InputStream> arrowIn_{nullptr};
bool reachedEos_{false};
};
class VeloxShuffleReaderDeserializerFactory {
public:
VeloxShuffleReaderDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
facebook::velox::common::CompressionKind veloxCompressionType,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
ShuffleWriterType shuffleWriterType);
std::unique_ptr<ColumnarBatchIterator> createDeserializer(const std::shared_ptr<StreamReader>& streamReader);
int64_t getDecompressTime();
int64_t getDeserializeTime();
private:
void initFromSchema();
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::common::CompressionKind veloxCompressionType_;
facebook::velox::RowTypePtr rowType_;
int32_t batchSize_;
int64_t readerBufferSize_;
int64_t deserializerBufferSize_;
VeloxMemoryManager* memoryManager_;
std::vector<bool> isValidityBuffer_;
bool hasComplexType_{false};
ShuffleWriterType shuffleWriterType_;
int64_t deserializeTime_{0};
int64_t decompressTime_{0};
};
class VeloxShuffleReader final : public ShuffleReader {
public:
VeloxShuffleReader(std::unique_ptr<VeloxShuffleReaderDeserializerFactory> factory);
std::shared_ptr<ResultIterator> read(const std::shared_ptr<StreamReader>& streamReader) override;
int64_t getDecompressTime() const override;
int64_t getDeserializeTime() const override;
private:
std::unique_ptr<VeloxShuffleReaderDeserializerFactory> factory_;
};
} // namespace gluten