| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "arrow/dataset/file_ipc.h" |
| |
| #include <algorithm> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "arrow/dataset/dataset_internal.h" |
| #include "arrow/dataset/file_base.h" |
| #include "arrow/dataset/scanner.h" |
| #include "arrow/ipc/reader.h" |
| #include "arrow/ipc/writer.h" |
| #include "arrow/util/iterator.h" |
| |
| namespace arrow { |
| namespace dataset { |
| |
| static ipc::IpcReadOptions default_read_options() { |
| auto options = ipc::IpcReadOptions::Defaults(); |
| options.use_threads = false; |
| return options; |
| } |
| |
| Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader( |
| const FileSource& source, |
| const ipc::IpcReadOptions& options = default_read_options()) { |
| ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); |
| |
| std::shared_ptr<ipc::RecordBatchFileReader> reader; |
| |
| auto status = |
| ipc::RecordBatchFileReader::Open(std::move(input), options).Value(&reader); |
| if (!status.ok()) { |
| return status.WithMessage("Could not open IPC input source '", source.path(), |
| "': ", status.message()); |
| } |
| return reader; |
| } |
| |
| Result<std::vector<int>> GetIncludedFields( |
| const Schema& schema, const std::vector<std::string>& materialized_fields) { |
| std::vector<int> included_fields; |
| |
| for (FieldRef ref : materialized_fields) { |
| ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema)); |
| if (match.indices().empty()) continue; |
| |
| included_fields.push_back(match.indices()[0]); |
| } |
| |
| return included_fields; |
| } |
| |
| /// \brief A ScanTask backed by an Ipc file. |
| class IpcScanTask : public ScanTask { |
| public: |
| IpcScanTask(FileSource source, std::shared_ptr<ScanOptions> options, |
| std::shared_ptr<ScanContext> context) |
| : ScanTask(std::move(options), std::move(context)), source_(std::move(source)) {} |
| |
| Result<RecordBatchIterator> Execute() override { |
| struct Impl { |
| static Result<RecordBatchIterator> Make( |
| const FileSource& source, std::vector<std::string> materialized_fields, |
| MemoryPool* pool) { |
| ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source)); |
| |
| auto options = default_read_options(); |
| ARROW_ASSIGN_OR_RAISE(options.included_fields, |
| GetIncludedFields(*reader->schema(), materialized_fields)); |
| |
| ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options)); |
| return RecordBatchIterator(Impl{std::move(reader), pool, 0}); |
| } |
| |
| Result<std::shared_ptr<RecordBatch>> Next() { |
| if (i_ == reader_->num_record_batches()) { |
| return nullptr; |
| } |
| |
| return reader_->ReadRecordBatch(i_++); |
| } |
| |
| std::shared_ptr<ipc::RecordBatchFileReader> reader_; |
| MemoryPool* pool_; |
| int i_; |
| }; |
| |
| return Impl::Make(source_, options_->MaterializedFields(), context_->pool); |
| } |
| |
| private: |
| FileSource source_; |
| }; |
| |
| class IpcScanTaskIterator { |
| public: |
| static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options, |
| std::shared_ptr<ScanContext> context, |
| FileSource source) { |
| return ScanTaskIterator( |
| IpcScanTaskIterator(std::move(options), std::move(context), std::move(source))); |
| } |
| |
| Result<std::shared_ptr<ScanTask>> Next() { |
| if (once_) { |
| // Iteration is done. |
| return nullptr; |
| } |
| |
| once_ = true; |
| return std::shared_ptr<ScanTask>(new IpcScanTask(source_, options_, context_)); |
| } |
| |
| private: |
| IpcScanTaskIterator(std::shared_ptr<ScanOptions> options, |
| std::shared_ptr<ScanContext> context, FileSource source) |
| : options_(std::move(options)), |
| context_(std::move(context)), |
| source_(std::move(source)) {} |
| |
| bool once_ = false; |
| std::shared_ptr<ScanOptions> options_; |
| std::shared_ptr<ScanContext> context_; |
| FileSource source_; |
| }; |
| |
| Result<bool> IpcFileFormat::IsSupported(const FileSource& source) const { |
| RETURN_NOT_OK(source.Open().status()); |
| return OpenReader(source).ok(); |
| } |
| |
| Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source) const { |
| ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source)); |
| return reader->schema(); |
| } |
| |
| Result<ScanTaskIterator> IpcFileFormat::ScanFile( |
| const FileSource& source, std::shared_ptr<ScanOptions> options, |
| std::shared_ptr<ScanContext> context) const { |
| return IpcScanTaskIterator::Make(options, context, source); |
| } |
| |
| Result<std::shared_ptr<WriteTask>> IpcFileFormat::WriteFragment( |
| FileSource destination, std::shared_ptr<Fragment> fragment, |
| std::shared_ptr<ScanContext> scan_context) { |
| struct Task : WriteTask { |
| Task(FileSource destination, std::shared_ptr<FileFormat> format, |
| std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> scan_context) |
| : WriteTask(std::move(destination), std::move(format)), |
| fragment_(std::move(fragment)), |
| scan_context_(std::move(scan_context)) {} |
| |
| Status Execute() override { |
| RETURN_NOT_OK(CreateDestinationParentDir()); |
| |
| ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable()); |
| ARROW_ASSIGN_OR_RAISE(auto writer, |
| ipc::NewFileWriter(out_stream.get(), fragment_->schema())); |
| ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment_->Scan(scan_context_)); |
| |
| for (auto maybe_scan_task : scan_task_it) { |
| ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); |
| |
| ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); |
| |
| for (auto maybe_batch : batch_it) { |
| ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch)); |
| RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); |
| } |
| } |
| |
| return writer->Close(); |
| } |
| |
| std::shared_ptr<Fragment> fragment_; |
| std::shared_ptr<ScanContext> scan_context_; |
| }; |
| |
| return std::make_shared<Task>(std::move(destination), shared_from_this(), |
| std::move(fragment), std::move(scan_context)); |
| } |
| |
| } // namespace dataset |
| } // namespace arrow |