blob: 85cdcf82a1a48c10376f1263086a4832c237d456 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "operators/reader/ParquetReaderIterator.h"
#include "memory/VeloxColumnarBatch.h"
#include "utils/Timer.h"
namespace gluten {
namespace {
// Builds a Velox ParquetReader over the local file at `path`.
//
// The file is wrapped in a LocalReadFile, fed through a BufferedInput that
// uses the memory pool carried by `opts`, and handed to the ParquetReader
// together with the same `opts`.
std::unique_ptr<facebook::velox::parquet::ParquetReader> createReader(
    const std::string& path,
    const facebook::velox::dwio::common::ReaderOptions& opts) {
  using facebook::velox::dwio::common::BufferedInput;
  using facebook::velox::parquet::ParquetReader;

  auto localFile = std::make_shared<facebook::velox::LocalReadFile>(path);
  auto bufferedInput = std::make_unique<BufferedInput>(std::move(localFile), opts.memoryPool());
  return std::make_unique<ParquetReader>(std::move(bufferedInput), opts);
}
// Creates a root ScanSpec (with an empty field name) and registers every
// child field of `rowType` on it via addAllChildFields().
std::shared_ptr<facebook::velox::common::ScanSpec> makeScanSpec(const facebook::velox::RowTypePtr& rowType) {
  using facebook::velox::common::ScanSpec;

  auto rootSpec = std::make_shared<ScanSpec>("");
  rootSpec->addAllChildFields(*rowType);
  return rootSpec;
}
} // namespace
// Stores the construction arguments only: `path` is forwarded to
// FileReaderIterator, `pool` is moved into pool_, and `batchSize` is copied
// into batchSize_. No reader is created here; see createRowReader().
ParquetReaderIterator::ParquetReaderIterator(
    const std::string& path,
    int64_t batchSize,
    std::shared_ptr<facebook::velox::memory::MemoryPool> pool)
    : FileReaderIterator(path),
      pool_(std::move(pool)),
      batchSize_(batchSize) {}
void ParquetReaderIterator::createRowReader() {
facebook::velox::dwio::common::ReaderOptions readerOptions{pool_.get()};
auto reader = createReader(path_, readerOptions);
rowType_ = reader->rowType();
facebook::velox::dwio::common::RowReaderOptions rowReaderOpts;
rowReaderOpts.select(std::make_shared<facebook::velox::dwio::common::ColumnSelector>(rowType_, rowType_->names()));
rowReaderOpts.setScanSpec(makeScanSpec(rowType_));
rowReader_ = reader->createRowReader(rowReaderOpts);
DLOG(INFO) << "Opened file for read: " << path_;
}
// Forwards all arguments to ParquetReaderIterator and immediately opens the
// file, so the iterator is ready for next() once construction finishes.
ParquetStreamReaderIterator::ParquetStreamReaderIterator(
    const std::string& path,
    int64_t batchSize,
    std::shared_ptr<facebook::velox::memory::MemoryPool> pool)
    : ParquetReaderIterator(path, batchSize, std::move(pool)) {
  createRowReader();
}
std::shared_ptr<gluten::ColumnarBatch> ParquetStreamReaderIterator::next() {
ScopedTimer timer(&collectBatchTime_);
static constexpr int32_t kBatchSize = 4096;
auto result = facebook::velox::BaseVector::create(rowType_, kBatchSize, pool_.get());
auto numRows = rowReader_->next(kBatchSize, result);
if (numRows == 0) {
return nullptr;
}
// Load lazy vector.
result = facebook::velox::BaseVector::loadedVectorShared(result);
auto rowVector = std::dynamic_pointer_cast<facebook::velox::RowVector>(result);
GLUTEN_DCHECK(rowVector != nullptr, "Error casting to RowVector");
DLOG(INFO) << "ParquetStreamReaderIterator read rows: " << numRows;
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
// Forwards all arguments to ParquetReaderIterator, opens the file, and then
// eagerly reads every batch into batches_ so that next() only has to walk
// the in-memory list.
ParquetBufferedReaderIterator::ParquetBufferedReaderIterator(
    const std::string& path,
    int64_t batchSize,
    std::shared_ptr<facebook::velox::memory::MemoryPool> pool)
    : ParquetReaderIterator(path, batchSize, std::move(pool)) {
  createRowReader();
  collectBatches();
}
// Hands out the pre-collected batches one at a time.
//
// Returns the batch iter_ currently points at and advances iter_, or nullptr
// when iter_ has reached the end of batches_.
std::shared_ptr<gluten::ColumnarBatch> ParquetBufferedReaderIterator::next() {
  if (iter_ != batches_.cend()) {
    auto batch = *iter_;
    ++iter_;
    return batch;
  }
  return nullptr;
}
// Drains the row reader into batches_ and positions iter_ at the first batch.
//
// Repeatedly requests kBatchSize rows until the row reader reports zero rows.
// Each non-empty result is fully loaded, cast to a RowVector, wrapped in a
// VeloxColumnarBatch, and appended to batches_. The time spent here is added
// to collectBatchTime_ through the scoped timer.
//
// NOTE(review): each read requests the fixed kBatchSize (4096) rows; the
// batchSize passed to the constructor is not consulted here - confirm that
// this is intended.
void ParquetBufferedReaderIterator::collectBatches() {
  ScopedTimer timer(&collectBatchTime_);

  static constexpr int32_t kBatchSize = 4096;

  while (true) {
    auto result = facebook::velox::BaseVector::create(rowType_, kBatchSize, pool_.get());
    const uint64_t numRows = rowReader_->next(kBatchSize, result);
    if (numRows == 0) {
      break;
    }

    // Load lazy vector.
    result = facebook::velox::BaseVector::loadedVectorShared(result);

    auto rowVector = std::dynamic_pointer_cast<facebook::velox::RowVector>(result);
    GLUTEN_DCHECK(rowVector != nullptr, "Error casting to RowVector");

    // Log under this class's own name; the message previously named the
    // stream reader, which made debug logs misleading.
    DLOG(INFO) << "ParquetBufferedReaderIterator read rows: " << numRows;
    batches_.push_back(std::make_shared<VeloxColumnarBatch>(std::move(rowVector)));
  }

  // Set only after all push_back calls, so the iterator is not invalidated
  // by vector growth.
  iter_ = batches_.begin();
  DLOG(INFO) << "Number of input batches: " << batches_.size();
}
} // namespace gluten