| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/operator/table_function_operator.h" |
| |
| #include <algorithm> |
| #include <cstring> |
| #include <limits> |
| #include <memory> |
| #include <numeric> |
| |
| #include "common/cast_set.h" |
| #include "core/assert_cast.h" |
| #include "core/block/block.h" |
| #include "core/block/column_numbers.h" |
| #include "core/column/column_nullable.h" |
| #include "core/custom_allocator.h" |
| #include "exec/operator/operator.h" |
| #include "exprs/table_function/table_function_factory.h" |
| #include "util/simd/bits.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RuntimeState; |
| } // namespace doris |
| |
| namespace doris { |
| |
| TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent) |
| : PipelineXLocalState<>(state, parent), _child_block(Block::create_unique()) {} |
| |
| Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
| RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); |
| SCOPED_TIMER(exec_time_counter()); |
| SCOPED_TIMER(_init_timer); |
| _init_function_timer = ADD_TIMER(custom_profile(), "InitTableFunctionTime"); |
| _process_rows_timer = ADD_TIMER(custom_profile(), "ProcessRowsTime"); |
| _filter_timer = ADD_TIMER(custom_profile(), "FilterTime"); |
| return Status::OK(); |
| } |
| |
| Status TableFunctionLocalState::_clone_table_function(RuntimeState* state) { |
| auto& p = _parent->cast<TableFunctionOperatorX>(); |
| _vfn_ctxs.resize(p._vfn_ctxs.size()); |
| for (size_t i = 0; i < _vfn_ctxs.size(); i++) { |
| RETURN_IF_ERROR(p._vfn_ctxs[i]->clone(state, _vfn_ctxs[i])); |
| |
| TableFunction* fn = nullptr; |
| RETURN_IF_ERROR(TableFunctionFactory::get_fn(_vfn_ctxs[i]->root()->fn(), state->obj_pool(), |
| &fn, state->be_exec_version())); |
| fn->set_expr_context(_vfn_ctxs[i]); |
| _fns.push_back(fn); |
| } |
| _expand_conjuncts_ctxs.resize(p._expand_conjuncts_ctxs.size()); |
| for (size_t i = 0; i < _expand_conjuncts_ctxs.size(); i++) { |
| RETURN_IF_ERROR(p._expand_conjuncts_ctxs[i]->clone(state, _expand_conjuncts_ctxs[i])); |
| } |
| _need_to_handle_outer_conjuncts = (!_expand_conjuncts_ctxs.empty() && _fns[0]->is_outer()); |
| return Status::OK(); |
| } |
| |
| Status TableFunctionLocalState::open(RuntimeState* state) { |
| SCOPED_TIMER(PipelineXLocalState<>::exec_time_counter()); |
| SCOPED_TIMER(PipelineXLocalState<>::_open_timer); |
| RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); |
| RETURN_IF_ERROR(_clone_table_function(state)); |
| for (auto* fn : _fns) { |
| RETURN_IF_ERROR(fn->open()); |
| } |
| _cur_child_offset = -1; |
| _reset_block_fast_path_state(); |
| return Status::OK(); |
| } |
| |
| void TableFunctionLocalState::_copy_output_slots(std::vector<MutableColumnPtr>& columns, |
| const TableFunctionOperatorX& p) { |
| if (!_current_row_insert_times) { |
| return; |
| } |
| for (auto index : p._output_slot_indexs) { |
| auto src_column = _child_block->get_by_position(index).column; |
| columns[index]->insert_many_from(*src_column, _cur_child_offset, _current_row_insert_times); |
| } |
| _current_row_insert_times = 0; |
| } |
| |
| // Returns the index of fn of the last eos counted from back to front |
| // eg: there are 3 functions in `_fns` |
| // eos: false, true, true |
| // return: 1 |
| // |
| // eos: false, false, true |
| // return: 2 |
| // |
| // eos: false, false, false |
| // return: -1 |
| // |
| // eos: true, true, true |
| // return: 0 |
| // |
| // return: |
| // 0: all fns are eos |
| // -1: all fns are not eos |
| // >0: some of fns are eos |
| int TableFunctionLocalState::_find_last_fn_eos_idx() const { |
| for (int i = _parent->cast<TableFunctionOperatorX>()._fn_num - 1; i >= 0; --i) { |
| if (!_fns[i]->eos()) { |
| if (i == _parent->cast<TableFunctionOperatorX>()._fn_num - 1) { |
| return -1; |
| } else { |
| return i + 1; |
| } |
| } |
| } |
| // all eos |
| return 0; |
| } |
| |
| // Roll to reset the table function. |
| // Eg: |
| // There are 3 functions f1, f2 and f3 in `_fns`. |
| // If `last_eos_idx` is 1, which means f2 and f3 are eos. |
| // So we need to forward f1, and reset f2 and f3. |
| bool TableFunctionLocalState::_roll_table_functions(int last_eos_idx) { |
| int i = last_eos_idx - 1; |
| for (; i >= 0; --i) { |
| _fns[i]->forward(); |
| if (!_fns[i]->eos()) { |
| break; |
| } |
| } |
| if (i == -1) { |
| // after forward, all functions are eos. |
| // we should process next child row to get more table function results. |
| return false; |
| } |
| |
| for (int j = i + 1; j < _parent->cast<TableFunctionOperatorX>()._fn_num; ++j) { |
| _fns[j]->reset(); |
| } |
| |
| return true; |
| } |
| |
| bool TableFunctionLocalState::_is_inner_and_empty() { |
| for (int i = 0; i < _parent->cast<TableFunctionOperatorX>()._fn_num; i++) { |
| // if any table function is not outer and has empty result, go to next child row |
| // if it's outer function, will be insert into one row NULL |
| if (!_fns[i]->is_outer() && _fns[i]->current_empty()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| bool TableFunctionLocalState::_can_use_block_fast_path() const { |
| auto& p = _parent->cast<TableFunctionOperatorX>(); |
| // Fast path is only valid when: |
| // - only one table function exists |
| // - there is an active child row to expand |
| // - the child block is non-empty |
| // - the table function can expose nested/offsets via prepare_block_fast_path() |
| return p._fn_num == 1 && _cur_child_offset != -1 && _child_block->rows() > 0 && |
| _fns[0]->support_block_fast_path(); |
| } |
| |
| void TableFunctionLocalState::_reset_block_fast_path_state() { |
| _block_fast_path_prepared = false; |
| _block_fast_path_enabled = false; |
| _block_fast_path_ctx = {}; |
| _block_fast_path_row = 0; |
| _block_fast_path_in_row_offset = 0; |
| } |
| |
| Status TableFunctionLocalState::_prepare_block_fast_path(RuntimeState* state) { |
| if (_block_fast_path_prepared) { |
| return Status::OK(); |
| } |
| |
| RETURN_IF_ERROR( |
| _fns[0]->prepare_block_fast_path(_child_block.get(), state, &_block_fast_path_ctx)); |
| if (_block_fast_path_ctx.offsets_ptr == nullptr || |
| _block_fast_path_ctx.nested_col.get() == nullptr) { |
| return Status::InternalError("block fast path context is invalid"); |
| } |
| |
| const auto child_rows = cast_set<int64_t>(_block_fast_path_ctx.offsets_ptr->size()); |
| if (child_rows != cast_set<int64_t>(_child_block->rows())) { |
| return Status::InternalError("block fast path offsets size mismatch"); |
| } |
| |
| _block_fast_path_row = _cur_child_offset; |
| _block_fast_path_in_row_offset = 0; |
| _block_fast_path_enabled = _has_contiguous_block_fast_path_suffix(); |
| _block_fast_path_prepared = true; |
| return Status::OK(); |
| } |
| |
| bool TableFunctionLocalState::_has_contiguous_block_fast_path_suffix() const { |
| const auto& offsets = *_block_fast_path_ctx.offsets_ptr; |
| const auto child_rows = cast_set<int64_t>(offsets.size()); |
| int64_t child_row = _block_fast_path_row; |
| uint64_t in_row_offset = _block_fast_path_in_row_offset; |
| uint64_t expected_next_nested_idx = 0; |
| bool found_nested_range = false; |
| |
| while (child_row < child_rows) { |
| if (_block_fast_path_ctx.array_nullmap_data && |
| _block_fast_path_ctx.array_nullmap_data[child_row]) { |
| child_row++; |
| in_row_offset = 0; |
| continue; |
| } |
| |
| const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1]; |
| const uint64_t cur_off = offsets[child_row]; |
| const uint64_t nested_len = cur_off - prev_off; |
| if (in_row_offset >= nested_len) { |
| child_row++; |
| in_row_offset = 0; |
| continue; |
| } |
| |
| const uint64_t nested_start = prev_off + in_row_offset; |
| if (!found_nested_range) { |
| found_nested_range = true; |
| } else if (nested_start != expected_next_nested_idx) { |
| return false; |
| } |
| expected_next_nested_idx = cur_off; |
| child_row++; |
| in_row_offset = 0; |
| } |
| |
| return true; |
| } |
| |
| Status TableFunctionLocalState::_get_expanded_block_block_fast_path( |
| RuntimeState* state, std::vector<MutableColumnPtr>& columns) { |
| auto& p = _parent->cast<TableFunctionOperatorX>(); |
| DCHECK(_block_fast_path_prepared); |
| DCHECK(_block_fast_path_enabled); |
| |
| const auto remaining_capacity = |
| state->batch_size() - cast_set<int>(columns[p._child_slots.size()]->size()); |
| if (remaining_capacity <= 0) { |
| return Status::OK(); |
| } |
| |
| const auto& offsets = *_block_fast_path_ctx.offsets_ptr; |
| const auto child_rows = cast_set<int64_t>(offsets.size()); |
| |
| std::vector<uint32_t> row_ids; |
| row_ids.reserve(remaining_capacity); |
| uint64_t first_nested_idx = 0; |
| uint64_t expected_next_nested_idx = 0; |
| bool found_nested_range = false; |
| |
| int64_t child_row = _block_fast_path_row; |
| uint64_t in_row_offset = _block_fast_path_in_row_offset; |
| int produced_rows = 0; |
| |
| while (produced_rows < remaining_capacity && child_row < child_rows) { |
| if (_block_fast_path_ctx.array_nullmap_data && |
| _block_fast_path_ctx.array_nullmap_data[child_row]) { |
| // NULL array row: skip it here. Slow path will handle output semantics if needed. |
| child_row++; |
| in_row_offset = 0; |
| continue; |
| } |
| |
| const uint64_t prev_off = child_row == 0 ? 0 : offsets[child_row - 1]; |
| const uint64_t cur_off = offsets[child_row]; |
| const uint64_t nested_len = cur_off - prev_off; |
| |
| if (in_row_offset >= nested_len) { |
| child_row++; |
| in_row_offset = 0; |
| continue; |
| } |
| |
| const uint64_t remaining_in_row = nested_len - in_row_offset; |
| const int take_count = |
| std::min<int>(remaining_capacity - produced_rows, cast_set<int>(remaining_in_row)); |
| const uint64_t nested_start = prev_off + in_row_offset; |
| |
| DCHECK_LE(nested_start + take_count, cur_off); |
| DCHECK_LE(nested_start + take_count, _block_fast_path_ctx.nested_col->size()); |
| |
| if (!found_nested_range) { |
| found_nested_range = true; |
| first_nested_idx = nested_start; |
| expected_next_nested_idx = nested_start; |
| } |
| DCHECK_EQ(nested_start, expected_next_nested_idx); |
| |
| // Map each produced output row back to its source child row for copying non-table-function |
| // columns via insert_indices_from(). |
| for (int j = 0; j < take_count; ++j) { |
| row_ids.push_back(cast_set<uint32_t>(child_row)); |
| } |
| |
| produced_rows += take_count; |
| expected_next_nested_idx += take_count; |
| in_row_offset += take_count; |
| if (in_row_offset >= nested_len) { |
| child_row++; |
| in_row_offset = 0; |
| } |
| } |
| |
| if (produced_rows > 0) { |
| for (auto index : p._output_slot_indexs) { |
| auto src_column = _child_block->get_by_position(index).column; |
| columns[index]->insert_indices_from(*src_column, row_ids.data(), |
| row_ids.data() + produced_rows); |
| } |
| |
| auto& out_col = columns[p._child_slots.size()]; |
| if (out_col->is_nullable()) { |
| auto* out_nullable = assert_cast<ColumnNullable*>(out_col.get()); |
| out_nullable->get_nested_column_ptr()->insert_range_from( |
| *_block_fast_path_ctx.nested_col, first_nested_idx, produced_rows); |
| auto* nullmap_column = |
| assert_cast<ColumnUInt8*>(out_nullable->get_null_map_column_ptr().get()); |
| auto& nullmap_data = nullmap_column->get_data(); |
| const size_t old_size = nullmap_data.size(); |
| nullmap_data.resize(old_size + produced_rows); |
| if (_block_fast_path_ctx.nested_nullmap_data != nullptr) { |
| memcpy(nullmap_data.data() + old_size, |
| _block_fast_path_ctx.nested_nullmap_data + first_nested_idx, |
| produced_rows * sizeof(UInt8)); |
| } else { |
| memset(nullmap_data.data() + old_size, 0, produced_rows * sizeof(UInt8)); |
| } |
| } else { |
| out_col->insert_range_from(*_block_fast_path_ctx.nested_col, first_nested_idx, |
| produced_rows); |
| } |
| } |
| |
| _block_fast_path_row = child_row; |
| _block_fast_path_in_row_offset = in_row_offset; |
| _cur_child_offset = child_row >= child_rows ? -1 : child_row; |
| |
| if (child_row >= child_rows) { |
| for (TableFunction* fn : _fns) { |
| fn->process_close(); |
| } |
| _child_block->clear_column_data(_parent->cast<TableFunctionOperatorX>() |
| ._child->row_desc() |
| .num_materialized_slots()); |
| _reset_block_fast_path_state(); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, Block* output_block, |
| bool* eos) { |
| SCOPED_TIMER(_process_rows_timer); |
| if (_need_to_handle_outer_conjuncts) { |
| return _get_expanded_block_for_outer_conjuncts(state, output_block, eos); |
| } |
| |
| auto& p = _parent->cast<TableFunctionOperatorX>(); |
| MutableBlock m_block = |
| VectorizedUtils::build_mutable_mem_reuse_block(output_block, p._output_slots); |
| MutableColumns& columns = m_block.mutable_columns(); |
| |
| for (int i = 0; i < p._fn_num; i++) { |
| if (columns[i + p._child_slots.size()]->is_nullable()) { |
| _fns[i]->set_nullable(); |
| } |
| } |
| |
| bool use_slow_path = true; |
| if (_can_use_block_fast_path()) { |
| RETURN_IF_CANCELLED(state); |
| RETURN_IF_ERROR(_prepare_block_fast_path(state)); |
| if (_block_fast_path_enabled) { |
| // Only use fast path when the remaining nested suffix stays contiguous for the rest of |
| // the child block. This keeps fast-path progress entirely inside local state and avoids |
| // re-synchronizing the table function cursor when a later batch would otherwise fall |
| // back to the row-wise path. |
| RETURN_IF_ERROR(_get_expanded_block_block_fast_path(state, columns)); |
| use_slow_path = false; |
| } |
| } |
| |
| if (use_slow_path) { |
| bool skip_child_row = false; |
| while (columns[p._child_slots.size()]->size() < state->batch_size()) { |
| RETURN_IF_CANCELLED(state); |
| |
| if (_child_block->rows() == 0) { |
| break; |
| } |
| int idx = _find_last_fn_eos_idx(); |
| if (idx == 0 || skip_child_row) { |
| _copy_output_slots(columns, p); |
| // all table functions' results are exhausted, process next child row. |
| process_next_child_row(); |
| if (_cur_child_offset == -1) { |
| break; |
| } |
| } else if (idx < p._fn_num && idx != -1) { |
| // some of table functions' results are exhausted. |
| if (!_roll_table_functions(idx)) { |
| // continue to process next child row. |
| continue; |
| } |
| } |
| |
| // if any table function is not outer and has empty result, go to next child row |
| if (skip_child_row = _is_inner_and_empty(); skip_child_row) { |
| continue; |
| } |
| |
| DCHECK_LE(1, p._fn_num); |
| // It may take multiple iterations of this while loop to process a child row if |
| // any table function produces a large number of rows. |
| auto repeat_times = _fns[p._fn_num - 1]->get_value( |
| columns[p._child_slots.size() + p._fn_num - 1], |
| //// It has already been checked that |
| // columns[p._child_slots.size()]->size() < state->batch_size(), |
| // so columns[p._child_slots.size()]->size() will not exceed the range of int. |
| state->batch_size() - (int)columns[p._child_slots.size()]->size()); |
| _current_row_insert_times += repeat_times; |
| for (int i = 0; i < p._fn_num - 1; i++) { |
| _fns[i]->get_same_many_values(columns[i + p._child_slots.size()], repeat_times); |
| } |
| } |
| } |
| |
| _copy_output_slots(columns, p); |
| |
| size_t row_size = columns[p._child_slots.size()]->size(); |
| for (auto index : p._useless_slot_indexs) { |
| columns[index]->insert_many_defaults(row_size - columns[index]->size()); |
| } |
| |
| { |
| SCOPED_TIMER(_filter_timer); // 3. eval conjuncts |
| RETURN_IF_ERROR(VExprContext::filter_block(_expand_conjuncts_ctxs, output_block, |
| output_block->columns())); |
| RETURN_IF_ERROR( |
| VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); |
| } |
| |
| *eos = _child_eos && _cur_child_offset == -1; |
| return Status::OK(); |
| } |
| |
| Status TableFunctionLocalState::_get_expanded_block_for_outer_conjuncts(RuntimeState* state, |
| Block* output_block, |
| bool* eos) { |
| auto& p = _parent->cast<TableFunctionOperatorX>(); |
| MutableBlock m_block = |
| VectorizedUtils::build_mutable_mem_reuse_block(output_block, p._output_slots); |
| MutableColumns& columns = m_block.mutable_columns(); |
| auto child_slot_count = p._child_slots.size(); |
| for (int i = 0; i < p._fn_num; i++) { |
| if (columns[i + child_slot_count]->is_nullable()) { |
| _fns[i]->set_nullable(); |
| } |
| } |
| |
| DorisVector<int64_t> child_row_to_output_rows_indices; |
| DorisVector<int64_t> handled_row_indices; |
| bool child_block_empty = _child_block->empty(); |
| if (!child_block_empty) { |
| child_row_to_output_rows_indices.push_back(0); |
| } |
| |
| auto batch_size = state->batch_size(); |
| auto output_row_count = columns[child_slot_count]->size(); |
| while (output_row_count < batch_size) { |
| RETURN_IF_CANCELLED(state); |
| |
| // finished handling current child block |
| if (_cur_child_offset == -1) { |
| break; |
| } |
| |
| bool skip_child_row = false; |
| while (output_row_count < batch_size) { |
| // if table function is not outer and has empty result, go to next child row |
| if (_fns[0]->eos() || skip_child_row) { |
| _copy_output_slots(columns, p); |
| if (!skip_child_row) { |
| handled_row_indices.push_back(_cur_child_offset); |
| child_row_to_output_rows_indices.push_back(output_row_count); |
| } |
| process_next_child_row(); |
| if (_cur_child_offset == -1) { |
| break; |
| } |
| } |
| if (skip_child_row = _is_inner_and_empty(); skip_child_row) { |
| _child_rows_has_output[_cur_child_offset] = true; |
| continue; |
| } |
| |
| // It may take multiple iterations of this while loop to process a child row if |
| // the table function produces a large number of rows. |
| auto repeat_times = _fns[0]->get_value(columns[child_slot_count], |
| batch_size - (int)output_row_count); |
| _current_row_insert_times += repeat_times; |
| output_row_count = columns[child_slot_count]->size(); |
| } |
| } |
| // Two scenarios the loop above will exit: |
| // 1. current child block is finished processing |
| // _cur_child_offset == -1 |
| // 2. output_block reaches batch size |
| // fn maybe or maybe not eos |
| if (output_row_count >= batch_size) { |
| _copy_output_slots(columns, p); |
| handled_row_indices.push_back(_cur_child_offset); |
| child_row_to_output_rows_indices.push_back(output_row_count); |
| if (_fns[0]->eos()) { |
| process_next_child_row(); |
| } |
| } |
| for (auto index : p._useless_slot_indexs) { |
| columns[index]->insert_many_defaults(output_row_count - columns[index]->size()); |
| } |
| output_block->set_columns(std::move(columns)); |
| |
| /** |
| Handle the outer conjuncts after unnest. Currently, only left outer is supported. |
| e.g., for the following example data, |
| select id, name, tags from items_dict_unnest_t order by id; |
| +------+---------------------+-------------------------------------------------+ |
| | id | name | tags | |
| +------+---------------------+-------------------------------------------------+ |
| | 1 | Laptop | ["Electronics", "Office", "High-End", "Laptop"] | |
| | 2 | Mechanical Keyboard | ["Electronics", "Accessories"] | |
| | 3 | Basketball | ["Sports", "Outdoor"] | |
| | 4 | Badminton Racket | ["Sports", "Equipment"] | |
| | 5 | Shirt | ["Clothing", "Office", "Shirt"] | |
| +------+---------------------+-------------------------------------------------+ |
| |
| for this query: ``` SELECT |
| id, |
| name, |
| tags, |
| t.tag |
| FROM |
| items_dict_unnest_t |
| LEFT JOIN lateral unnest(tags) AS t(tag) ON t.tag = name;``` |
| |
| after unnest, before evaluating the outer conjuncts, the result is: |
| +------+---------------------+-------------------------------------------------+--------------+ |
| | id | name | tags | unnest(tags) | |
| +------+---------------------+-------------------------------------------------+--------------+ |
| | 1 | Laptop | ["Electronics", "Office", "High-End", "Laptop"] | Electronics | |
| | 1 | Laptop | ["Electronics", "Office", "High-End", "Laptop"] | Office | |
| | 1 | Laptop | ["Electronics", "Office", "High-End", "Laptop"] | High-End | |
| | 1 | Laptop | ["Electronics", "Office", "High-End", "Laptop"] | Laptop | |
| | 2 | Mechanical Keyboard | ["Electronics", "Accessories"] | Electronics | |
| | 2 | Mechanical Keyboard | ["Electronics", "Accessories"] | Accessories | |
| | 3 | Basketball | ["Sports", "Outdoor"] | Sports | |
| | 3 | Basketball | ["Sports", "Outdoor"] | Outdoor | |
| | 4 | Badminton Racket | ["Sports", "Equipment"] | Sports | |
| | 4 | Badminton Racket | ["Sports", "Equipment"] | Equipment | |
| | 5 | Shirt | ["Clothing", "Office", "Shirt"] | Clothing | |
| | 5 | Shirt | ["Clothing", "Office", "Shirt"] | Office | |
| | 5 | Shirt | ["Clothing", "Office", "Shirt"] | Shirt | |
| +------+---------------------+-------------------------------------------------+--------------+ |
| 13 rows in set (0.47 sec) |
| |
| the vector child_row_to_output_rows_indices is used to record the mapping relationship, |
| between child row and output rows, for example: |
| child row 0 -> output rows [0, 4) |
| child row 1 -> output rows [4, 6) |
| child row 2 -> output rows [6, 8) |
| child row 3 -> output rows [8, 10) |
| child row 4 -> output rows [10, 13) |
| it's contents are: [0, 4, 6, 8, 10, 13]. |
| |
| After evaluating the left join conjuncts `t.tag = name`, |
| the content of filter is: [0, 0, 0, 1, // child row 0 |
| 0, 0, // child row 1 |
| 0, 0, // child row 2 |
| 0, 0, // child row 3 |
| 0, 0, 1 // child row 4 |
| ] |
| child rows 1, 2, 3 are all filtered out, so we need to insert one row with NULL tag value for each of them. |
| */ |
| if (!child_block_empty) { |
| IColumn::Filter filter; |
| auto column_count = output_block->columns(); |
| ColumnNumbers columns_to_filter(column_count); |
| std::iota(columns_to_filter.begin(), columns_to_filter.end(), 0); |
| RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( |
| _expand_conjuncts_ctxs, output_block, columns_to_filter, column_count, filter)); |
| size_t remain_row_count = output_block->rows(); |
| // for outer table function, need to handle those child rows which all expanded rows are filtered out |
| auto handled_child_row_count = handled_row_indices.size(); |
| if (remain_row_count < output_row_count) { |
| for (size_t i = 0; i < handled_child_row_count; ++i) { |
| auto start_row_idx = child_row_to_output_rows_indices[i]; |
| auto end_row_idx = child_row_to_output_rows_indices[i + 1]; |
| if (simd::contain_one((uint8_t*)filter.data() + start_row_idx, |
| end_row_idx - start_row_idx)) { |
| _child_rows_has_output[handled_row_indices[i]] = true; |
| } |
| } |
| } else { |
| for (auto row_idx : handled_row_indices) { |
| _child_rows_has_output[row_idx] = true; |
| } |
| } |
| |
| if (-1 == _cur_child_offset) { |
| // Finished handling current child block, |
| auto child_block_row_count = _child_block->rows(); |
| DorisVector<uint32_t> null_row_indices; |
| for (uint32_t i = 0; i != child_block_row_count; i++) { |
| if (!_child_rows_has_output[i]) { |
| null_row_indices.push_back(i); |
| } |
| } |
| if (!null_row_indices.empty()) { |
| MutableBlock m_block2 = VectorizedUtils::build_mutable_mem_reuse_block( |
| output_block, p._output_slots); |
| MutableColumns& columns2 = m_block2.mutable_columns(); |
| for (auto index : p._output_slot_indexs) { |
| auto src_column = _child_block->get_by_position(index).column; |
| columns2[index]->insert_indices_from( |
| *src_column, null_row_indices.data(), |
| null_row_indices.data() + null_row_indices.size()); |
| } |
| for (auto index : p._useless_slot_indexs) { |
| columns2[index]->insert_many_defaults(null_row_indices.size()); |
| } |
| columns2[child_slot_count]->insert_many_defaults(null_row_indices.size()); |
| output_block->set_columns(std::move(columns2)); |
| } |
| _child_rows_has_output.clear(); |
| _child_block->clear_column_data(_parent->cast<TableFunctionOperatorX>() |
| ._child->row_desc() |
| .num_materialized_slots()); |
| } |
| } |
| |
| { |
| SCOPED_TIMER(_filter_timer); // 3. eval conjuncts |
| RETURN_IF_ERROR( |
| VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); |
| } |
| |
| *eos = _child_eos && _cur_child_offset == -1; |
| return Status::OK(); |
| } |
| |
| void TableFunctionLocalState::process_next_child_row() { |
| _cur_child_offset++; |
| |
| if (_cur_child_offset >= _child_block->rows()) { |
| // release block use count. |
| for (TableFunction* fn : _fns) { |
| fn->process_close(); |
| } |
| |
| // if there are any _expand_conjuncts_ctxs and it's outer, don't clear child block here, |
| // because we still need _child_block to output NULL rows for outer table function |
| if (!_need_to_handle_outer_conjuncts) { |
| _child_block->clear_column_data(_parent->cast<TableFunctionOperatorX>() |
| ._child->row_desc() |
| .num_materialized_slots()); |
| } |
| _cur_child_offset = -1; |
| _reset_block_fast_path_state(); |
| return; |
| } |
| |
| for (TableFunction* fn : _fns) { |
| fn->process_row(_cur_child_offset); |
| } |
| } |
| |
| TableFunctionOperatorX::TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, |
| int operator_id, const DescriptorTbl& descs) |
| : Base(pool, tnode, operator_id, descs) {} |
| |
| Status TableFunctionOperatorX::_prepare_output_slot_ids(const TPlanNode& tnode) { |
| // Prepare output slot ids |
| SlotId max_id = -1; |
| for (auto slot_id : tnode.table_function_node.outputSlotIds) { |
| if (slot_id > max_id) { |
| max_id = slot_id; |
| } |
| } |
| _output_slot_ids = std::vector<bool>(max_id + 1, false); |
| for (auto slot_id : tnode.table_function_node.outputSlotIds) { |
| _output_slot_ids[slot_id] = true; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { |
| RETURN_IF_ERROR(Base::init(tnode, state)); |
| |
| for (const TExpr& texpr : tnode.table_function_node.fnCallExprList) { |
| VExprContextSPtr ctx; |
| RETURN_IF_ERROR(VExpr::create_expr_tree(texpr, ctx)); |
| _vfn_ctxs.push_back(ctx); |
| |
| auto root = ctx->root(); |
| TableFunction* fn = nullptr; |
| RETURN_IF_ERROR( |
| TableFunctionFactory::get_fn(root->fn(), _pool, &fn, state->be_exec_version())); |
| fn->set_expr_context(ctx); |
| _fns.push_back(fn); |
| } |
| _fn_num = cast_set<int>(_fns.size()); |
| |
| for (const TExpr& texpr : tnode.table_function_node.expand_conjuncts) { |
| VExprContextSPtr ctx; |
| RETURN_IF_ERROR(VExpr::create_expr_tree(texpr, ctx)); |
| _expand_conjuncts_ctxs.push_back(ctx); |
| } |
| if (!_expand_conjuncts_ctxs.empty()) { |
| DCHECK(1 == _fn_num) << "Only support one table function when there are expand conjuncts."; |
| } |
| |
| // Prepare output slot ids |
| RETURN_IF_ERROR(_prepare_output_slot_ids(tnode)); |
| return Status::OK(); |
| } |
| |
| Status TableFunctionOperatorX::prepare(doris::RuntimeState* state) { |
| RETURN_IF_ERROR(Base::prepare(state)); |
| for (auto* fn : _fns) { |
| RETURN_IF_ERROR(fn->prepare()); |
| } |
| RETURN_IF_ERROR(VExpr::prepare(_vfn_ctxs, state, row_descriptor())); |
| |
| RETURN_IF_ERROR(VExpr::prepare(_expand_conjuncts_ctxs, state, row_descriptor())); |
| |
| // get current all output slots |
| for (const auto& tuple_desc : row_descriptor().tuple_descriptors()) { |
| for (const auto& slot_desc : tuple_desc->slots()) { |
| _output_slots.push_back(slot_desc); |
| } |
| } |
| |
| // get all input slots |
| for (const auto& child_tuple_desc : _child->row_desc().tuple_descriptors()) { |
| for (const auto& child_slot_desc : child_tuple_desc->slots()) { |
| _child_slots.push_back(child_slot_desc); |
| } |
| } |
| |
| for (int i = 0; i < _child_slots.size(); i++) { |
| if (_slot_need_copy(i)) { |
| _output_slot_indexs.push_back(i); |
| } else { |
| _useless_slot_indexs.push_back(i); |
| } |
| } |
| |
| RETURN_IF_ERROR(VExpr::open(_expand_conjuncts_ctxs, state)); |
| return VExpr::open(_vfn_ctxs, state); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |