.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. default-domain:: cpp
.. highlight:: cpp
.. cpp:namespace:: arrow::compute
=======================================
Acero: A C++ streaming execution engine
=======================================

.. warning::

    Acero is experimental and a stable API is not yet guaranteed.

Motivation
==========

For many complex computations, successive direct :ref:`invocation of
compute functions <invoking-compute-functions>` is not feasible
in either memory or computation time. Doing so causes all intermediate
data to be fully materialized. To facilitate arbitrarily large inputs
and more efficient resource usage, the Arrow C++ implementation also
provides Acero, a streaming query engine with which computations can
be formulated and executed.

.. image:: simple_graph.svg
   :alt: An example graph of a streaming execution workflow.

Acero allows computation to be expressed as an "execution plan"
(:class:`ExecPlan`), which is a directed graph of operators. Each operator
(:class:`ExecNode`) provides, transforms, or consumes the data passing
through it. Batches of data (:struct:`ExecBatch`) flow along edges of
the graph from node to node. Structuring the API around streams of batches
allows the working set for each node to be tuned for optimal performance
independent of any other nodes in the graph. Each :class:`ExecNode`
processes batches as they are pushed to it along an edge of the graph by
upstream nodes (its inputs), and pushes batches along an edge of the graph
to downstream nodes (its outputs) as they are finalized.

.. seealso::

   `SHAIKHHA, A., DASHTI, M., & KOCH, C.
   (2018). Push versus pull-based loop fusion in query engines.
   Journal of Functional Programming, 28.
   <https://doi.org/10.1017/s0956796818000102>`_
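The push-based control flow described above can be sketched in a few lines of plain C++. This is a toy illustration only, not Acero's actual interface (the real :class:`ExecNode` API appears later in this document); the ``ToyNode`` and ``Batch`` names are invented for the sketch::

    #include <functional>
    #include <utility>
    #include <vector>

    // A toy "batch" and push-based node graph. Each node transforms an
    // incoming batch and immediately pushes the result to its outputs,
    // so intermediate results are never fully materialized.
    using Batch = std::vector<int>;

    class ToyNode {
     public:
      explicit ToyNode(std::function<Batch(Batch)> transform)
          : transform_(std::move(transform)) {}

      void AddOutput(ToyNode* out) { outputs_.push_back(out); }

      // Called by an upstream node to push one batch downstream.
      void InputReceived(Batch batch) {
        Batch result = transform_(std::move(batch));
        for (ToyNode* out : outputs_) out->InputReceived(result);
        if (outputs_.empty()) collected_.push_back(result);
      }

      const std::vector<Batch>& collected() const { return collected_; }

     private:
      std::function<Batch(Batch)> transform_;
      std::vector<ToyNode*> outputs_;
      std::vector<Batch> collected_;  // only used by terminal (sink) nodes
    };
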

Substrait
=========

In order to use Acero you will need to create an execution plan. This is the
model that describes the computation you want to apply to your data. Acero has
its own internal representation for execution plans, but most users should not
interact with it directly, as doing so couples their code to Acero.

`Substrait <https://substrait.io>`_ is an open standard for execution plans.
Acero implements the Substrait "consumer" interface. This means that Acero can
accept a Substrait plan and fulfill it, loading the requested data and
applying the desired computation. By using Substrait plans, users can easily
switch to a different execution engine at a later time.

Substrait Conformance
---------------------

Substrait defines a broad set of operators and functions for many different
situations, and it is unlikely that Acero will ever completely support all
defined Substrait operators and functions. To help understand what features
are available, the following sections describe which features are currently
implemented in Acero, along with any applicable caveats.

Plans
^^^^^

* A plan should have a single top-level relation.
* The consumer is currently based on a custom build of Substrait that
  is older than 0.1.0. Any features added after 0.1.0 will
  not be supported.

Extensions
^^^^^^^^^^

* If a plan contains any extension type variations it will be rejected.
* If a plan contains any advanced extensions it will be rejected.

Relations (in general)
^^^^^^^^^^^^^^^^^^^^^^

* The ``emit`` property (to customize the output order of a node or to drop
  columns) is not supported, and plans containing this property will
  be rejected.
* The ``hint`` property is not supported, and plans containing this
  property will be rejected.
* Any advanced extensions will cause a plan to be rejected.
* Any relation not explicitly listed below is not supported
  and will cause the plan to be rejected.

Read Relations
^^^^^^^^^^^^^^

* The ``projection`` property is not supported, and plans containing this
  property will be rejected.
* The only supported read type is ``LocalFiles``. Plans with any other
  type will be rejected.
* Only the Parquet file format is currently supported.
* All URIs must use the ``file`` scheme.
* ``partition_index``, ``start``, and ``length`` are not supported. Plans containing
  these properties will be rejected.
* The Substrait spec requires that a ``filter`` be completely satisfied by a read
  relation. However, Acero only uses a read filter for pushdown filtering, so it
  may not be fully satisfied. Users should generally attach an additional
  filter relation with the same filter expression after the read relation.

Filter Relations
^^^^^^^^^^^^^^^^

* No known caveats.

Project Relations
^^^^^^^^^^^^^^^^^

* No known caveats.

Join Relations
^^^^^^^^^^^^^^

* The join type ``JOIN_TYPE_SINGLE`` is not supported, and plans containing it
  will be rejected.
* The join expression must be a call to either the ``equal`` or ``is_not_distinct_from``
  function. Both arguments to the call must be direct references. Only a single
  join key is supported.
* The ``post_join_filter`` property is not supported and will be ignored.

Aggregate Relations
^^^^^^^^^^^^^^^^^^^

* At most one grouping set is supported.
* Each grouping expression must be a direct reference.
* Each measure's arguments must be direct references.
* A measure may not have a filter.
* A measure may not have sorts.
* A measure's invocation must be ``AGGREGATION_INVOCATION_ALL``.
* A measure's phase must be ``AGGREGATION_PHASE_INITIAL_TO_RESULT``.

Expressions (general)
^^^^^^^^^^^^^^^^^^^^^

* Various places in the Substrait spec allow expressions to be used outside
  of a filter or project relation, for example in a join expression or an aggregate
  grouping set. Acero typically expects these expressions to be direct references.
  Planners should extract the implicit projection into a formal project relation
  before delivering the plan to Acero.
* Older versions of Isthmus would omit optional arguments instead of including them
  as unspecified enums. Acero does not support these plans.

Literals
^^^^^^^^

* A literal with non-default nullability will cause a plan to be rejected.

Types
^^^^^

* Acero does not have full support for non-nullable types and may allow input
  to have nulls without rejecting it.
* The table below shows the mapping between Arrow types and the Substrait type
  classes that are currently supported.

.. list-table:: Substrait / Arrow Type Mapping
   :widths: 25 25 50
   :header-rows: 1

   * - Substrait Type
     - Arrow Type
     - Caveat
   * - boolean
     - boolean
     -
   * - i8
     - int8
     -
   * - i16
     - int16
     -
   * - i32
     - int32
     -
   * - i64
     - int64
     -
   * - fp32
     - float32
     -
   * - fp64
     - float64
     -
   * - string
     - string
     -
   * - binary
     - binary
     -
   * - timestamp
     - timestamp<MICRO,"">
     -
   * - timestamp_tz
     - timestamp<MICRO,"UTC">
     -
   * - date
     - date32<DAY>
     -
   * - time
     - time64<MICRO>
     -
   * - interval_year
     -
     - Not currently supported
   * - interval_day
     -
     - Not currently supported
   * - uuid
     -
     - Not currently supported
   * - FIXEDCHAR<L>
     -
     - Not currently supported
   * - VARCHAR<L>
     -
     - Not currently supported
   * - FIXEDBINARY<L>
     - fixed_size_binary<L>
     -
   * - DECIMAL<P,S>
     - decimal128<P,S>
     -
   * - STRUCT<T1...TN>
     - struct<T1...TN>
     - Arrow struct fields will have no name (empty string)
   * - NSTRUCT<N:T1...N:Tn>
     -
     - Not currently supported
   * - LIST<T>
     - list<T>
     -
   * - MAP<K,V>
     - map<K,V>
     - K must not be nullable
Functions
^^^^^^^^^

* Acero does not support the legacy ``args`` style of declaring arguments.
* The following functions have caveats or are not supported at all. Note that
  this is not a comprehensive list. Functions are being added to Substrait at
  a rapid pace and new functions may be missing.

  * Acero does not support the SATURATE option for overflow.
  * Acero does not support kernels that take more than two arguments
    for the functions ``and``, ``or``, ``xor``.
  * Acero does not support temporal arithmetic.
  * Acero does not support the following standard functions:

    * ``is_not_distinct_from``
    * ``like``
    * ``substring``
    * ``starts_with``
    * ``ends_with``
    * ``contains``
    * ``count``
    * ``count_distinct``
    * ``approx_count_distinct``

* The functions above should be referenced using the URI
  ``https://github.com/apache/arrow/blob/main/format/substrait/extension_types.yaml``.
* Alternatively, the URI can be left completely empty, in which case Acero will match
  based only on the function name. This fallback mechanism is non-standard and should
  be avoided if possible.

Architecture Overview
=====================

:class:`ExecNode`
    Each node in the graph is an implementation of the :class:`ExecNode` interface.

:class:`ExecPlan`
    A set of :class:`ExecNode` instances is contained and (to an extent) coordinated by an
    :class:`ExecPlan`.

:class:`ExecFactoryRegistry`
    Instances of :class:`ExecNode` are constructed by factory functions held
    in an :class:`ExecFactoryRegistry`.

:class:`ExecNodeOptions`
    Heterogeneous parameters for factories of :class:`ExecNode` are bundled in an
    :class:`ExecNodeOptions`.

:struct:`Declaration`
    A ``dplyr``-inspired helper for efficient construction of an :class:`ExecPlan`.

:struct:`ExecBatch`
    A lightweight container for a single chunk of data in the Arrow format. In
    contrast to :class:`RecordBatch`, :struct:`ExecBatch` is intended for use
    exclusively in a streaming execution context (for example, it doesn't have a
    corresponding Python binding). Furthermore, columns which happen to have a
    constant value may be represented by a :class:`Scalar` instead of an
    :class:`Array`. In addition, :struct:`ExecBatch` may carry
    execution-relevant properties including a guaranteed-true filter
    for :class:`Expression` simplification.
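The scalar-or-array column idea can be mimicked in plain C++ with a ``std::variant``. This is a toy sketch of the concept only, not :struct:`ExecBatch`'s actual layout; the ``Column`` and ``ValueAt`` names are invented for illustration::

    #include <cstddef>
    #include <variant>
    #include <vector>

    // Toy model of a batch column: either a materialized array of values
    // or a single scalar that logically repeats for every row, so a
    // constant column never has to be materialized.
    using Column = std::variant<int, std::vector<int>>;

    // Fetch the value of a column at a given row, expanding scalars lazily.
    int ValueAt(const Column& col, std::size_t row) {
      if (const int* scalar = std::get_if<int>(&col)) return *scalar;
      return std::get<std::vector<int>>(col)[row];
    }
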


An example :class:`ExecNode` implementation which simply passes all input batches
through unchanged::

    class PassthruNode : public ExecNode {
     public:
      // InputReceived is the main entry point for ExecNodes. It is invoked
      // by an input of this node to push a batch here for processing.
      void InputReceived(ExecNode* input, ExecBatch batch) override {
        // Since this is a passthru node we simply push the batch to our
        // only output here.
        outputs_[0]->InputReceived(this, batch);
      }

      // ErrorReceived is called by an input of this node to report an error.
      // ExecNodes should always forward errors to their outputs unless they
      // are able to fully handle the error (this is rare).
      void ErrorReceived(ExecNode* input, Status error) override {
        outputs_[0]->ErrorReceived(this, error);
      }

      // InputFinished is used to signal how many batches will ultimately arrive.
      // It may be called with any ordering relative to InputReceived/ErrorReceived.
      void InputFinished(ExecNode* input, int total_batches) override {
        outputs_[0]->InputFinished(this, total_batches);
      }

      // ExecNodes may request that their inputs throttle production of batches
      // until they are ready for more, or stop production if no further batches
      // are required. These signals should typically be forwarded to the inputs
      // of the ExecNode.
      void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
      void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
      void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }

      // An ExecNode has a single output schema to which all its batches conform.
      using ExecNode::output_schema;

      // ExecNodes carry basic introspection for debugging purposes.
      const char* kind_name() const override { return "PassthruNode"; }
      using ExecNode::label;
      using ExecNode::SetLabel;
      using ExecNode::ToString;

      // An ExecNode holds references to its inputs and outputs, so it is possible
      // to walk the graph of execution if necessary.
      using ExecNode::inputs;
      using ExecNode::outputs;

      // StartProducing() and StopProducing() are invoked by an ExecPlan to
      // coordinate the graph-wide execution state. These do not need to be
      // forwarded to inputs or outputs.
      Status StartProducing() override { return Status::OK(); }
      void StopProducing() override {}
      Future<> finished() override { return inputs_[0]->finished(); }
    };

Note that each method which is associated with an edge of the graph must be invoked
with an ``ExecNode*`` to identify the node which invoked it. For example, in an
:class:`ExecNode` which implements ``JOIN`` this tagging might be used to differentiate
between batches from the left or right inputs.
``InputReceived``, ``ErrorReceived``, and ``InputFinished`` may only be invoked by
the inputs of a node, while ``ResumeProducing``, ``PauseProducing``, and ``StopProducing``
may only be invoked by the outputs of a node.
:class:`ExecPlan` contains the associated instances of :class:`ExecNode`
and is used to start and stop execution of all nodes and for querying/awaiting
their completion::

    // construct an ExecPlan first to hold your nodes
    ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));

    // ... add nodes to your ExecPlan

    // start all nodes in the graph
    ARROW_RETURN_NOT_OK(plan->StartProducing());

    SetUserCancellationCallback([plan] {
      // stop all nodes in the graph
      plan->StopProducing();
    });

    // Complete will be marked finished when all nodes have run to completion
    // or acknowledged a StopProducing() signal. The ExecPlan should be kept
    // alive until this future is marked finished.
    Future<> complete = plan->finished();

Constructing ``ExecPlan`` objects
=================================

.. warning::

    The following will be superseded by construction from Compute IR, see ARROW-14074.

None of the concrete implementations of :class:`ExecNode` are exposed
in headers, so they can't be constructed directly outside the
translation unit where they are defined. Instead, factories to
create them are provided in an extensible registry. This structure
provides a number of benefits:

- This enforces consistent construction.
- It decouples implementations from consumers of the interface
  (for example: we have two classes for scalar and grouped aggregate;
  we can choose which to construct within the single factory by
  checking whether grouping keys are provided).
- It expedites integration with out-of-library extensions. For example,
  "scan" nodes are implemented in the separate ``libarrow_dataset.so`` library.
- Since the class is not referencable outside the translation unit in which it
  is defined, compilers can optimize more aggressively.

Factories of :class:`ExecNode` can be retrieved by name from the registry.
The default registry is available through
:func:`arrow::compute::default_exec_factory_registry()`
and can be queried for the built-in factories::

    // get the factory for "filter" nodes:
    ARROW_ASSIGN_OR_RAISE(auto make_filter,
                          default_exec_factory_registry()->GetFactory("filter"));

    // factories take three arguments:
    ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, *make_filter(
        // the ExecPlan which should own this node
        plan.get(),

        // nodes which will send batches to this node (inputs)
        {scan_node},

        // parameters unique to "filter" nodes
        FilterNodeOptions{filter_expression}));

    // alternative shorthand:
    ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
        plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));

Factories can also be added to the default registry as long as they are
convertible to ``std::function<Result<ExecNode*>(
ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>``.
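The registry pattern itself is straightforward. An Arrow-free sketch, using hypothetical ``ToyNode``/``ToyRegistry`` names rather than Acero's real classes, looks like this::

    #include <functional>
    #include <map>
    #include <memory>
    #include <stdexcept>
    #include <string>

    // Minimal stand-in for an ExecNode; real factories also receive the
    // owning plan, the input nodes, and an options struct.
    struct ToyNode {
      std::string kind;
    };

    // A registry mapping factory names to factory functions, mirroring
    // how a factory registry lets callers look up "filter", "project",
    // etc. without ever seeing the concrete node classes.
    class ToyRegistry {
     public:
      using Factory = std::function<std::unique_ptr<ToyNode>()>;

      void AddFactory(const std::string& name, Factory factory) {
        factories_[name] = std::move(factory);
      }

      std::unique_ptr<ToyNode> MakeNode(const std::string& name) const {
        auto it = factories_.find(name);
        if (it == factories_.end()) throw std::runtime_error("no factory: " + name);
        return it->second();
      }

     private:
      std::map<std::string, Factory> factories_;
    };
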

To build an :class:`ExecPlan` representing a simple pipeline which
reads from a :class:`RecordBatchReader` then filters, projects, and
writes to disk::

    std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
    ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
                                          SourceNodeOptions::FromReader(
                                              reader,
                                              GetCpuThreadPool()));

    ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
                                          FilterNodeOptions{
                                            greater(field_ref("score"), literal(3))
                                          });

    ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
                                           ProjectNodeOptions{
                                             {add(field_ref("score"), literal(1))},
                                             {"score + 1"}
                                           });

    arrow::dataset::internal::Initialize();
    MakeExecNode("write", plan.get(), {project_node},
                 WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});

:struct:`Declaration` is a `dplyr <https://dplyr.tidyverse.org>`_-inspired
helper which further decreases the boilerplate associated with populating
an :class:`ExecPlan` from C++::

    arrow::dataset::internal::Initialize();

    std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
    ASSERT_OK(Declaration::Sequence(
                  {
                      {"source", SourceNodeOptions::FromReader(
                                     reader,
                                     GetCpuThreadPool())},
                      {"filter", FilterNodeOptions{
                                     greater(field_ref("score"), literal(3))}},
                      {"project", ProjectNodeOptions{
                                      {add(field_ref("score"), literal(1))},
                                      {"score + 1"}}},
                      {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
                  })
                  .AddToPlan(plan.get()));

Note that a source node can wrap anything which resembles a stream of batches.
For example, `PR#11032 <https://github.com/apache/arrow/pull/11032>`_ adds
support for use of a `DuckDB <https://duckdb.org>`_ query as a source node.
Similarly, a sink node can wrap anything which absorbs a stream of batches.
In the example above we're writing completed
batches to disk. However, we can also collect these in memory into a :class:`Table`
or forward them to a :class:`RecordBatchReader` as an out-of-graph stream.
This flexibility allows an :class:`ExecPlan` to be used as streaming middleware
between any endpoints which support Arrow-formatted batches.

An :class:`arrow::dataset::Dataset` can also be wrapped as a source node which
pushes all the dataset's batches into an :class:`ExecPlan`. This factory is added
to the default registry with the name ``"scan"`` by calling
``arrow::dataset::internal::Initialize()``::

    arrow::dataset::internal::Initialize();

    std::shared_ptr<Dataset> dataset = GetDataset();

    ASSERT_OK(Declaration::Sequence(
                  {
                      {"scan", ScanNodeOptions{dataset,
                                               /* push down predicate, projection, ... */}},
                      {"filter", FilterNodeOptions{/* ... */}},
                      // ...
                  })
                  .AddToPlan(plan.get()));

Datasets may be scanned multiple times; just make multiple scan
nodes from that dataset. (Useful for a self-join, for example.)
Note that producing two scan nodes like this will perform all
reads and decodes twice.

Constructing ``ExecNode`` using Options
=======================================

:class:`ExecNode` is the building block of an execution plan; each kind of node
wraps a built-in operation. The operations associated with the execution plan,
and the options used to configure them, are listed below:

.. list-table:: Operations and Options
   :widths: 50 50
   :header-rows: 1

   * - Operation
     - Options
   * - ``source``
     - :class:`arrow::compute::SourceNodeOptions`
   * - ``table_source``
     - :class:`arrow::compute::TableSourceNodeOptions`
   * - ``filter``
     - :class:`arrow::compute::FilterNodeOptions`
   * - ``project``
     - :class:`arrow::compute::ProjectNodeOptions`
   * - ``aggregate``
     - :class:`arrow::compute::AggregateNodeOptions`
   * - ``sink``
     - :class:`arrow::compute::SinkNodeOptions`
   * - ``consuming_sink``
     - :class:`arrow::compute::ConsumingSinkNodeOptions`
   * - ``order_by_sink``
     - :class:`arrow::compute::OrderBySinkNodeOptions`
   * - ``select_k_sink``
     - :class:`arrow::compute::SelectKSinkNodeOptions`
   * - ``scan``
     - :class:`arrow::dataset::ScanNodeOptions`
   * - ``hash_join``
     - :class:`arrow::compute::HashJoinNodeOptions`
   * - ``write``
     - :class:`arrow::dataset::WriteNodeOptions`
   * - ``union``
     - N/A
   * - ``table_sink``
     - :class:`arrow::compute::TableSinkNodeOptions`
.. _stream_execution_source_docs:

``source``
----------

A ``source`` operation can be considered as an entry point to create a streaming execution plan.
:class:`arrow::compute::SourceNodeOptions` are used to create the ``source`` operation. The
``source`` operation is the most generic and flexible type of source currently available, but it can
be quite tricky to configure. To process data from files, the ``scan`` operation is likely a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This
function should take no arguments and should return an
``arrow::Future<std::optional<arrow::ExecBatch>>``.
This function might be reading a file, iterating through an in-memory structure, or receiving data
from a network connection. The Arrow library refers to these functions as ``arrow::AsyncGenerator``
and there are a number of utilities for working with them. For this example we use
a vector of record batches that we've already stored in memory.
In addition, the schema of the data must be known up front. Acero must know the schema of the data
at each stage of the execution graph before any processing has begun. This means we must supply the
schema for a source node separately from the data itself.
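The polling contract can be illustrated without Arrow's async machinery: a generator yields one batch per call from an in-memory vector and signals exhaustion with an empty ``std::optional``. This is a synchronous toy with an invented ``VectorGenerator`` name; the real generator returns an ``arrow::Future``::

    #include <cstddef>
    #include <optional>
    #include <utility>
    #include <vector>

    using Batch = std::vector<int>;

    // A zero-argument callable that produces one batch per call and an
    // empty optional once the source is exhausted, mirroring the
    // AsyncGenerator contract (minus the Future wrapper).
    class VectorGenerator {
     public:
      explicit VectorGenerator(std::vector<Batch> batches)
          : batches_(std::move(batches)) {}

      std::optional<Batch> operator()() {
        if (next_ >= batches_.size()) return std::nullopt;  // end of stream
        return batches_[next_++];
      }

     private:
      std::vector<Batch> batches_;
      std::size_t next_ = 0;
    };
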

Here we define a struct to hold the data generator definition. This includes in-memory batches, a schema,
and a function that serves as a data generator:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: BatchesWithSchema Definition)
   :end-before: (Doc section: BatchesWithSchema Definition)
   :linenos:
   :lineno-match:

Generating sample batches for computation:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: MakeBasicBatches Definition)
   :end-before: (Doc section: MakeBasicBatches Definition)
   :linenos:
   :lineno-match:

Example of using ``source`` (usage of sink is explained in detail in :ref:`sink <stream_execution_sink_docs>`):

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Source Example)
   :end-before: (Doc section: Source Example)
   :linenos:
   :lineno-match:

.. _stream_execution_table_source_docs:

``table_source``
----------------

In the previous example, :ref:`source node <stream_execution_source_docs>`, a source node
was used to input the data. But when developing an application, if the data is already in memory
as a table, it is much easier and more performant to use :class:`arrow::compute::TableSourceNodeOptions`.
Here the input data can be passed as a ``std::shared_ptr<arrow::Table>`` along with a ``max_batch_size``.
The ``max_batch_size`` is used to break up large record batches so that they can be processed in parallel.
Note that table batches will not be merged into larger batches when the source
table has a smaller batch size.
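The ``max_batch_size`` behavior described above (oversized batches are split, small batches are never merged) can be sketched in plain C++; ``SliceBatches`` is an invented name for this illustration::

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    using Batch = std::vector<int>;

    // Re-slice a table's batches so that none exceeds max_batch_size.
    // Batches that are already small enough pass through unchanged;
    // small batches are never merged together.
    std::vector<Batch> SliceBatches(const std::vector<Batch>& batches,
                                    std::size_t max_batch_size) {
      std::vector<Batch> out;
      for (const Batch& batch : batches) {
        for (std::size_t start = 0; start < batch.size(); start += max_batch_size) {
          std::size_t end = std::min(start + max_batch_size, batch.size());
          out.emplace_back(batch.begin() + start, batch.begin() + end);
        }
      }
      return out;
    }
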

Example of using ``table_source``:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Table Source Example)
   :end-before: (Doc section: Table Source Example)
   :linenos:
   :lineno-match:

.. _stream_execution_filter_docs:

``filter``
----------

The ``filter`` operation, as the name suggests, provides an option to define data filtering
criteria. It selects rows matching a given expression. Filters can be written using
:class:`arrow::compute::Expression`. For example, if we wish to keep rows where the value
of column ``b`` is greater than 3, then we can use the following expression.

Filter example:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Filter Example)
   :end-before: (Doc section: Filter Example)
   :linenos:
   :lineno-match:

.. _stream_execution_project_docs:

``project``
-----------

The ``project`` operation rearranges, deletes, transforms, and creates columns.
Each output column is computed by evaluating an expression
against the source record batch. This is exposed via
:class:`arrow::compute::ProjectNodeOptions`, which requires
an :class:`arrow::compute::Expression` and a name for each of the output columns (if names are not
provided, the string representations of the expressions will be used).

Project example:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Project Example)
   :end-before: (Doc section: Project Example)
   :linenos:
   :lineno-match:

.. _stream_execution_aggregate_docs:

``aggregate``
-------------

The ``aggregate`` node computes various types of aggregates over data.

Arrow supports two types of aggregates: "scalar" aggregates and
"hash" aggregates. Scalar aggregates reduce an array or scalar input
to a single scalar output (e.g. computing the mean of a column). Hash
aggregates act like ``GROUP BY`` in SQL: they first partition data based
on one or more key columns, then reduce the data in each
partition. The ``aggregate`` node supports both types of computation,
and can compute any number of aggregations at once.

:class:`arrow::compute::AggregateNodeOptions` is used to define the
aggregation criteria. It takes a list of aggregation functions and
their options; a list of target fields to aggregate, one per function;
and a list of names for the output fields, one per function.
Optionally, it takes a list of columns that are used to partition the
data, in the case of a hash aggregation. The aggregation functions
can be selected from :ref:`this list of aggregation functions
<aggregation-option-list>`.

.. note:: This node is a "pipeline breaker" and will fully materialize
          the dataset in memory. In the future, spillover mechanisms
          will be added which should alleviate this constraint.

The aggregation can provide results as a group or as a scalar. For instance,
an operation like ``hash_count`` provides a count for each distinct group
as a grouped result, while an operation like ``sum`` provides a single record.
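The difference between the two aggregate kinds can be shown with a toy group-by sum in standard C++. This is illustrative only; Acero's actual hash aggregation is vectorized and multi-threaded, and the ``ScalarSum``/``HashSum`` names are invented::

    #include <map>
    #include <numeric>
    #include <utility>
    #include <vector>

    // A scalar aggregate: reduce a whole column to one value.
    int ScalarSum(const std::vector<int>& column) {
      return std::accumulate(column.begin(), column.end(), 0);
    }

    // A hash aggregate: partition rows by key, then reduce each partition,
    // producing one output row per distinct key (like SQL GROUP BY).
    std::map<char, int> HashSum(const std::vector<std::pair<char, int>>& rows) {
      std::map<char, int> sums;
      for (const auto& [key, value] : rows) sums[key] += value;
      return sums;
    }
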

Scalar Aggregation example:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Scalar Aggregate Example)
   :end-before: (Doc section: Scalar Aggregate Example)
   :linenos:
   :lineno-match:

Group Aggregation example:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Group Aggregate Example)
   :end-before: (Doc section: Group Aggregate Example)
   :linenos:
   :lineno-match:

.. _stream_execution_sink_docs:

``sink``
--------

The ``sink`` operation provides output and is the final node of a streaming
execution definition. The :class:`arrow::compute::SinkNodeOptions` interface is used to pass
the required options. Similar to the source operator, the sink operator exposes the output
with a function that returns a record batch future each time it is called. It is expected the
caller will repeatedly call this function until the generator function is exhausted (returns
``std::optional::nullopt``). If this function is not called often enough then record batches
will accumulate in memory. An execution plan should only have one
"terminal" node (one sink node). An :class:`ExecPlan` can terminate early due to cancellation or
an error, before the output is fully consumed. However, the plan can be safely destroyed independently
of the sink, which will hold the unconsumed batches by ``exec_plan->finished()``.

As a part of the Source Example, the Sink operation is also included:

.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Source Example)
   :end-before: (Doc section: Source Example)
   :linenos:
   :lineno-match:
| |
| .. _stream_execution_consuming_sink_docs: |
| |
| ``consuming_sink`` |
| ------------------ |
| |
| ``consuming_sink`` operator is a sink operation containing consuming operation within the |
| execution plan (i.e. the exec plan should not complete until the consumption has completed). |
| Unlike the ``sink`` node this node takes in a callback function that is expected to consume the |
| batch. Once this callback has finished the execution plan will no longer hold any reference to |
| the batch. |
| The consuming function may be called before a previous invocation has completed. If the consuming |
| function does not run quickly enough then many concurrent executions could pile up, blocking the |
| CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks |
| have been completed. |
| Once all batches have been delivered the execution plan will wait for the `finish` future to complete |
| before marking the execution plan finished. This allows for workflows where the consumption function |
| converts batches into async tasks (this is currently done internally for the dataset write node). |
| |
Example::

    // define a custom SinkNodeConsumer
    std::atomic<uint32_t> batches_seen{0};
    arrow::Future<> finish = arrow::Future<>::Make();
    struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
      CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
          : batches_seen(batches_seen), finish(std::move(finish)) {}

      // Consumption logic is written here: transfer the batch to another
      // system, do some work, write it to disk, etc.
      arrow::Status Consume(cp::ExecBatch batch) override {
        (*batches_seen)++;
        return arrow::Status::OK();
      }

      arrow::Future<> Finish() override { return finish; }

      std::atomic<uint32_t>* batches_seen;
      arrow::Future<> finish;
    };

    std::shared_ptr<CustomSinkNodeConsumer> consumer =
        std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

    arrow::compute::ExecNode* consuming_sink;

    ARROW_ASSIGN_OR_RAISE(consuming_sink,
                          MakeExecNode("consuming_sink", plan.get(), {source},
                                       cp::ConsumingSinkNodeOptions(consumer)));
| |
| |
| Consuming-Sink example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: ConsumingSink Example)
   :end-before: (Doc section: ConsumingSink Example)
   :linenos:
   :lineno-match:
| |
| .. _stream_execution_order_by_sink_docs: |
| |
| ``order_by_sink`` |
| ----------------- |
| |
The ``order_by_sink`` operation is an extension of the ``sink`` operation.
This operation provides the ability to guarantee the ordering of the
stream by providing the :class:`arrow::compute::OrderBySinkNodeOptions`.
Here the :class:`arrow::compute::SortOptions` are provided to define which columns
are used for sorting and whether to sort in ascending or descending order.
| |
.. note:: This node is a "pipeline breaker" and will fully materialize the dataset in
   memory. In the future, spillover mechanisms will be added which should alleviate this
   constraint.
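
Supplying the sort options can be sketched as follows (a hedged sketch: the column
name ``"a"`` is hypothetical, and ``plan``, ``source`` and a ``sink_gen`` generator
are assumed to exist as in the ``sink`` section)::

   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * sink,
       cp::MakeExecNode("order_by_sink", plan.get(), {source},
                        cp::OrderBySinkNodeOptions{
                            cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}},
                            &sink_gen}));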
| |
| |
| Order-By-Sink example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: OrderBySink Example)
   :end-before: (Doc section: OrderBySink Example)
   :linenos:
   :lineno-match:
| |
| |
| .. _stream_execution_select_k_docs: |
| |
| ``select_k_sink`` |
| ----------------- |
| |
The ``select_k_sink`` operation enables selecting the top/bottom K elements,
similar to a SQL ``ORDER BY ... LIMIT K`` clause. The selection criteria are
provided via :class:`arrow::compute::SelectKOptions`, which is defined using the
:struct:`OrderBySinkNode` definition. This sink node receives the inputs and
then computes the top-k/bottom-k rows.
| |
.. note:: This node is a "pipeline breaker" and will fully materialize the input in
   memory. In the future, spillover mechanisms will be added which should alleviate this
   constraint.
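
A minimal sketch (the column name ``"a"`` and ``k = 2`` are illustrative;
``plan``, ``source`` and ``sink_gen`` are assumed to exist as in the ``sink``
section)::

   cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"a"});
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * k_sink_node,
       cp::MakeExecNode("select_k_sink", plan.get(), {source},
                        cp::SelectKSinkNodeOptions{options, &sink_gen}));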
| |
| SelectK example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: KSelect Example)
   :end-before: (Doc section: KSelect Example)
   :linenos:
   :lineno-match:
| |
.. _stream_execution_table_sink_docs:

``table_sink``
--------------

| The ``table_sink`` node provides the ability to receive the output as an in-memory table. |
| This is simpler to use than the other sink nodes provided by the streaming execution engine |
| but it only makes sense when the output fits comfortably in memory. |
| The node is created using :class:`arrow::compute::TableSinkNodeOptions`. |
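
A minimal sketch (``plan`` and ``source`` are assumed to exist, with
``namespace cp = arrow::compute;``)::

   std::shared_ptr<arrow::Table> output_table;
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * table_sink,
       cp::MakeExecNode("table_sink", plan.get(), {source},
                        cp::TableSinkNodeOptions{&output_table}));
   ARROW_RETURN_NOT_OK(plan->StartProducing());
   ARROW_RETURN_NOT_OK(plan->finished().status());
   // `output_table` now holds the entire result in memory.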
| |
Example of using ``table_sink``:
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Table Sink Example)
   :end-before: (Doc section: Table Sink Example)
   :linenos:
   :lineno-match:
| |
.. _stream_execution_scan_docs:

``scan``
--------
| |
| ``scan`` is an operation used to load and process datasets. It should be preferred over the |
| more generic ``source`` node when your input is a dataset. The behavior is defined using |
| :class:`arrow::dataset::ScanNodeOptions`. More information on datasets and the various |
| scan options can be found in :doc:`./dataset`. |
| |
This node is capable of applying pushdown filters to the file readers, which
reduces the amount of data that needs to be read. You may supply the same
filter expression to the scan node that you also supply to the FilterNode,
because the filtering is done in two different places.
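
A minimal sketch of a scan with a pushdown filter (a hedged sketch: ``dataset``
is an assumed ``std::shared_ptr<arrow::dataset::Dataset>``, the column name
``"a"`` is hypothetical, and ``namespace cp = arrow::compute;`` is assumed)::

   auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
   // The filter is pushed down to the file readers where possible.
   scan_options->filter = cp::greater(cp::field_ref("a"), cp::literal(3));
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * scan,
       cp::MakeExecNode("scan", plan.get(), {},
                        arrow::dataset::ScanNodeOptions{dataset, scan_options}));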
| |
| Scan example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Scan Example)
   :end-before: (Doc section: Scan Example)
   :linenos:
   :lineno-match:
| |
| |
.. _stream_execution_write_docs:

| ``write`` |
| --------- |
| |
| The ``write`` node saves query results as a dataset of files in a |
| format like Parquet, Feather, CSV, etc. using the :doc:`./dataset` |
functionality in Arrow. The write options are provided via
:class:`arrow::dataset::WriteNodeOptions`, which in turn contains
:class:`arrow::dataset::FileSystemDatasetWriteOptions`. The latter provides
control over the written dataset, including options like the output
directory, file naming scheme, and so on.
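
Configuring the write options can be sketched as follows (a hedged sketch:
``filesystem``, ``base_dir`` and ``partitioning`` are assumed to be defined
elsewhere, Parquet is used purely for illustration, and ``plan`` and ``source``
exist as in the earlier examples)::

   auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
   arrow::dataset::FileSystemDatasetWriteOptions write_options;
   write_options.file_write_options = format->DefaultWriteOptions();
   write_options.filesystem = filesystem;
   write_options.base_dir = base_dir;
   write_options.partitioning = partitioning;
   write_options.basename_template = "part{i}.parquet";
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * write_node,
       cp::MakeExecNode("write", plan.get(), {source},
                        arrow::dataset::WriteNodeOptions{write_options}));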
| |
| Write example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Write Example)
   :end-before: (Doc section: Write Example)
   :linenos:
   :lineno-match:
| |
| .. _stream_execution_union_docs: |
| |
| ``union`` |
| ------------- |
| |
| ``union`` merges multiple data streams with the same schema into one, similar to |
| a SQL ``UNION ALL`` clause. |
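
A minimal sketch (``lhs`` and ``rhs`` are assumed to be ``ExecNode*`` inputs with
identical schemas; the ``union`` node takes no options of its own)::

   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * union_node,
       cp::MakeExecNode("union", plan.get(), {lhs, rhs}, cp::ExecNodeOptions{}));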
| |
| The following example demonstrates how this can be achieved using |
| two data sources. |
| |
| Union example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Union Example)
   :end-before: (Doc section: Union Example)
   :linenos:
   :lineno-match:
| |
| .. _stream_execution_hashjoin_docs: |
| |
| ``hash_join`` |
| ------------- |
| |
The ``hash_join`` operation provides the relational algebra join operation using a
hash-based algorithm. :class:`arrow::compute::HashJoinNodeOptions` contains the
options required to define a join. The hash_join supports
`left/right/full semi/anti/outer joins
<https://en.wikipedia.org/wiki/Join_(SQL)>`_.
The join keys (i.e. the column(s) to join on) and suffixes (i.e. a suffix term like "_x"
that can be appended to column names duplicated in both the left and right
relations) can also be set via the join options.
| `Read more on hash-joins |
| <https://en.wikipedia.org/wiki/Hash_join>`_. |
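
Defining the join options can be sketched as follows (a hedged sketch: ``left``
and ``right`` are assumed ``ExecNode*`` inputs sharing a key column named
``"key"``, and the suffixes are illustrative)::

   cp::HashJoinNodeOptions join_opts{cp::JoinType::INNER,
                                     /*left_keys=*/{"key"},
                                     /*right_keys=*/{"key"},
                                     /*filter=*/cp::literal(true),
                                     /*output_suffix_for_left=*/"_l",
                                     /*output_suffix_for_right=*/"_r"};
   // Note: the node factory is registered under the name "hashjoin".
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * hashjoin,
       cp::MakeExecNode("hashjoin", plan.get(), {left, right}, join_opts));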
| |
| Hash-Join example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: HashJoin Example)
   :end-before: (Doc section: HashJoin Example)
   :linenos:
   :lineno-match:
| |
| |
| Summary |
| ======= |
| |
Examples of these nodes can be found in
``cpp/examples/arrow/execution_plan_documentation_examples.cc`` in the Arrow source.
| |
| Complete Example: |
| |
.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Execution Plan Documentation Example)
   :end-before: (Doc section: Execution Plan Documentation Example)
   :linenos:
   :lineno-match: