| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <stdint.h> |
| |
| #include "common/status.h" |
| #include "operator.h" |
| #include "vec/exprs/table_function/table_function.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RuntimeState; |
| } // namespace doris |
| |
| namespace doris::pipeline { |
| |
| class TableFunctionOperatorX; |
| class TableFunctionLocalState MOCK_REMOVE(final) : public PipelineXLocalState<> { |
| public: |
| using Parent = TableFunctionOperatorX; |
| ENABLE_FACTORY_CREATOR(TableFunctionLocalState); |
| TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent); |
| ~TableFunctionLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalStateInfo& infos) override; |
| Status open(RuntimeState* state) override; |
| Status close(RuntimeState* state) override { |
| for (auto* fn : _fns) { |
| RETURN_IF_ERROR(fn->close()); |
| } |
| RETURN_IF_ERROR(PipelineXLocalState<>::close(state)); |
| return Status::OK(); |
| } |
| void process_next_child_row(); |
| Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, bool* eos); |
| |
| private: |
| friend class TableFunctionOperatorX; |
| friend class StatefulOperatorX<TableFunctionLocalState>; |
| |
| MOCK_FUNCTION Status _clone_table_function(RuntimeState* state); |
| |
| void _copy_output_slots(std::vector<vectorized::MutableColumnPtr>& columns); |
| bool _roll_table_functions(int last_eos_idx); |
| // return: |
| // 0: all fns are eos |
| // -1: all fns are not eos |
| // >0: some of fns are eos |
| int _find_last_fn_eos_idx() const; |
| bool _is_inner_and_empty(); |
| |
| std::vector<vectorized::TableFunction*> _fns; |
| vectorized::VExprContextSPtrs _vfn_ctxs; |
| int64_t _cur_child_offset = -1; |
| std::unique_ptr<vectorized::Block> _child_block; |
| int _current_row_insert_times = 0; |
| bool _child_eos = false; |
| |
| RuntimeProfile::Counter* _init_function_timer = nullptr; |
| RuntimeProfile::Counter* _process_rows_timer = nullptr; |
| RuntimeProfile::Counter* _filter_timer = nullptr; |
| }; |
| |
| class TableFunctionOperatorX MOCK_REMOVE(final) |
| : public StatefulOperatorX<TableFunctionLocalState> { |
| public: |
| using Base = StatefulOperatorX<TableFunctionLocalState>; |
| TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
| const DescriptorTbl& descs); |
| |
| #ifdef BE_TEST |
| TableFunctionOperatorX() = default; |
| #endif |
| |
| Status init(const TPlanNode& tnode, RuntimeState* state) override; |
| Status prepare(doris::RuntimeState* state) override; |
| |
| bool need_more_input_data(RuntimeState* state) const override { |
| auto& local_state = state->get_local_state(operator_id())->cast<TableFunctionLocalState>(); |
| return !local_state._child_block->rows() && !local_state._child_eos; |
| } |
| |
| DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { |
| return {ExchangeType::PASSTHROUGH}; |
| } |
| |
| Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override { |
| auto& local_state = get_local_state(state); |
| if (input_block->rows() == 0) { |
| return Status::OK(); |
| } |
| |
| for (auto* fn : local_state._fns) { |
| SCOPED_TIMER(local_state._init_function_timer); |
| RETURN_IF_ERROR(fn->process_init(input_block, state)); |
| } |
| local_state.process_next_child_row(); |
| return Status::OK(); |
| } |
| |
| Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) const override { |
| auto& local_state = get_local_state(state); |
| RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block, eos)); |
| local_state.reached_limit(output_block, eos); |
| return Status::OK(); |
| } |
| |
| private: |
| friend class TableFunctionLocalState; |
| |
| Status _prepare_output_slot_ids(const TPlanNode& tnode); |
| |
| /* Now the output tuples for table function node is base_table_tuple + tf1 + tf2 + ... |
| But not all slots are used, the real used slots are inside table_function_node.outputSlotIds. |
| For case like explode_bitmap: |
| SELECT a2,count(*) as a3 FROM A WHERE a1 IN |
| (SELECT c1 FROM B LATERAL VIEW explode_bitmap(b1) C as c1) |
| GROUP BY a2 ORDER BY a3; |
| Actually we only need to output column c1, no need to output columns in bitmap table B. |
| Copy large bitmap columns are very expensive and slow. |
| |
| Here we check if the slot is really used, otherwise we avoid copy it and just insert a default value. |
| |
| A better solution is: |
| 1. FE: create a new output tuple based on the real output slots; |
| 2. BE: refractor (V)TableFunctionNode output rows based no the new tuple; |
| */ |
| [[nodiscard]] inline bool _slot_need_copy(SlotId slot_id) const { |
| auto id = _output_slots[slot_id]->id(); |
| return (id < _output_slot_ids.size()) && (_output_slot_ids[id]); |
| } |
| |
| std::vector<SlotDescriptor*> _child_slots; |
| std::vector<SlotDescriptor*> _output_slots; |
| |
| vectorized::VExprContextSPtrs _vfn_ctxs; |
| |
| std::vector<vectorized::TableFunction*> _fns; |
| int _fn_num = 0; |
| |
| std::vector<bool> _output_slot_ids; |
| std::vector<int> _output_slot_indexs; |
| std::vector<int> _useless_slot_indexs; |
| |
| std::vector<int> _child_slot_sizes; |
| }; |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::pipeline |