| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "arrow/dataset/file_orc.h" |
| |
| #include <memory> |
| |
| #include "arrow/adapters/orc/adapter.h" |
| #include "arrow/dataset/dataset_internal.h" |
| #include "arrow/dataset/file_base.h" |
| #include "arrow/dataset/scanner.h" |
| #include "arrow/util/checked_cast.h" |
| #include "arrow/util/future.h" |
| #include "arrow/util/iterator.h" |
| #include "arrow/util/logging.h" |
| #include "arrow/util/thread_pool.h" |
| |
| namespace arrow { |
| |
| using internal::checked_pointer_cast; |
| |
| namespace dataset { |
| |
| namespace { |
| |
| Result<std::unique_ptr<arrow::adapters::orc::ORCFileReader>> OpenORCReader( |
| const FileSource& source, |
| const std::shared_ptr<ScanOptions>& scan_options = nullptr) { |
| ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); |
| |
| arrow::MemoryPool* pool; |
| if (scan_options) { |
| pool = scan_options->pool; |
| } else { |
| pool = default_memory_pool(); |
| } |
| |
| auto reader = arrow::adapters::orc::ORCFileReader::Open(std::move(input), pool); |
| auto status = reader.status(); |
| if (!status.ok()) { |
| return status.WithMessage("Could not open ORC input source '", source.path(), |
| "': ", status.message()); |
| } |
| return reader; |
| } |
| |
| /// \brief A ScanTask backed by an ORC file. |
| class OrcScanTask { |
| public: |
| OrcScanTask(std::shared_ptr<FileFragment> fragment, |
| std::shared_ptr<ScanOptions> options) |
| : fragment_(std::move(fragment)), options_(std::move(options)) {} |
| |
| Result<RecordBatchIterator> Execute() { |
| struct Impl { |
| static Result<RecordBatchIterator> Make(const FileSource& source, |
| const FileFormat& format, |
| const ScanOptions& scan_options) { |
| ARROW_ASSIGN_OR_RAISE( |
| auto reader, |
| OpenORCReader(source, std::make_shared<ScanOptions>(scan_options))); |
| |
| auto materialized_fields = scan_options.MaterializedFields(); |
| // filter out virtual columns |
| std::vector<std::string> included_fields; |
| ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema()); |
| for (const auto& ref : materialized_fields) { |
| ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema)); |
| if (match.indices().empty()) continue; |
| |
| included_fields.push_back(schema->field(match.indices()[0])->name()); |
| } |
| |
| std::shared_ptr<RecordBatchReader> record_batch_reader; |
| ARROW_ASSIGN_OR_RAISE( |
| record_batch_reader, |
| reader->GetRecordBatchReader(scan_options.batch_size, included_fields)); |
| |
| return RecordBatchIterator(Impl{std::move(record_batch_reader)}); |
| } |
| |
| Result<std::shared_ptr<RecordBatch>> Next() { |
| std::shared_ptr<RecordBatch> batch; |
| RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch)); |
| return batch; |
| } |
| |
| std::shared_ptr<RecordBatchReader> record_batch_reader_; |
| }; |
| |
| return Impl::Make(fragment_->source(), |
| *checked_pointer_cast<FileFragment>(fragment_)->format(), |
| *options_); |
| } |
| |
| private: |
| std::shared_ptr<FileFragment> fragment_; |
| std::shared_ptr<ScanOptions> options_; |
| }; |
| |
| class OrcScanTaskIterator { |
| public: |
| static Result<Iterator<std::shared_ptr<OrcScanTask>>> Make( |
| std::shared_ptr<ScanOptions> options, std::shared_ptr<FileFragment> fragment) { |
| return Iterator<std::shared_ptr<OrcScanTask>>( |
| OrcScanTaskIterator(std::move(options), std::move(fragment))); |
| } |
| |
| Result<std::shared_ptr<OrcScanTask>> Next() { |
| if (once_) { |
| // Iteration is done. |
| return nullptr; |
| } |
| |
| once_ = true; |
| return std::make_shared<OrcScanTask>(fragment_, options_); |
| } |
| |
| private: |
| OrcScanTaskIterator(std::shared_ptr<ScanOptions> options, |
| std::shared_ptr<FileFragment> fragment) |
| : options_(std::move(options)), fragment_(std::move(fragment)) {} |
| |
| bool once_ = false; |
| std::shared_ptr<ScanOptions> options_; |
| std::shared_ptr<FileFragment> fragment_; |
| }; |
| |
| } // namespace |
| |
| OrcFileFormat::OrcFileFormat() : FileFormat(/*default_fragment_scan_options=*/nullptr) {} |
| |
| Result<bool> OrcFileFormat::IsSupported(const FileSource& source) const { |
| RETURN_NOT_OK(source.Open().status()); |
| return OpenORCReader(source).ok(); |
| } |
| |
| Result<std::shared_ptr<Schema>> OrcFileFormat::Inspect(const FileSource& source) const { |
| ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source)); |
| return reader->ReadSchema(); |
| } |
| |
| Result<RecordBatchGenerator> OrcFileFormat::ScanBatchesAsync( |
| const std::shared_ptr<ScanOptions>& options, |
| const std::shared_ptr<FileFragment>& file) const { |
| // TODO investigate "true" async version |
| // (https://issues.apache.org/jira/browse/ARROW-13795) |
| ARROW_ASSIGN_OR_RAISE(auto task_iter, OrcScanTaskIterator::Make(options, file)); |
| struct IterState { |
| Iterator<std::shared_ptr<OrcScanTask>> iter; |
| RecordBatchIterator curr_iter; |
| bool first; |
| ::arrow::internal::Executor* io_executor; |
| }; |
| struct { |
| Future<std::shared_ptr<RecordBatch>> operator()() { |
| auto state = state_; |
| return ::arrow::DeferNotOk( |
| state->io_executor->Submit([state]() -> Result<std::shared_ptr<RecordBatch>> { |
| if (state->first) { |
| ARROW_ASSIGN_OR_RAISE(auto task, state->iter.Next()); |
| ARROW_ASSIGN_OR_RAISE(state->curr_iter, task->Execute()); |
| state->first = false; |
| } |
| while (!IsIterationEnd(state->curr_iter)) { |
| ARROW_ASSIGN_OR_RAISE(auto next_batch, state->curr_iter.Next()); |
| if (IsIterationEnd(next_batch)) { |
| ARROW_ASSIGN_OR_RAISE(auto task, state->iter.Next()); |
| if (IsIterationEnd(task)) { |
| state->curr_iter = IterationEnd<RecordBatchIterator>(); |
| } else { |
| ARROW_ASSIGN_OR_RAISE(state->curr_iter, task->Execute()); |
| } |
| } else { |
| return next_batch; |
| } |
| } |
| return IterationEnd<std::shared_ptr<RecordBatch>>(); |
| })); |
| } |
| std::shared_ptr<IterState> state_; |
| } iter_to_gen{std::shared_ptr<IterState>( |
| new IterState{std::move(task_iter), {}, true, options->io_context.executor()})}; |
| return iter_to_gen; |
| } |
| |
| Future<std::optional<int64_t>> OrcFileFormat::CountRows( |
| const std::shared_ptr<FileFragment>& file, compute::Expression predicate, |
| const std::shared_ptr<ScanOptions>& options) { |
| if (ExpressionHasFieldRefs(predicate)) { |
| return Future<std::optional<int64_t>>::MakeFinished(std::nullopt); |
| } |
| auto self = checked_pointer_cast<OrcFileFormat>(shared_from_this()); |
| return DeferNotOk(options->io_context.executor()->Submit( |
| [self, file]() -> Result<std::optional<int64_t>> { |
| ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(file->source())); |
| return reader->NumberOfRows(); |
| })); |
| } |
| |
| // // |
| // // OrcFileWriter, OrcFileWriteOptions |
| // // |
| |
| std::shared_ptr<FileWriteOptions> OrcFileFormat::DefaultWriteOptions() { |
| // TODO (https://issues.apache.org/jira/browse/ARROW-13796) |
| return nullptr; |
| } |
| |
| Result<std::shared_ptr<FileWriter>> OrcFileFormat::MakeWriter( |
| std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema, |
| std::shared_ptr<FileWriteOptions> options, |
| fs::FileLocator destination_locator) const { |
| // TODO (https://issues.apache.org/jira/browse/ARROW-13796) |
| return Status::NotImplemented("ORC writer not yet implemented."); |
| } |
| |
| } // namespace dataset |
| } // namespace arrow |