blob: 2df34145cd9e0063f92af2ba32f28181353c576f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/dataset/dataset.h"
#include <memory>
#include <utility>
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/scanner.h"
#include "arrow/table.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
namespace arrow {
namespace dataset {
Fragment::Fragment(Expression partition_expression,
std::shared_ptr<Schema> physical_schema)
: partition_expression_(std::move(partition_expression)),
physical_schema_(std::move(physical_schema)) {}
Result<std::shared_ptr<Schema>> Fragment::ReadPhysicalSchema() {
{
auto lock = physical_schema_mutex_.Lock();
if (physical_schema_ != nullptr) return physical_schema_;
}
// allow ReadPhysicalSchemaImpl to lock mutex_, if necessary
ARROW_ASSIGN_OR_RAISE(auto physical_schema, ReadPhysicalSchemaImpl());
auto lock = physical_schema_mutex_.Lock();
if (physical_schema_ == nullptr) {
physical_schema_ = std::move(physical_schema);
}
return physical_schema_;
}
Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchemaImpl() {
return physical_schema_;
}
InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> schema,
RecordBatchVector record_batches,
Expression partition_expression)
: Fragment(std::move(partition_expression), std::move(schema)),
record_batches_(std::move(record_batches)) {
DCHECK_NE(physical_schema_, nullptr);
}
InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
Expression partition_expression)
: Fragment(std::move(partition_expression), /*schema=*/nullptr),
record_batches_(std::move(record_batches)) {
// Order of argument evaluation is undefined, so compute physical_schema here
physical_schema_ = record_batches_.empty() ? schema({}) : record_batches_[0]->schema();
}
Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);
auto batch_size = options->batch_size;
// RecordBatch -> ScanTask
auto self = shared_from_this();
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
RecordBatchVector batches;
auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
for (int i = 0; i < n_batches; i++) {
batches.push_back(batch->Slice(batch_size * i, batch_size));
}
return ::arrow::internal::make_unique<InMemoryScanTask>(std::move(batches),
std::move(options), self);
};
return MakeMapIterator(fn, std::move(batches_it));
}
Dataset::Dataset(std::shared_ptr<Schema> schema, Expression partition_expression)
: schema_(std::move(schema)),
partition_expression_(std::move(partition_expression)) {}
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan(
std::shared_ptr<ScanOptions> options) {
return std::make_shared<ScannerBuilder>(this->shared_from_this(), options);
}
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanOptions>());
}
Result<FragmentIterator> Dataset::GetFragments() {
ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_));
return GetFragments(std::move(predicate));
}
Result<FragmentIterator> Dataset::GetFragments(Expression predicate) {
ARROW_ASSIGN_OR_RAISE(
predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
: MakeEmptyIterator<std::shared_ptr<Fragment>>();
}
struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(RecordBatchVector batches)
: batches_(std::move(batches)) {}
RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }
RecordBatchVector batches_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
RecordBatchVector batches)
: Dataset(std::move(schema)),
get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}
struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
: table_(std::move(table)) {}
RecordBatchIterator Get() const final {
auto reader = std::make_shared<TableBatchReader>(*table_);
auto table = table_;
return MakeFunctionIterator([reader, table] { return reader->Next(); });
}
std::shared_ptr<Table> table_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
get_batches_(new TableRecordBatchGenerator(std::move(table))) {}
struct ReaderRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit ReaderRecordBatchGenerator(std::shared_ptr<RecordBatchReader> reader)
: reader_(std::move(reader)), consumed_(false) {}
RecordBatchIterator Get() const final {
if (consumed_) {
return MakeErrorIterator<std::shared_ptr<RecordBatch>>(Status::Invalid(
"RecordBatchReader-backed InMemoryDataset was already consumed"));
}
consumed_ = true;
auto reader = reader_;
return MakeFunctionIterator([reader] { return reader->Next(); });
}
std::shared_ptr<RecordBatchReader> reader_;
mutable bool consumed_;
};
InMemoryDataset::InMemoryDataset(std::shared_ptr<RecordBatchReader> reader)
: Dataset(reader->schema()),
get_batches_(new ReaderRecordBatchGenerator(std::move(reader))) {}
Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
std::shared_ptr<Schema> schema) const {
RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
}
Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
auto schema = this->schema();
auto create_fragment =
[schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
if (!batch->schema()->Equals(schema)) {
return Status::TypeError("yielded batch had schema ", *batch->schema(),
" which did not match InMemorySource's: ", *schema);
}
RecordBatchVector batches{batch};
return std::make_shared<InMemoryFragment>(std::move(batches));
};
return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
}
Result<std::shared_ptr<UnionDataset>> UnionDataset::Make(std::shared_ptr<Schema> schema,
DatasetVector children) {
for (const auto& child : children) {
if (!child->schema()->Equals(*schema)) {
return Status::TypeError("child Dataset had schema ", *child->schema(),
" but the union schema was ", *schema);
}
}
return std::shared_ptr<UnionDataset>(
new UnionDataset(std::move(schema), std::move(children)));
}
Result<std::shared_ptr<Dataset>> UnionDataset::ReplaceSchema(
std::shared_ptr<Schema> schema) const {
auto children = children_;
for (auto& child : children) {
ARROW_ASSIGN_OR_RAISE(child, child->ReplaceSchema(schema));
}
return std::shared_ptr<Dataset>(
new UnionDataset(std::move(schema), std::move(children)));
}
Result<FragmentIterator> UnionDataset::GetFragmentsImpl(Expression predicate) {
return GetFragmentsFromDatasets(children_, predicate);
}
} // namespace dataset
} // namespace arrow