| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # --------------------------------------------------------------------- |
| # Low-level Acero bindings |
| |
| # cython: profile=False |
| # distutils: language = c++ |
| # cython: language_level = 3 |
| |
| from pyarrow.includes.common cimport * |
| from pyarrow.includes.libarrow cimport * |
| from pyarrow.includes.libarrow_acero cimport * |
| from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table, |
| RecordBatchReader) |
| from pyarrow.lib import frombytes, tobytes |
| from pyarrow._compute cimport ( |
| Expression, FunctionOptions, _ensure_field_ref, _true, |
| unwrap_null_placement, unwrap_sort_order |
| ) |
| |
| |
| cdef class ExecNodeOptions(_Weakrefable): |
| """ |
| Base class for the node options. |
| |
| Use one of the subclasses to construct an options object. |
| """ |
    __slots__ = ()  # avoid mistakenly creating attributes
| |
| cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): |
| self.wrapped = sp |
| |
| cdef inline shared_ptr[CExecNodeOptions] unwrap(self) nogil: |
| return self.wrapped |
| |
| |
| cdef class _TableSourceNodeOptions(ExecNodeOptions): |
| |
| def _set_options(self, Table table): |
| cdef: |
| shared_ptr[CTable] c_table |
| |
| c_table = pyarrow_unwrap_table(table) |
| self.wrapped.reset( |
| new CTableSourceNodeOptions(c_table) |
| ) |
| |
| |
| class TableSourceNodeOptions(_TableSourceNodeOptions): |
| """ |
| A Source node which accepts a table. |
| |
| This is the option class for the "table_source" node factory. |
| |
| Parameters |
| ---------- |
| table : pyarrow.Table |
| The table which acts as the data source. |
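
    Examples
    --------
    A minimal sketch (assuming a small in-memory table) of feeding a table
    into a plan through the "table_source" factory:

    >>> import pyarrow as pa
    >>> from pyarrow.acero import Declaration, TableSourceNodeOptions
    >>> table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    >>> source = Declaration("table_source", TableSourceNodeOptions(table))
    >>> result = source.to_table()  # same data as ``table``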
| """ |
| |
| def __init__(self, Table table): |
| self._set_options(table) |
| |
| |
| cdef class _FilterNodeOptions(ExecNodeOptions): |
| |
| def _set_options(self, Expression filter_expression not None): |
| self.wrapped.reset( |
| new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) |
| ) |
| |
| |
| class FilterNodeOptions(_FilterNodeOptions): |
| """ |
| Make a node which excludes some rows from batches passed through it. |
| |
| This is the option class for the "filter" node factory. |
| |
| The "filter" operation provides an option to define data filtering |
| criteria. It selects rows where the given expression evaluates to true. |
| Filters can be written using pyarrow.compute.Expression, and the |
| expression must have a return type of boolean. |
| |
| Parameters |
| ---------- |
    filter_expression : pyarrow.compute.Expression
        The boolean expression used to select rows; rows for which it
        evaluates to true are kept in the output.
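
    Examples
    --------
    A minimal sketch (assuming a small in-memory table) of keeping only
    the rows where a boolean expression evaluates to true:

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
    ...                            FilterNodeOptions)
    >>> table = pa.table({"a": [1, 2, 3]})
    >>> filter_opts = FilterNodeOptions(pc.field("a") > 1)
    >>> decl = Declaration.from_sequence([
    ...     Declaration("table_source", TableSourceNodeOptions(table)),
    ...     Declaration("filter", filter_opts),
    ... ])
    >>> result = decl.to_table()  # keeps the rows where "a" > 1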
| """ |
| |
| def __init__(self, Expression filter_expression): |
| self._set_options(filter_expression) |
| |
| |
| cdef class _ProjectNodeOptions(ExecNodeOptions): |
| |
| def _set_options(self, expressions, names=None): |
| cdef: |
| Expression expr |
| vector[CExpression] c_expressions |
| vector[c_string] c_names |
| |
| for expr in expressions: |
| c_expressions.push_back(expr.unwrap()) |
| |
| if names is not None: |
| if len(names) != len(expressions): |
| raise ValueError( |
| "The number of names should be equal to the number of expressions" |
| ) |
| |
| for name in names: |
| c_names.push_back(<c_string>tobytes(name)) |
| |
| self.wrapped.reset( |
| new CProjectNodeOptions(c_expressions, c_names) |
| ) |
| else: |
| self.wrapped.reset( |
| new CProjectNodeOptions(c_expressions) |
| ) |
| |
| |
| class ProjectNodeOptions(_ProjectNodeOptions): |
| """ |
| Make a node which executes expressions on input batches, |
| producing batches of the same length with new columns. |
| |
| This is the option class for the "project" node factory. |
| |
| The "project" operation rearranges, deletes, transforms, and |
| creates columns. Each output column is computed by evaluating |
| an expression against the source record batch. These must be |
| scalar expressions (expressions consisting of scalar literals, |
| field references and scalar functions, i.e. elementwise functions |
| that return one value for each input row independent of the value |
| of all other rows). |
| |
| Parameters |
| ---------- |
| expressions : list of pyarrow.compute.Expression |
        List of expressions to evaluate against the source batch. These
        must be scalar expressions.
| names : list of str, optional |
        List of names for each of the output columns (same length as
        `expressions`). If `names` is not provided, the string
        representations of the expressions will be used.
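
    Examples
    --------
    A minimal sketch (assuming a small in-memory table) of selecting and
    renaming a single column with a scalar expression:

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
    ...                            ProjectNodeOptions)
    >>> table = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
    >>> project_opts = ProjectNodeOptions([pc.field("b")], names=["b_out"])
    >>> decl = Declaration.from_sequence([
    ...     Declaration("table_source", TableSourceNodeOptions(table)),
    ...     Declaration("project", project_opts),
    ... ])
    >>> result = decl.to_table()  # one column "b_out" holding "b"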
| """ |
| |
| def __init__(self, expressions, names=None): |
| self._set_options(expressions, names) |
| |
| |
| cdef class _AggregateNodeOptions(ExecNodeOptions): |
| |
| def _set_options(self, aggregates, keys=None): |
| cdef: |
| CAggregate c_aggr |
| vector[CAggregate] c_aggregations |
| vector[CFieldRef] c_keys |
| |
| for arg_names, func_name, opts, name in aggregates: |
| c_aggr.function = tobytes(func_name) |
| if opts is not None: |
| c_aggr.options = (<FunctionOptions?>opts).wrapped |
| else: |
| c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr |
| if not isinstance(arg_names, (list, tuple)): |
| arg_names = [arg_names] |
| for arg in arg_names: |
| c_aggr.target.push_back(_ensure_field_ref(arg)) |
| c_aggr.name = tobytes(name) |
| |
| c_aggregations.push_back(move(c_aggr)) |
| |
| if keys is None: |
| keys = [] |
| for name in keys: |
| c_keys.push_back(_ensure_field_ref(name)) |
| |
| self.wrapped.reset( |
| new CAggregateNodeOptions(c_aggregations, c_keys) |
| ) |
| |
| |
| class AggregateNodeOptions(_AggregateNodeOptions): |
| """ |
| Make a node which aggregates input batches, optionally grouped by keys. |
| |
| This is the option class for the "aggregate" node factory. |
| |
| Acero supports two types of aggregates: "scalar" aggregates, |
| and "hash" aggregates. Scalar aggregates reduce an array or scalar |
| input to a single scalar output (e.g. computing the mean of a column). |
| Hash aggregates act like GROUP BY in SQL and first partition data |
| based on one or more key columns, then reduce the data in each partition. |
| The aggregate node supports both types of computation, and can compute |
| any number of aggregations at once. |
| |
| Parameters |
| ---------- |
| aggregates : list of tuples |
        Aggregations which will be applied to the targeted fields.
        Specified as a list of tuples, where each tuple is one aggregation
        specification and consists of: aggregation target column(s)
        followed by function name, aggregation function options object
        and the output field name.
        The target column(s) specification can be a single field reference,
        an empty list, or a list of fields (for unary, nullary and n-ary
        aggregation functions, respectively). Each field reference can be
        a string column name or expression.
| keys : list of field references, optional |
| Keys by which aggregations will be grouped. Each key can reference |
| a field using a string name or expression. |
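
    Examples
    --------
    A minimal sketch (assuming a small in-memory table) of a grouped
    ("hash") aggregation, summing "value" for each "key":

    >>> import pyarrow as pa
    >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
    ...                            AggregateNodeOptions)
    >>> table = pa.table({"key": ["x", "x", "y"], "value": [1, 2, 3]})
    >>> aggr_opts = AggregateNodeOptions(
    ...     [("value", "hash_sum", None, "value_sum")], keys=["key"])
    >>> decl = Declaration.from_sequence([
    ...     Declaration("table_source", TableSourceNodeOptions(table)),
    ...     Declaration("aggregate", aggr_opts),
    ... ])
    >>> result = decl.to_table()  # one row per "key"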
| """ |
| |
| def __init__(self, aggregates, keys=None): |
| self._set_options(aggregates, keys) |
| |
| |
| cdef class _OrderByNodeOptions(ExecNodeOptions): |
| |
| def _set_options(self, sort_keys, null_placement): |
| cdef: |
| vector[CSortKey] c_sort_keys |
| |
| for name, order in sort_keys: |
| c_sort_keys.push_back( |
| CSortKey(_ensure_field_ref(name), unwrap_sort_order(order)) |
| ) |
| |
| self.wrapped.reset( |
| new COrderByNodeOptions( |
| COrdering(c_sort_keys, unwrap_null_placement(null_placement)) |
| ) |
| ) |
| |
| |
| class OrderByNodeOptions(_OrderByNodeOptions): |
| """ |
| Make a node which applies a new ordering to the data. |
| |
| Currently this node works by accumulating all data, sorting, and then |
| emitting the new data with an updated batch index. |
| Larger-than-memory sort is not currently supported. |
| |
| This is the option class for the "order_by" node factory. |
| |
| Parameters |
| ---------- |
| sort_keys : sequence of (name, order) tuples |
| Names of field/column keys to sort the input on, |
| along with the order each field/column is sorted in. |
| Accepted values for `order` are "ascending", "descending". |
| Each field reference can be a string column name or expression. |
| null_placement : str, default "at_end" |
        Where nulls in the input should be sorted; this only applies to
        columns/fields mentioned in `sort_keys`.
| Accepted values are "at_start", "at_end". |
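
    Examples
    --------
    A minimal sketch (assuming a small in-memory table) of sorting on a
    single column in descending order:

    >>> import pyarrow as pa
    >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
    ...                            OrderByNodeOptions)
    >>> table = pa.table({"a": [2, 1, 3]})
    >>> order_opts = OrderByNodeOptions([("a", "descending")])
    >>> decl = Declaration.from_sequence([
    ...     Declaration("table_source", TableSourceNodeOptions(table)),
    ...     Declaration("order_by", order_opts),
    ... ])
    >>> result = decl.to_table()  # column "a" ordered 3, 2, 1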
| """ |
| |
| def __init__(self, sort_keys=(), *, null_placement="at_end"): |
| self._set_options(sort_keys, null_placement) |
| |
| |
| cdef class _HashJoinNodeOptions(ExecNodeOptions): |
| |
| def _set_options( |
| self, join_type, left_keys, right_keys, left_output=None, right_output=None, |
| output_suffix_for_left="", output_suffix_for_right="", |
| ): |
| cdef: |
| CJoinType c_join_type |
| vector[CFieldRef] c_left_keys |
| vector[CFieldRef] c_right_keys |
| vector[CFieldRef] c_left_output |
| vector[CFieldRef] c_right_output |
| |
| # join type |
| if join_type == "left semi": |
| c_join_type = CJoinType_LEFT_SEMI |
| elif join_type == "right semi": |
| c_join_type = CJoinType_RIGHT_SEMI |
| elif join_type == "left anti": |
| c_join_type = CJoinType_LEFT_ANTI |
| elif join_type == "right anti": |
| c_join_type = CJoinType_RIGHT_ANTI |
| elif join_type == "inner": |
| c_join_type = CJoinType_INNER |
| elif join_type == "left outer": |
| c_join_type = CJoinType_LEFT_OUTER |
| elif join_type == "right outer": |
| c_join_type = CJoinType_RIGHT_OUTER |
| elif join_type == "full outer": |
| c_join_type = CJoinType_FULL_OUTER |
| else: |
| raise ValueError("Unsupported join type") |
| |
| # left/right keys |
| if not isinstance(left_keys, (list, tuple)): |
| left_keys = [left_keys] |
| for key in left_keys: |
| c_left_keys.push_back(_ensure_field_ref(key)) |
| if not isinstance(right_keys, (list, tuple)): |
| right_keys = [right_keys] |
| for key in right_keys: |
| c_right_keys.push_back(_ensure_field_ref(key)) |
| |
| # left/right output fields |
| if left_output is not None and right_output is not None: |
| for colname in left_output: |
| c_left_output.push_back(_ensure_field_ref(colname)) |
| for colname in right_output: |
| c_right_output.push_back(_ensure_field_ref(colname)) |
| |
| self.wrapped.reset( |
| new CHashJoinNodeOptions( |
| c_join_type, c_left_keys, c_right_keys, |
| c_left_output, c_right_output, |
| _true, |
| <c_string>tobytes(output_suffix_for_left), |
| <c_string>tobytes(output_suffix_for_right) |
| ) |
| ) |
| else: |
| self.wrapped.reset( |
| new CHashJoinNodeOptions( |
| c_join_type, c_left_keys, c_right_keys, |
| _true, |
| <c_string>tobytes(output_suffix_for_left), |
| <c_string>tobytes(output_suffix_for_right) |
| ) |
| ) |
| |
| |
| class HashJoinNodeOptions(_HashJoinNodeOptions): |
| """ |
    Make a node which implements a join operation using a hash join strategy.
| |
| This is the option class for the "hashjoin" node factory. |
| |
| Parameters |
| ---------- |
| join_type : str |
| Type of join. One of "left semi", "right semi", "left anti", |
| "right anti", "inner", "left outer", "right outer", "full outer". |
| left_keys : str, Expression or list |
| Key fields from left input. Each key can be a string column name |
| or a field expression, or a list of such field references. |
| right_keys : str, Expression or list |
| Key fields from right input. See `left_keys` for details. |
| left_output : list, optional |
| List of output fields passed from left input. If left and right |
| output fields are not specified, all valid fields from both left and |
| right input will be output. Each field can be a string column name |
| or a field expression. |
| right_output : list, optional |
| List of output fields passed from right input. If left and right |
| output fields are not specified, all valid fields from both left and |
| right input will be output. Each field can be a string column name |
| or a field expression. |
| output_suffix_for_left : str |
        Suffix added to names of output fields coming from left input
        (used to distinguish, if necessary, between fields of the same
        name in the left and right inputs; can be left empty if there
        are no name collisions).
| output_suffix_for_right : str |
| Suffix added to names of output fields coming from right input, |
| see `output_suffix_for_left` for details. |
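
    Examples
    --------
    A minimal sketch (assuming two small in-memory tables sharing a "key"
    column) of an inner join:

    >>> import pyarrow as pa
    >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
    ...                            HashJoinNodeOptions)
    >>> left = pa.table({"key": [1, 2, 3], "a": ["x", "y", "z"]})
    >>> right = pa.table({"key": [2, 3, 4], "b": [20, 30, 40]})
    >>> join_opts = HashJoinNodeOptions(
    ...     "inner", left_keys="key", right_keys="key")
    >>> decl = Declaration("hashjoin", join_opts, inputs=[
    ...     Declaration("table_source", TableSourceNodeOptions(left)),
    ...     Declaration("table_source", TableSourceNodeOptions(right)),
    ... ])
    >>> result = decl.to_table()  # rows with keys 2 and 3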
| """ |
| |
| def __init__( |
| self, join_type, left_keys, right_keys, left_output=None, right_output=None, |
| output_suffix_for_left="", output_suffix_for_right="" |
| ): |
| self._set_options( |
| join_type, left_keys, right_keys, left_output, right_output, |
| output_suffix_for_left, output_suffix_for_right |
| ) |
| |
| |
| cdef class Declaration(_Weakrefable): |
| """ |
| Helper class for declaring the nodes of an ExecPlan. |
| |
    A Declaration represents an unconstructed ExecNode, and potentially
    more since its inputs may also be Declarations (or it may represent a
    whole sequence of nodes when constructed with ``from_sequence``).

    The possible ExecNodes to use are registered with a name,
    the "factory name", and need to be specified using this name, together
    with the corresponding ExecNodeOptions subclass.
| |
| Parameters |
| ---------- |
| factory_name : str |
| The ExecNode factory name, such as "table_source", "filter", |
| "project" etc. See the ExecNodeOptions subclasses for the exact |
| factory names to use. |
| options : ExecNodeOptions |
| Corresponding ExecNodeOptions subclass (matching the factory name). |
| inputs : list of Declaration, optional |
| Input nodes for this declaration. Optional if the node is a source |
| node, or when the declaration gets combined later with |
| ``from_sequence``. |
| |
| Returns |
| ------- |
| Declaration |
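
    Examples
    --------
    A minimal sketch (assuming a small in-memory table) of building and
    running a two-node plan by wiring up ``inputs`` explicitly:

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
    ...                            FilterNodeOptions)
    >>> table = pa.table({"a": [1, 2, 3]})
    >>> source = Declaration("table_source", TableSourceNodeOptions(table))
    >>> filter_opts = FilterNodeOptions(pc.field("a") > 1)
    >>> filtered = Declaration("filter", filter_opts, inputs=[source])
    >>> result = filtered.to_table()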
| """ |
| cdef void init(self, const CDeclaration& c_decl): |
| self.decl = c_decl |
| |
| @staticmethod |
| cdef wrap(const CDeclaration& c_decl): |
| cdef Declaration self = Declaration.__new__(Declaration) |
| self.init(c_decl) |
| return self |
| |
| cdef inline CDeclaration unwrap(self) nogil: |
| return self.decl |
| |
| def __init__(self, factory_name, ExecNodeOptions options, inputs=None): |
| cdef: |
| c_string c_factory_name |
| CDeclaration c_decl |
| vector[CDeclaration.Input] c_inputs |
| |
| c_factory_name = tobytes(factory_name) |
| |
| if inputs is not None: |
| for ipt in inputs: |
| c_inputs.push_back( |
| CDeclaration.Input((<Declaration>ipt).unwrap()) |
| ) |
| |
| c_decl = CDeclaration(c_factory_name, c_inputs, options.unwrap()) |
| self.init(c_decl) |
| |
| @staticmethod |
| def from_sequence(decls): |
| """ |
| Convenience factory for the common case of a simple sequence of nodes. |
| |
| Each of the declarations will be appended to the inputs of the |
| subsequent declaration, and the final modified declaration will |
| be returned. |
| |
| Parameters |
| ---------- |
| decls : list of Declaration |
| |
| Returns |
| ------- |
| Declaration |
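
        Examples
        --------
        A minimal sketch (assuming a small in-memory table); the "filter"
        node automatically receives the "table_source" node as its input:

        >>> import pyarrow as pa
        >>> import pyarrow.compute as pc
        >>> from pyarrow.acero import (Declaration, TableSourceNodeOptions,
        ...                            FilterNodeOptions)
        >>> table = pa.table({"a": [1, 2, 3]})
        >>> filter_opts = FilterNodeOptions(pc.field("a") > 1)
        >>> decl = Declaration.from_sequence([
        ...     Declaration("table_source", TableSourceNodeOptions(table)),
        ...     Declaration("filter", filter_opts),
        ... ])
        >>> result = decl.to_table()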
| """ |
| cdef: |
| vector[CDeclaration] c_decls |
| CDeclaration c_decl |
| |
| for decl in decls: |
| c_decls.push_back((<Declaration> decl).unwrap()) |
| |
| c_decl = CDeclaration.Sequence(c_decls) |
| return Declaration.wrap(c_decl) |
| |
| def __str__(self): |
| return frombytes(GetResultValue(DeclarationToString(self.decl))) |
| |
| def __repr__(self): |
| return "<pyarrow.acero.Declaration>\n{0}".format(str(self)) |
| |
| def to_table(self, bint use_threads=True): |
| """ |
| Run the declaration and collect the results into a table. |
| |
| This method will implicitly add a sink node to the declaration |
| to collect results into a table. It will then create an ExecPlan |
| from the declaration, start the exec plan, block until the plan |
| has finished, and return the created table. |
| |
| Parameters |
| ---------- |
| use_threads : bool, default True |
| If set to False, then all CPU work will be done on the calling |
| thread. I/O tasks will still happen on the I/O executor |
| and may be multi-threaded (but should not use significant CPU |
| resources). |
| |
| Returns |
| ------- |
| pyarrow.Table |
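
        Examples
        --------
        A minimal sketch (assuming a small in-memory table) of collecting
        the results of a plan into a table on the calling thread:

        >>> import pyarrow as pa
        >>> from pyarrow.acero import Declaration, TableSourceNodeOptions
        >>> table = pa.table({"a": [1, 2, 3]})
        >>> decl = Declaration("table_source", TableSourceNodeOptions(table))
        >>> result = decl.to_table(use_threads=False)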
| """ |
| cdef: |
| shared_ptr[CTable] c_table |
| |
| with nogil: |
| c_table = GetResultValue(DeclarationToTable(self.unwrap(), use_threads)) |
| return pyarrow_wrap_table(c_table) |
| |
| def to_reader(self, bint use_threads=True): |
| """Run the declaration and return results as a RecordBatchReader. |
| |
| For details about the parameters, see `to_table`. |
| |
| Returns |
| ------- |
| pyarrow.RecordBatchReader |
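
        Examples
        --------
        A minimal sketch (assuming a small in-memory table) of consuming
        the results as a stream of record batches:

        >>> import pyarrow as pa
        >>> from pyarrow.acero import Declaration, TableSourceNodeOptions
        >>> table = pa.table({"a": [1, 2, 3]})
        >>> decl = Declaration("table_source", TableSourceNodeOptions(table))
        >>> reader = decl.to_reader()
        >>> result = reader.read_all()  # pyarrow.Table with the same data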
| """ |
| cdef: |
| RecordBatchReader reader |
| reader = RecordBatchReader.__new__(RecordBatchReader) |
| reader.reader.reset( |
| GetResultValue(DeclarationToReader(self.unwrap(), use_threads)).release() |
| ) |
| return reader |
| |
| |
| def _group_by(table, aggregates, keys): |
| cdef: |
| shared_ptr[CTable] c_table |
| vector[CAggregate] c_aggregates |
| vector[CFieldRef] c_keys |
| CAggregate c_aggr |
| |
| c_table = (<Table> table).sp_table |
| |
| for aggr_arg_indices, aggr_func_name, aggr_opts, aggr_name in aggregates: |
| c_aggr.function = tobytes(aggr_func_name) |
| if aggr_opts is not None: |
| c_aggr.options = (<FunctionOptions?>aggr_opts).wrapped |
| else: |
| c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr |
| for field_idx in aggr_arg_indices: |
| c_aggr.target.push_back(CFieldRef(<int> field_idx)) |
| |
| c_aggr.name = tobytes(aggr_name) |
| c_aggregates.push_back(move(c_aggr)) |
| |
| for key_idx in keys: |
| c_keys.push_back(CFieldRef(<int> key_idx)) |
| |
| with nogil: |
| sp_table = GetResultValue( |
| CTableGroupBy(c_table, c_aggregates, c_keys) |
| ) |
| |
| return pyarrow_wrap_table(sp_table) |