| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "./arrow_types.h" |
| |
| #if defined(ARROW_R_WITH_ACERO) |
| |
| #include "./safe-call-into-r.h" |
| |
| #include <arrow/acero/exec_plan.h> |
| #include <arrow/buffer.h> |
| #include <arrow/compute/api.h> |
| #include <arrow/compute/expression.h> |
| #include <arrow/table.h> |
| #include <arrow/util/async_generator.h> |
| #include <arrow/util/future.h> |
| #include <arrow/util/thread_pool.h> |
| |
| #include <iostream> |
| #include <optional> |
| |
| namespace acero = ::arrow::acero; |
| namespace compute = ::arrow::compute; |
| |
| std::shared_ptr<compute::FunctionOptions> make_compute_options(std::string func_name, |
| cpp11::list options); |
| |
| std::shared_ptr<arrow::KeyValueMetadata> strings_to_kvm(cpp11::strings metadata); |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecPlan> ExecPlan_create(bool use_threads) { |
| static compute::ExecContext threaded_context{gc_memory_pool(), |
| arrow::internal::GetCpuThreadPool()}; |
  // TODO(weston): using gc_context() in this way is deprecated. Once ordering
  // has been added, we can probably remove all references to ExecPlan from R
  // in favor of the DeclarationToXyz functions.
| auto plan = |
| ValueOrStop(acero::ExecPlan::Make(use_threads ? &threaded_context : gc_context())); |
| |
| return plan; |
| } |
| |
| std::shared_ptr<acero::ExecNode> MakeExecNodeOrStop( |
| const std::string& factory_name, acero::ExecPlan* plan, |
| std::vector<acero::ExecNode*> inputs, const acero::ExecNodeOptions& options) { |
| return std::shared_ptr<acero::ExecNode>( |
| ValueOrStop(acero::MakeExecNode(factory_name, plan, std::move(inputs), options)), |
| [](...) { |
| // empty destructor: ExecNode lifetime is managed by an ExecPlan |
| }); |
| } |
| |
| // This class is a special RecordBatchReader that holds a reference to the |
| // underlying exec plan so that (1) it can request that the ExecPlan *stop* |
| // producing when this object is deleted and (2) it can defer requesting |
| // the ExecPlan to *start* producing until the first batch has been pulled. |
// This allows it to be transformed (e.g., using map_batches() or head())
// and queried (i.e., used as input to another ExecPlan) at the R level,
// while keeping the ability to execute the entire plan at once
// (e.g., to support user-defined functions) or never execute it at all
// (e.g., to support printing a nested ExecPlan without running it).
| class ExecPlanReader : public arrow::RecordBatchReader { |
| public: |
| enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED }; |
| |
| ExecPlanReader(const std::shared_ptr<arrow::acero::ExecPlan>& plan, |
| const std::shared_ptr<arrow::Schema>& schema, |
| arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen) |
| : schema_(schema), |
| plan_(plan), |
| sink_gen_(sink_gen), |
| plan_status_(PLAN_NOT_STARTED), |
| stop_token_(MainRThread::GetInstance().GetStopToken()) {} |
| |
| std::string PlanStatus() const { |
| switch (plan_status_) { |
| case PLAN_NOT_STARTED: |
| return "PLAN_NOT_STARTED"; |
| case PLAN_RUNNING: |
| return "PLAN_RUNNING"; |
| case PLAN_FINISHED: |
| return "PLAN_FINISHED"; |
| default: |
| return "UNKNOWN"; |
| } |
| } |
| |
| std::shared_ptr<arrow::Schema> schema() const override { return schema_; } |
| |
| arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override { |
    // If this is the first batch being pulled, tell the ExecPlan to
    // start producing
| if (plan_status_ == PLAN_NOT_STARTED) { |
| StartProducing(); |
| } |
| |
    // If the reader has been closed, keep returning nullptr
    // (consistent with what most RecordBatchReader subclasses do)
| if (plan_status_ == PLAN_FINISHED) { |
| batch_out->reset(); |
| return arrow::Status::OK(); |
| } |
| |
    // Check for a cancellation request and stop the plan if one has arrived.
    // Once the ExecPlan accepts a StopToken and handles cancellation itself,
    // this check will be redundant.
| if (stop_token_.IsStopRequested()) { |
| StopProducing(); |
| return stop_token_.Poll(); |
| } |
| |
| auto out = sink_gen_().result(); |
| if (!out.ok()) { |
| StopProducing(); |
| return out.status(); |
| } |
| |
| if (out.ValueUnsafe()) { |
| auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_, gc_memory_pool()); |
| if (!batch_result.ok()) { |
| StopProducing(); |
| return batch_result.status(); |
| } |
| |
| *batch_out = batch_result.ValueUnsafe(); |
| } else { |
| batch_out->reset(); |
| plan_status_ = PLAN_FINISHED; |
| return plan_->finished().status(); |
| } |
| |
| return arrow::Status::OK(); |
| } |
| |
| arrow::Status Close() override { |
| StopProducing(); |
| return arrow::Status::OK(); |
| } |
| |
| const std::shared_ptr<arrow::acero::ExecPlan>& Plan() const { return plan_; } |
| |
| ~ExecPlanReader() { StopProducing(); } |
| |
| private: |
| std::shared_ptr<arrow::Schema> schema_; |
| std::shared_ptr<arrow::acero::ExecPlan> plan_; |
| arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen_; |
| ExecPlanReaderStatus plan_status_; |
| arrow::StopToken stop_token_; |
| |
| void StartProducing() { |
| plan_->StartProducing(); |
| plan_status_ = PLAN_RUNNING; |
| } |
| |
| void StopProducing() { |
| if (plan_status_ == PLAN_RUNNING) { |
      // We're done with the plan, but it may still need some time
      // to finish and clean up after itself. To allow that, we register
      // a callback that holds its own copy of the shared_ptr<ExecPlan> so
      // that the plan stays alive until it is safe to destroy it.
| std::shared_ptr<arrow::acero::ExecPlan> plan(plan_); |
| bool not_finished_yet = plan_->finished().TryAddCallback( |
| [&plan] { return [plan](const arrow::Status&) {}; }); |
| |
| if (not_finished_yet) { |
| plan_->StopProducing(); |
| } |
| } |
| |
| plan_status_ = PLAN_FINISHED; |
| // A previous version of this called plan_.reset() and reset |
| // sink_gen_ to an empty generator; however, this caused |
| // crashes on some platforms. |
| } |
| }; |
| |
| // [[acero::export]] |
| cpp11::list ExecPlanReader__batches( |
| const std::shared_ptr<arrow::RecordBatchReader>& reader) { |
| auto result = RunWithCapturedRIfPossible<arrow::RecordBatchVector>( |
| [&]() { return reader->ToRecordBatches(); }); |
| return arrow::r::to_r_list(ValueOrStop(result)); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<arrow::Table> Table__from_ExecPlanReader( |
| const std::shared_ptr<arrow::RecordBatchReader>& reader) { |
| auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>( |
| [&]() { return reader->ToTable(); }); |
| |
| return ValueOrStop(result); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecPlan> ExecPlanReader__Plan( |
| const std::shared_ptr<ExecPlanReader>& reader) { |
| if (reader->PlanStatus() == "PLAN_FINISHED") { |
| cpp11::stop("Can't extract ExecPlan from a finished ExecPlanReader"); |
| } |
| |
| return reader->Plan(); |
| } |
| |
| // [[acero::export]] |
| std::string ExecPlanReader__PlanStatus(const std::shared_ptr<ExecPlanReader>& reader) { |
| return reader->PlanStatus(); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<ExecPlanReader> ExecPlan_run( |
| const std::shared_ptr<acero::ExecPlan>& plan, |
| const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata) { |
| // For now, don't require R to construct SinkNodes. |
| // Instead, just pass the node we should collect as an argument. |
| arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen; |
| |
| MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, |
| acero::SinkNodeOptions{&sink_gen}); |
| |
| StopIfNotOk(plan->Validate()); |
| |
| // Attach metadata to the schema |
| auto out_schema = final_node->output_schema(); |
| if (metadata.size() > 0) { |
| auto kv = strings_to_kvm(metadata); |
| out_schema = out_schema->WithMetadata(kv); |
| } |
| |
| return std::make_shared<ExecPlanReader>(plan, out_schema, sink_gen); |
| } |
| |
| // [[acero::export]] |
| std::string ExecPlan_ToString(const std::shared_ptr<acero::ExecPlan>& plan) { |
| return plan->ToString(); |
| } |
| |
| // [[acero::export]] |
| void ExecPlan_UnsafeDelete(const std::shared_ptr<acero::ExecPlan>& plan) { |
| auto& plan_unsafe = const_cast<std::shared_ptr<acero::ExecPlan>&>(plan); |
| plan_unsafe.reset(); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<arrow::Schema> ExecNode_output_schema( |
| const std::shared_ptr<acero::ExecNode>& node) { |
| return node->output_schema(); |
| } |
| |
| // [[acero::export]] |
| bool ExecNode_has_ordered_batches(const std::shared_ptr<acero::ExecNode>& node) { |
| return !node->ordering().is_unordered(); |
| } |
| |
| #if defined(ARROW_R_WITH_DATASET) |
| |
| #include <arrow/dataset/file_base.h> |
| #include <arrow/dataset/plan.h> |
| #include <arrow/dataset/scanner.h> |
| |
| // [[dataset::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Scan( |
| const std::shared_ptr<acero::ExecPlan>& plan, |
| const std::shared_ptr<ds::Dataset>& dataset, |
| const std::shared_ptr<compute::Expression>& filter, cpp11::list projection) { |
| arrow::dataset::internal::Initialize(); |
| |
| // TODO: pass in FragmentScanOptions |
| auto options = std::make_shared<ds::ScanOptions>(); |
| |
| options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true); |
| options->dataset_schema = dataset->schema(); |
| |
  // This filter is only used for predicate pushdown; any parts of it that
  // can't be pushed down must still be handled by a FilterNode downstream
  // (see the sketch after this function)
| options->filter = *filter; |
| |
  // The ScanNode needs to know which fields to materialize. It pulls them
  // from this projection to prune the scan, but a ProjectNode is still
  // required downstream to actually apply the expressions
| std::vector<compute::Expression> exprs; |
| for (SEXP expr : projection) { |
| auto expr_ptr = cpp11::as_cpp<std::shared_ptr<compute::Expression>>(expr); |
| exprs.push_back(*expr_ptr); |
| } |
| cpp11::strings field_names(projection.attr(R_NamesSymbol)); |
| options->projection = call( |
| "make_struct", std::move(exprs), |
| compute::MakeStructOptions{cpp11::as_cpp<std::vector<std::string>>(field_names)}); |
| |
| return MakeExecNodeOrStop("scan", plan.get(), {}, |
| ds::ScanNodeOptions{dataset, options}); |
| } |
| |
| // [[dataset::export]] |
| void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, |
| const std::shared_ptr<acero::ExecNode>& final_node, |
| const std::shared_ptr<arrow::Schema>& schema, |
| const std::shared_ptr<ds::FileWriteOptions>& file_write_options, |
| const std::shared_ptr<fs::FileSystem>& filesystem, |
| std::string base_dir, |
| const std::shared_ptr<ds::Partitioning>& partitioning, |
| std::string basename_template, |
| arrow::dataset::ExistingDataBehavior existing_data_behavior, |
| int max_partitions, uint32_t max_open_files, |
| uint64_t max_rows_per_file, uint64_t min_rows_per_group, |
| uint64_t max_rows_per_group, bool create_directory) { |
| arrow::dataset::internal::Initialize(); |
| |
| // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R |
| // and encapsulate this logic better |
| ds::FileSystemDatasetWriteOptions opts; |
| opts.file_write_options = file_write_options; |
| opts.existing_data_behavior = existing_data_behavior; |
| opts.filesystem = filesystem; |
| opts.base_dir = base_dir; |
| opts.partitioning = partitioning; |
| opts.basename_template = basename_template; |
| opts.max_partitions = max_partitions; |
| opts.max_open_files = max_open_files; |
| opts.max_rows_per_file = max_rows_per_file; |
| opts.min_rows_per_group = min_rows_per_group; |
| opts.max_rows_per_group = max_rows_per_group; |
| opts.create_dir = create_directory; |
| |
| ds::WriteNodeOptions options(std::move(opts)); |
| options.custom_schema = std::move(schema); |
| |
| MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()}, std::move(options)); |
| |
| StopIfNotOk(plan->Validate()); |
| |
| arrow::Status result = RunWithCapturedRIfPossibleVoid([&]() { |
| plan->StartProducing(); |
| return plan->finished().status(); |
| }); |
| |
| StopIfNotOk(result); |
| } |
| |
| #endif |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Filter( |
| const std::shared_ptr<acero::ExecNode>& input, |
| const std::shared_ptr<compute::Expression>& filter) { |
| return MakeExecNodeOrStop("filter", input->plan(), {input.get()}, |
| acero::FilterNodeOptions{*filter}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Project( |
| const std::shared_ptr<acero::ExecNode>& input, |
| const std::vector<std::shared_ptr<compute::Expression>>& exprs, |
| std::vector<std::string> names) { |
  // We hold shared_ptrs to Expressions, but ProjectNodeOptions needs values
| std::vector<compute::Expression> expressions; |
| for (auto expr : exprs) { |
| expressions.push_back(*expr); |
| } |
| return MakeExecNodeOrStop( |
| "project", input->plan(), {input.get()}, |
| acero::ProjectNodeOptions{std::move(expressions), std::move(names)}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Aggregate( |
| const std::shared_ptr<acero::ExecNode>& input, cpp11::list options, |
| std::vector<std::string> key_names) { |
| std::vector<arrow::compute::Aggregate> aggregates; |
| |
| for (cpp11::list name_opts : options) { |
| auto function = cpp11::as_cpp<std::string>(name_opts["fun"]); |
| auto opts = make_compute_options(function, name_opts["options"]); |
| auto target_names = cpp11::as_cpp<std::vector<std::string>>(name_opts["targets"]); |
| auto name = cpp11::as_cpp<std::string>(name_opts["name"]); |
| |
| std::vector<arrow::FieldRef> targets; |
| for (auto&& target : target_names) { |
| targets.emplace_back(std::move(target)); |
| } |
| aggregates.push_back(arrow::compute::Aggregate{std::move(function), opts, |
| std::move(targets), std::move(name)}); |
| } |
| |
| std::vector<arrow::FieldRef> keys; |
| for (auto&& name : key_names) { |
| keys.emplace_back(std::move(name)); |
| } |
| return MakeExecNodeOrStop( |
| "aggregate", input->plan(), {input.get()}, |
| acero::AggregateNodeOptions{std::move(aggregates), std::move(keys)}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Join( |
| const std::shared_ptr<acero::ExecNode>& input, acero::JoinType join_type, |
| const std::shared_ptr<acero::ExecNode>& right_data, |
| std::vector<std::string> left_keys, std::vector<std::string> right_keys, |
| std::vector<std::string> left_output, std::vector<std::string> right_output, |
| std::string output_suffix_for_left, std::string output_suffix_for_right, |
| bool na_matches) { |
| std::vector<arrow::FieldRef> left_refs, right_refs, left_out_refs, right_out_refs; |
| std::vector<acero::JoinKeyCmp> key_cmps; |
| for (auto&& name : left_keys) { |
| left_refs.emplace_back(std::move(name)); |
    // Populate key_cmps in the same loop, one entry per key.
    // Acero supports a different comparison for each key, but dplyr only
    // exposes a single na_matches value for all keys, so that is all we
    // support for now.
| key_cmps.emplace_back(na_matches ? acero::JoinKeyCmp::IS : acero::JoinKeyCmp::EQ); |
| } |
| for (auto&& name : right_keys) { |
| right_refs.emplace_back(std::move(name)); |
| } |
| for (auto&& name : left_output) { |
| left_out_refs.emplace_back(std::move(name)); |
| } |
  // dplyr::semi_join() maps to LEFT_SEMI and dplyr::anti_join() to LEFT_ANTI;
  // RIGHT_SEMI and RIGHT_ANTI are ignored here because dplyr doesn't implement
  // them.
| if (join_type != acero::JoinType::LEFT_SEMI && |
| join_type != acero::JoinType::LEFT_ANTI) { |
| // Don't include out_refs in semi/anti join |
| for (auto&& name : right_output) { |
| right_out_refs.emplace_back(std::move(name)); |
| } |
| } |
| |
| return MakeExecNodeOrStop( |
| "hashjoin", input->plan(), {input.get(), right_data.get()}, |
| acero::HashJoinNodeOptions{join_type, std::move(left_refs), std::move(right_refs), |
| std::move(left_out_refs), std::move(right_out_refs), |
| std::move(key_cmps), compute::literal(true), |
| std::move(output_suffix_for_left), |
| std::move(output_suffix_for_right)}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Union( |
| const std::shared_ptr<acero::ExecNode>& input, |
| const std::shared_ptr<acero::ExecNode>& right_data) { |
| return MakeExecNodeOrStop("union", input->plan(), {input.get(), right_data.get()}, {}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_Fetch( |
| const std::shared_ptr<acero::ExecNode>& input, int64_t offset, int64_t limit) { |
| return MakeExecNodeOrStop("fetch", input->plan(), {input.get()}, |
| acero::FetchNodeOptions{offset, limit}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_OrderBy( |
| const std::shared_ptr<acero::ExecNode>& input, cpp11::list sort_options) { |
| return MakeExecNodeOrStop( |
| "order_by", input->plan(), {input.get()}, |
| acero::OrderByNodeOptions{std::dynamic_pointer_cast<compute::SortOptions>( |
| make_compute_options("sort_indices", sort_options)) |
| ->AsOrdering()}); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_SourceNode( |
| const std::shared_ptr<acero::ExecPlan>& plan, |
| const std::shared_ptr<arrow::RecordBatchReader>& reader) { |
| arrow::acero::RecordBatchReaderSourceNodeOptions options{reader}; |
| return MakeExecNodeOrStop("record_batch_reader_source", plan.get(), {}, options); |
| } |
| |
| // [[acero::export]] |
| std::shared_ptr<acero::ExecNode> ExecNode_TableSourceNode( |
| const std::shared_ptr<acero::ExecPlan>& plan, |
| const std::shared_ptr<arrow::Table>& table) { |
| arrow::acero::TableSourceNodeOptions options{/*table=*/table, |
| // TODO: make batch_size configurable |
| /*batch_size=*/1048576}; |
| |
| return MakeExecNodeOrStop("table_source", plan.get(), {}, options); |
| } |
| |
| #endif |
| |
| #if defined(ARROW_R_WITH_SUBSTRAIT) |
| |
| #include <arrow/engine/substrait/api.h> |
| |
// Only for example usage until a C++ method is available that produces
// a RecordBatchReader output (ARROW-15849)
| class AccumulatingConsumer : public acero::SinkNodeConsumer { |
| public: |
| const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches() { return batches_; } |
| |
| arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema, |
| acero::BackpressureControl* backpressure_control, |
| acero::ExecPlan* exec_plan) override { |
| schema_ = schema; |
| return arrow::Status::OK(); |
| } |
| |
| arrow::Status Consume(compute::ExecBatch batch) override { |
| auto record_batch = batch.ToRecordBatch(schema_); |
| ARROW_RETURN_NOT_OK(record_batch); |
| batches_.push_back(record_batch.ValueUnsafe()); |
| |
| return arrow::Status::OK(); |
| } |
| |
| arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); } |
| |
| private: |
| std::shared_ptr<arrow::Schema> schema_; |
| std::vector<std::shared_ptr<arrow::RecordBatch>> batches_; |
| }; |
| |
| // Expose these so that it's easier to write tests |
| |
| // [[substrait::export]] |
| std::string substrait__internal__SubstraitToJSON( |
| const std::shared_ptr<arrow::Buffer>& serialized_plan) { |
| return ValueOrStop(arrow::engine::internal::SubstraitToJSON("Plan", *serialized_plan)); |
| } |
| |
| // [[substrait::export]] |
| std::shared_ptr<arrow::Buffer> substrait__internal__SubstraitFromJSON( |
| std::string substrait_json) { |
| return ValueOrStop(arrow::engine::internal::SubstraitFromJSON("Plan", substrait_json)); |
| } |
| |
| // [[substrait::export]] |
| std::shared_ptr<arrow::Table> ExecPlan_run_substrait( |
| const std::shared_ptr<acero::ExecPlan>& plan, |
| const std::shared_ptr<arrow::Buffer>& serialized_plan) { |
| std::vector<std::shared_ptr<AccumulatingConsumer>> consumers; |
| |
| std::function<std::shared_ptr<acero::SinkNodeConsumer>()> consumer_factory = [&] { |
| consumers.emplace_back(new AccumulatingConsumer()); |
| return consumers.back(); |
| }; |
| |
  std::vector<acero::Declaration> decls = ValueOrStop(
      arrow::engine::DeserializePlans(*serialized_plan, consumer_factory));
| |
  // For now, the Substrait plan must include a 'read' that points to
  // a Parquet file (instead of using a source node created in Arrow)
| for (const acero::Declaration& decl : decls) { |
| auto node = decl.AddToPlan(plan.get()); |
| StopIfNotOk(node.status()); |
| } |
| |
| StopIfNotOk(plan->Validate()); |
| plan->StartProducing(); |
| StopIfNotOk(plan->finished().status()); |
| |
| std::vector<std::shared_ptr<arrow::RecordBatch>> all_batches; |
| for (const auto& consumer : consumers) { |
| for (const auto& batch : consumer->batches()) { |
| all_batches.push_back(batch); |
| } |
| } |
| |
| return ValueOrStop(arrow::Table::FromRecordBatches(std::move(all_batches))); |
| } |
| |
| #endif |