// blob: 8c26c1ad79eb3e2388e391ed6658ee3fcd0c1e62 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "common/global_types.h"
#include "common/status.h"
#include "exec/exec_node.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/common/arena.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
class DescriptorTbl;
class ObjectPool;
class RuntimeState;
class TupleDescriptor;
} // namespace doris
namespace doris::vectorized {
/// Identifies one row by three coordinates: the index of the block that holds
/// it, the row index inside that block, and a flat position across all blocks.
/// All three fields start at zero.
struct BlockRowPos {
    BlockRowPos() : block_num(0), row_num(0), pos(0) {}

    int64_t block_num; // index of the block the row lives in
    int64_t row_num;   // index of the row inside that block
    int64_t pos;       // flat position: rows of all preceding blocks + row_num

    /// Renders the three fields as tab-separated "name: value" pairs.
    /// Const-qualified because it only reads the fields, so it is callable on
    /// const objects and through const references.
    std::string debug_string() const {
        std::string res = "\t block_num: ";
        res += std::to_string(block_num);
        res += "\t row_num: ";
        res += std::to_string(row_num);
        res += "\t pos: ";
        res += std::to_string(pos);
        return res;
    }
};
class AggFnEvaluator;
/// Scope selector stored by VAnalyticEvalNode in its `_fn_scope` member.
/// The enumerators are unscoped and keep the values 0, 1 and 2.
/// NOTE(review): the names suggest they pair with the _get_next_for_partition /
/// _get_next_for_range / _get_next_for_rows helpers -- confirm in the .cpp.
enum AnalyticFnScope {
    PARTITION = 0,
    RANGE = 1,
    ROWS = 2
};
class VAnalyticEvalNode : public ExecNode {
public:
~VAnalyticEvalNode() override = default;
VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Status close(RuntimeState* state) override;
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
bool can_read();
bool can_write();
protected:
using ExecNode::debug_string;
virtual std::string debug_string();
private:
Status _get_next_for_rows(size_t rows);
Status _get_next_for_range(size_t rows);
Status _get_next_for_partition(size_t rows);
void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start,
int64_t frame_end);
Status _reset_agg_status();
Status _init_result_columns();
Status _create_agg_status();
Status _destroy_agg_status();
Status _insert_range_column(vectorized::Block* block, const VExprContextSPtr& expr,
IColumn* dst_column, size_t length);
void _update_order_by_range();
bool _init_next_partition(BlockRowPos found_partition_end);
void _insert_result_info(int64_t current_block_rows);
Status _output_current_block(Block* block);
BlockRowPos _get_partition_by_end();
BlockRowPos _compare_row_to_find_end(int idx, BlockRowPos start, BlockRowPos end,
bool need_check_first = false);
Status _fetch_next_block_data(RuntimeState* state);
Status _consumed_block_and_init_partition(RuntimeState* state, bool* next_partition, bool* eos);
bool whether_need_next_partition(BlockRowPos found_partition_end);
std::string debug_window_bound_string(TAnalyticWindowBoundary b);
using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end,
int64_t frame_start, int64_t frame_end)>;
using vectorized_get_next = std::function<Status(size_t rows)>;
using vectorized_get_result = std::function<void(int64_t current_block_rows)>;
using vectorized_closer = std::function<void()>;
struct executor {
vectorized_execute execute;
vectorized_get_next get_next;
vectorized_get_result insert_result;
vectorized_closer close;
};
executor _executor;
void _release_mem();
private:
std::vector<Block> _input_blocks;
std::vector<int64_t> input_block_first_row_positions;
std::vector<AggFnEvaluator*> _agg_functions;
std::vector<VExprContextSPtrs> _agg_expr_ctxs;
VExprContextSPtrs _partition_by_eq_expr_ctxs;
VExprContextSPtrs _order_by_eq_expr_ctxs;
std::vector<std::vector<MutableColumnPtr>> _agg_intput_columns;
std::vector<MutableColumnPtr> _result_window_columns;
BlockRowPos _order_by_start;
BlockRowPos _order_by_end;
BlockRowPos _partition_by_start;
BlockRowPos _partition_by_end;
BlockRowPos _all_block_end;
std::vector<int64_t> _ordey_by_column_idxs;
std::vector<int64_t> _partition_by_column_idxs;
bool _input_eos = false;
bool _next_partition = false;
std::atomic_bool _need_more_input = true;
BlockRowPos _found_partition_end;
int64_t _input_total_rows = 0;
int64_t _output_block_index = 0;
int64_t _window_end_position = 0;
int64_t _current_row_position = 0;
int64_t _rows_start_offset = 0;
int64_t _rows_end_offset = 0;
size_t _agg_functions_size = 0;
bool _agg_functions_created = false;
bool _current_window_empty = false;
/// The offset of the n-th functions.
std::vector<size_t> _offsets_of_aggregate_states;
/// The total size of the row from the functions.
size_t _total_size_of_aggregate_states = 0;
/// The max align size for functions
size_t _align_aggregate_states = 1;
std::unique_ptr<Arena> _agg_arena_pool;
AggregateDataPtr _fn_place_ptr;
TTupleId _buffered_tuple_id = 0;
TupleId _intermediate_tuple_id;
TupleId _output_tuple_id;
TAnalyticWindow _window;
AnalyticFnScope _fn_scope;
TupleDescriptor* _intermediate_tuple_desc;
TupleDescriptor* _output_tuple_desc;
std::vector<int64_t> _origin_cols;
RuntimeProfile::Counter* _evaluation_timer;
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
std::vector<bool> _change_to_nullable_flags;
};
} // namespace doris::vectorized