| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # --------------------------------------------------------------------- |
| # Implement Internal ExecPlan bindings |
| |
| # cython: profile=False |
| # distutils: language = c++ |
| # cython: language_level = 3 |
| |
| from cython.operator cimport dereference as deref, preincrement as inc |
| |
| from pyarrow.includes.common cimport * |
| from pyarrow.includes.libarrow cimport * |
| from pyarrow.includes.libarrow_dataset cimport * |
| from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) |
| from pyarrow.lib import tobytes |
| from pyarrow._compute cimport Expression, _true |
| from pyarrow._dataset cimport Dataset |
| from pyarrow._dataset import InMemoryDataset |
| |
# Module-level side effect: enable Dataset support in ExecPlan (presumably
# registers the "scan" node factory used below — confirm in libarrow_dataset).
Initialize()
| |
| |
cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True):
    """
    Internal function to create an ExecPlan and run it to completion.

    A source node is created for each entry in ``inputs`` (a "table_source"
    node for Tables, a "scan" node for Datasets) and attached as input of the
    first declaration in ``plan``.  A "sink" node is appended at the end and
    the plan is executed synchronously, collecting all batches into a Table.

    Parameters
    ----------
    inputs : list of Table or Dataset
        The sources from which the ExecPlan should fetch data.
        In most cases this is only one, unless the first node of the
        plan is able to get data from multiple different sources.
    output_type : Table or InMemoryDataset
        In which format the output should be provided.
    plan : vector[CDeclaration]
        The nodes of the plan that should be applied to the sources
        to produce the output.
    use_threads : bool, default True
        Whether to use multithreading or not.

    Returns
    -------
    Table or InMemoryDataset
        The materialized result of the plan, in the requested format.

    Raises
    ------
    TypeError
        If an input is neither a Table nor a Dataset, or if ``output_type``
        is not Table or InMemoryDataset.
    """
    cdef:
        CExecutor *c_executor
        shared_ptr[CExecContext] c_exec_context
        shared_ptr[CExecPlan] c_exec_plan
        vector[CDeclaration] c_decls
        vector[CExecNode*] _empty
        vector[CExecNode*] c_final_node_vec
        CExecNode *c_node
        CTable* c_table
        shared_ptr[CTable] c_in_table
        shared_ptr[CTable] c_out_table
        shared_ptr[CTableSourceNodeOptions] c_tablesourceopts
        shared_ptr[CScanNodeOptions] c_scanopts
        shared_ptr[CExecNodeOptions] c_input_node_opts
        shared_ptr[CSinkNodeOptions] c_sinkopts
        shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen
        shared_ptr[CRecordBatchReader] c_recordbatchreader
        vector[CDeclaration].iterator plan_iter
        vector[CDeclaration.Input] no_c_inputs
        CStatus c_plan_status

    # A NULL executor makes the plan run serially on the calling thread.
    if use_threads:
        c_executor = GetCpuThreadPool()
    else:
        c_executor = NULL

    c_exec_context = make_shared[CExecContext](
        c_default_memory_pool(), c_executor)
    c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get()))

    plan_iter = plan.begin()

    # Create source nodes for each input
    for ipt in inputs:
        if isinstance(ipt, Table):
            node_factory = "table_source"
            c_in_table = pyarrow_unwrap_table(ipt)
            # 1 << 20 caps the rows per emitted batch — presumably the
            # TableSourceNodeOptions max_batch_size; confirm against the
            # C++ declaration.
            c_tablesourceopts = make_shared[CTableSourceNodeOptions](
                c_in_table, 1 << 20)
            c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                c_tablesourceopts)
        elif isinstance(ipt, Dataset):
            node_factory = "scan"
            c_in_dataset = (<Dataset>ipt).unwrap()
            c_scanopts = make_shared[CScanNodeOptions](
                c_in_dataset, make_shared[CScanOptions]())
            # Propagate the threading choice into the dataset scanner too.
            deref(deref(c_scanopts).scan_options).use_threads = use_threads
            c_input_node_opts = static_pointer_cast[CExecNodeOptions, CScanNodeOptions](
                c_scanopts)
        else:
            raise TypeError("Unsupported type")

        if plan_iter != plan.end():
            # Flag the source as the input of the first plan node.
            deref(plan_iter).inputs.push_back(CDeclaration.Input(
                CDeclaration(tobytes(node_factory),
                             no_c_inputs, c_input_node_opts)
            ))
        else:
            # Empty plan, make the source the first plan node.
            c_decls.push_back(
                CDeclaration(tobytes(node_factory),
                             no_c_inputs, c_input_node_opts)
            )

    # Append the caller-provided declarations after the source node(s).
    while plan_iter != plan.end():
        c_decls.push_back(deref(plan_iter))
        inc(plan_iter)

    # Add all CDeclarations to the plan; c_node ends up pointing at the
    # last node of the sequence, whose output_schema() is used below.
    c_node = GetResultValue(
        CDeclaration.Sequence(c_decls).AddToPlan(&deref(c_exec_plan))
    )
    c_final_node_vec.push_back(c_node)

    # Create the output node: a sink that pushes batches into an
    # asynchronous generator which we consume synchronously below.
    c_async_exec_batch_gen = make_shared[CAsyncExecBatchGenerator]()
    c_sinkopts = make_shared[CSinkNodeOptions](c_async_exec_batch_gen.get())
    GetResultValue(
        MakeExecNode(tobytes("sink"), &deref(c_exec_plan),
                     c_final_node_vec, deref(c_sinkopts))
    )

    # Convert the asyncgenerator to a sync batch reader
    c_recordbatchreader = MakeGeneratorReader(c_node.output_schema(),
                                              deref(c_async_exec_batch_gen),
                                              deref(c_exec_context).memory_pool())

    # Start execution of the ExecPlan
    # NOTE(review): the Status returned by Validate()/StartProducing() is
    # discarded here — errors appear to surface only via finished().status()
    # below; confirm this is intended.
    deref(c_exec_plan).Validate()
    deref(c_exec_plan).StartProducing()

    # Convert output to the expected one.  Draining the reader here is what
    # actually pulls all batches through the plan.
    c_out_table = GetResultValue(
        CTable.FromRecordBatchReader(c_recordbatchreader.get()))
    if output_type == Table:
        output = pyarrow_wrap_table(c_out_table)
    elif output_type == InMemoryDataset:
        output = InMemoryDataset(pyarrow_wrap_table(c_out_table))
    else:
        raise TypeError("Unsupported output type")

    # Wait for the plan to finish and propagate any execution error
    # (check_status raises the mapped Python exception on failure).
    with nogil:
        c_plan_status = deref(c_exec_plan).finished().status()
    check_status(c_plan_status)

    return output
| |
| |
def _perform_join(join_type, left_operand not None, left_keys,
                  right_operand not None, right_keys,
                  left_suffix=None, right_suffix=None,
                  use_threads=True, coalesce_keys=False,
                  output_type=Table):
    """
    Perform join of two tables or datasets.

    The result will be an output table with the result of the join operation

    Parameters
    ----------
    join_type : str
        One of supported join types.
    left_operand : Table or Dataset
        The left operand for the join operation.
    left_keys : str or list[str]
        The left key (or keys) on which the join operation should be performed.
    right_operand : Table or Dataset
        The right operand for the join operation.
    right_keys : str or list[str]
        The right key (or keys) on which the join operation should be performed.
    left_suffix : str, default None
        Which suffix to add to the left column names. This prevents confusion
        when the columns in left and right operands have colliding names.
    right_suffix : str, default None
        Which suffix to add to the right column names. This prevents confusion
        when the columns in left and right operands have colliding names.
    use_threads : bool, default True
        Whether to use multithreading or not.
    coalesce_keys : bool, default False
        If the duplicated keys should be omitted from one of the sides
        in the join result.
    output_type : Table or InMemoryDataset
        The output type for the exec plan result.

    Returns
    -------
    result_table : Table or InMemoryDataset
        The joined data, in the format requested via ``output_type``.

    Raises
    ------
    TypeError
        If an operand is neither a Table nor a Dataset.
    ValueError
        If ``join_type`` is not one of the supported join types.
    """
    cdef:
        vector[CFieldRef] c_left_keys
        vector[CFieldRef] c_right_keys
        vector[CFieldRef] c_left_columns
        vector[CFieldRef] c_right_columns
        vector[CDeclaration] c_decl_plan
        vector[CExpression] c_projections
        vector[c_string] c_projected_col_names
        CJoinType c_join_type

    # Prepare left and right table keys for the C++ join options, also
    # remembering each key's position so that full-outer coalescing below
    # can pair a left key with its corresponding right key.
    left_keys_order = {}
    if isinstance(left_keys, str):
        left_keys = [left_keys]
    for idx, key in enumerate(left_keys):
        left_keys_order[key] = idx
        c_left_keys.push_back(CFieldRef(<c_string>tobytes(key)))

    right_keys_order = {}
    if isinstance(right_keys, str):
        right_keys = [right_keys]
    for idx, key in enumerate(right_keys):
        right_keys_order[key] = idx
        c_right_keys.push_back(CFieldRef(<c_string>tobytes(key)))

    # By default expose all columns on both left and right table
    if isinstance(left_operand, Table):
        left_columns = left_operand.column_names
    elif isinstance(left_operand, Dataset):
        left_columns = left_operand.schema.names
    else:
        raise TypeError("Unsupported left join member type")

    if isinstance(right_operand, Table):
        right_columns = right_operand.column_names
    elif isinstance(right_operand, Dataset):
        right_columns = right_operand.schema.names
    else:
        raise TypeError("Unsupported right join member type")

    # Pick the join type.  Semi/anti joins only output one side's columns;
    # inner and one-sided outer joins drop the other side's key columns.
    # NOTE(review): the set differences below discard the original column
    # order (iteration order of a set) — confirm this is intended.
    if join_type == "left semi":
        c_join_type = CJoinType_LEFT_SEMI
        right_columns = []
    elif join_type == "right semi":
        c_join_type = CJoinType_RIGHT_SEMI
        left_columns = []
    elif join_type == "left anti":
        c_join_type = CJoinType_LEFT_ANTI
        right_columns = []
    elif join_type == "right anti":
        c_join_type = CJoinType_RIGHT_ANTI
        left_columns = []
    elif join_type == "inner":
        c_join_type = CJoinType_INNER
        right_columns = set(right_columns) - set(right_keys)
    elif join_type == "left outer":
        c_join_type = CJoinType_LEFT_OUTER
        right_columns = set(right_columns) - set(right_keys)
    elif join_type == "right outer":
        c_join_type = CJoinType_RIGHT_OUTER
        left_columns = set(left_columns) - set(left_keys)
    elif join_type == "full outer":
        c_join_type = CJoinType_FULL_OUTER
    else:
        raise ValueError("Unsupported join type")

    # Turn the columns to vectors of FieldRefs
    # and set aside indices of keys.
    left_column_keys_indices = {}
    for idx, colname in enumerate(left_columns):
        c_left_columns.push_back(CFieldRef(<c_string>tobytes(colname)))
        if colname in left_keys:
            left_column_keys_indices[colname] = idx
    right_column_keys_indices = {}
    for idx, colname in enumerate(right_columns):
        c_right_columns.push_back(CFieldRef(<c_string>tobytes(colname)))
        if colname in right_keys:
            right_column_keys_indices[colname] = idx

    # Add the join node to the execplan
    if coalesce_keys:
        # Explicit output columns are passed so key handling can be
        # customized by the projection added below for full outer joins.
        c_decl_plan.push_back(
            CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions(
                c_join_type, c_left_keys, c_right_keys,
                c_left_columns, c_right_columns,
                _true,
                <c_string>tobytes(left_suffix or ""),
                <c_string>tobytes(right_suffix or "")
            ))
        )
        if join_type == "full outer":
            # In case of full outer joins, the join operation will output all columns
            # so that we can coalesce the keys and exclude duplicates in a subsequent projection.
            left_columns_set = set(left_columns)
            right_columns_set = set(right_columns)
            # Where the right table columns start.
            right_operand_index = len(left_columns)
            for idx, col in enumerate(left_columns + right_columns):
                if idx < len(left_columns) and col in left_column_keys_indices:
                    # Include keys only once and coalesce left+right table keys.
                    c_projected_col_names.push_back(tobytes(col))
                    # Get the index of the right key that is being paired
                    # with this left key. We do so by retrieving the name
                    # of the right key that is in the same position in the provided keys
                    # and then looking up the index for that name in the right table.
                    right_key_index = right_column_keys_indices[right_keys[left_keys_order[col]]]
                    c_projections.push_back(Expression.unwrap(
                        Expression._call("coalesce", [
                            Expression._field(idx), Expression._field(
                                right_operand_index+right_key_index)
                        ])
                    ))
                elif idx >= right_operand_index and col in right_column_keys_indices:
                    # Do not include right table keys. As they would lead to duplicated keys.
                    continue
                else:
                    # For all the other columns include them as they are.
                    # Just recompute the suffixes that the join produced as the projection
                    # would lose them otherwise.
                    if left_suffix and idx < right_operand_index and col in right_columns_set:
                        col += left_suffix
                    if right_suffix and idx >= right_operand_index and col in left_columns_set:
                        col += right_suffix
                    c_projected_col_names.push_back(tobytes(col))
                    c_projections.push_back(
                        Expression.unwrap(Expression._field(idx)))
            c_decl_plan.push_back(
                CDeclaration(tobytes("project"), CProjectNodeOptions(
                    c_projections, c_projected_col_names))
            )
    else:
        # Without coalescing, let the join node use its default output
        # columns (this CHashJoinNodeOptions overload omits them).
        c_decl_plan.push_back(
            CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions(
                c_join_type, c_left_keys, c_right_keys,
                _true,
                <c_string>tobytes(left_suffix or ""),
                <c_string>tobytes(right_suffix or "")
            ))
        )

    # Run the join (and optional projection) over both operands.
    result_table = execplan([left_operand, right_operand],
                            plan=c_decl_plan,
                            output_type=output_type,
                            use_threads=use_threads)

    return result_table