// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "distinct_streaming_aggregation_operator.h"
#include <gen_cpp/Metrics_types.h>
#include <memory>
#include <numeric> // std::iota
#include <utility>
#include <variant> // std::visit, std::monostate
#include "common/compiler_util.h" // IWYU pragma: keep
#include "vec/exprs/vectorized_agg_fn.h"
namespace doris {
class ExecNode;
class RuntimeState;
} // namespace doris
namespace doris::pipeline {
#include "common/compile_check_begin.h"
struct StreamingHtMinReductionEntry {
// Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in
// bytes is greater than this threshold.
int min_ht_mem;
// The minimum reduction factor to expand the hash tables.
double streaming_ht_min_reduction;
};
// TODO: experimentally tune these values and also programmatically get the cache size
// of the machine that we're running on.
static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
// Expand up to L2 cache always.
{.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0},
// Expand into L3 cache if we look like we're getting some reduction.
// At present, the L2 cache is generally 1 MB or larger.
{.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1},
// Expand into main memory if we're getting a significant reduction.
// The L3 cache is generally 16MB or more
{.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0},
};
static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent),
batch_size(state->batch_size()),
_agg_arena_pool(std::make_unique<vectorized::Arena>()),
_agg_data(std::make_unique<DistinctDataVariants>()),
_agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_aggregated_block(vectorized::Block::create_unique()) {}
Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_init_timer);
_build_timer = ADD_TIMER(Base::custom_profile(), "BuildTime");
_hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
_hash_table_input_counter =
ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT);
_hash_table_size_counter = ADD_COUNTER(custom_profile(), "HashTableSize", TUnit::UNIT);
_insert_keys_to_column_timer = ADD_TIMER(custom_profile(), "InsertKeysToColumnTime");
return Status::OK();
}
Status DistinctStreamingAggLocalState::open(RuntimeState* state) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_open_timer);
RETURN_IF_ERROR(Base::open(state));
auto& p = Base::_parent->template cast<DistinctStreamingAggOperatorX>();
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i]));
}
RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
return Status::OK();
}
bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
if (!_should_expand_hash_table) {
return false;
}
return std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> bool {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninitialized hash table");
return false; // unreachable: the throw above always exits the lambda
},
[&](auto& agg_method) -> bool {
auto& hash_tbl = *agg_method.hash_table;
auto [ht_mem, ht_rows] =
std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()};
// Need some rows in tables to have valid statistics.
if (ht_rows == 0) {
return true;
}
// Find the appropriate reduction factor in our table for the current hash table sizes.
int cache_level = 0;
while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE &&
ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) {
++cache_level;
}
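// Example of the lookup above: ht_mem = 1 MB lands between the 256 KB and 16 MB
// thresholds, so cache_level = 1 and the required reduction factor is 1.1;
// ht_mem = 32 MB selects cache_level = 2 (required factor 2.0).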
// Compare the number of rows in the hash table with the number of input rows that
// were aggregated into it. Exclude passed through rows from this calculation since
// they were not in hash tables.
const int64_t input_rows = _input_num_rows;
const int64_t aggregated_input_rows = input_rows - _num_rows_returned;
// TODO chenhao
// const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_;
double current_reduction = static_cast<double>(aggregated_input_rows) /
static_cast<double>(ht_rows);
// TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be
// inaccurate, which could lead to a divide by zero below.
if (aggregated_input_rows <= 0) {
return true;
}
// Extrapolate the current reduction factor (r) using the formula
// R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data
// set, N is the number of input rows, excluding passed-through rows, and n is the
// number of rows inserted or merged into the hash tables. This is a very rough
// approximation but is good enough to be useful.
// TODO: consider collecting more statistics to better estimate reduction.
// double estimated_reduction = aggregated_input_rows >= expected_input_rows
// ? current_reduction
// : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1);
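// Worked example of the (currently unused) extrapolation: with N = 1,000,000 aggregated
// input rows, n = 50,000 rows inserted into the hash table, and r = 1.5, the estimate
// would be R = 1 + (1,000,000 / 50,000) * (1.5 - 1) = 11. The code below instead compares
// the measured current_reduction against min_reduction directly.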
double min_reduction =
STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction;
// COUNTER_SET(preagg_estimated_reduction_, estimated_reduction);
// COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction);
// return estimated_reduction > min_reduction;
_should_expand_hash_table = current_reduction > min_reduction;
return _should_expand_hash_table;
}},
_agg_data->method_variant);
}
Status DistinctStreamingAggLocalState::_init_hash_method(
const vectorized::VExprContextSPtrs& probe_exprs) {
RETURN_IF_ERROR(init_hash_method<DistinctDataVariants>(
_agg_data.get(), get_data_types(probe_exprs),
Base::_parent->template cast<DistinctStreamingAggOperatorX>()._is_first_phase));
return Status::OK();
}
Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());
size_t key_size = _probe_expr_ctxs.size();
vectorized::ColumnRawPtrs key_columns(key_size);
std::vector<int> result_idxs(key_size);
{
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
in_block->get_by_position(result_column_id).column =
in_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
key_columns[i]->assume_mutable()->replace_float_special_values();
result_idxs[i] = result_column_id;
}
}
const auto rows = static_cast<uint32_t>(in_block->rows());
_distinct_row.clear();
if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg && low_memory_mode()) {
_stop_emplace_flag = true;
}
if (!_stop_emplace_flag) {
// _distinct_row collects the indices of the non-duplicate rows in key_columns.
// _emplace_into_hash_table_to_distinct decides whether to keep inserting data into the hash map;
// if it decides not to, it sets _stop_emplace_flag = true and leaves _distinct_row empty.
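// Example: for a single key column [1, 1, 2, 3, 2], only the first occurrence of each key
// creates a new hash table entry, so _distinct_row ends up as {0, 2, 3}.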
_distinct_row.reserve(rows);
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
DCHECK_LE(_distinct_row.size(), rows)
<< "_distinct_row size should be less than or equal to rows";
}
bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
out_block->mem_reuse();
SCOPED_TIMER(_insert_keys_to_column_timer);
if (mem_reuse) {
if (_stop_emplace_flag && !out_block->empty()) {
// Once out_block reaches batch_size rows it is pushed to the data queue, so when
// _stop_emplace_flag becomes true the block may still hold rows that must be output first.
DCHECK(_distinct_row.empty());
_distinct_row.resize(rows);
std::iota(_distinct_row.begin(), _distinct_row.end(), 0);
}
DCHECK_EQ(out_block->columns(), key_size);
if (_stop_emplace_flag && _distinct_row.empty()) {
// If _stop_emplace_flag is true and _distinct_row is empty, we are in pass-through
// (streaming) mode: output exactly what came in. Swap the columns directly to avoid
// "Check failed: d.column->use_count() == 1 (2 vs. 1)".
for (size_t i = 0; i < key_size; ++i) {
auto output_column = out_block->get_by_position(i).column;
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
in_block->replace_by_position(result_idxs[i], output_column);
}
} else {
DCHECK_EQ(_cache_block.rows(), 0);
// If the output rows would exceed batch_size, split the excess into _cache_block.
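// Example: with batch_size = 4096, 3000 rows already in out_block, and 2000 entries in
// _distinct_row, split_size = 1096 rows are appended to out_block and the remaining
// 904 rows go to _cache_block.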
if (out_block->rows() + _distinct_row.size() > batch_size) {
size_t split_size = batch_size - out_block->rows();
for (size_t i = 0; i < key_size; ++i) {
auto output_dst = out_block->get_by_position(i).column->assume_mutable();
key_columns[i]->append_data_by_selector(output_dst, _distinct_row, 0,
split_size);
auto cache_dst = _cache_block.get_by_position(i).column->assume_mutable();
key_columns[i]->append_data_by_selector(cache_dst, _distinct_row, split_size,
_distinct_row.size());
}
} else {
for (size_t i = 0; i < key_size; ++i) {
auto output_column = out_block->get_by_position(i).column;
auto dst = output_column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
}
}
}
} else {
DCHECK(out_block->empty()) << "out_block must be empty, but it has " << out_block->rows() << " rows";
vectorized::ColumnsWithTypeAndName columns_with_schema;
for (size_t i = 0; i < key_size; ++i) {
if (_stop_emplace_flag) {
columns_with_schema.emplace_back(key_columns[i]->assume_mutable(),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
} else {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(std::move(distinct_column),
_probe_expr_ctxs[i]->root()->data_type(),
_probe_expr_ctxs[i]->root()->expr_name());
}
}
out_block->swap(vectorized::Block(columns_with_schema));
_cache_block = out_block->clone_empty();
if (_stop_emplace_flag) {
in_block->clear(); // release the column references held by in_block when _stop_emplace_flag is true
}
}
return Status::OK();
}
void DistinctStreamingAggLocalState::_make_nullable_output_key(vectorized::Block* block) {
if (block->rows() != 0) {
for (auto cid : Base::_parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys) {
block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column);
block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
}
}
}
void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns,
const uint32_t num_rows) {
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninitialized hash table");
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
auto& hash_tbl = *agg_method.hash_table;
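// In streaming pre-aggregation, if inserting num_rows more entries would force the hash
// table to grow and the observed reduction does not justify the expansion, stop emplacing:
// subsequent rows pass through without deduplication.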
if (_parent->cast<DistinctStreamingAggOperatorX>()._is_streaming_preagg &&
hash_tbl.add_elem_size_overflow(num_rows)) {
if (!_should_expand_preagg_hash_tables()) {
_stop_emplace_flag = true;
return;
}
}
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);
size_t row = 0;
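// 'creator' runs only when lazy_emplace meets a key that is not yet in the hash table: it
// persists the key into the arena, constructs the map slot, and records the row index as
// distinct. 'creator_for_null_key' does the same bookkeeping for the first NULL key.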
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key(key, origin, _arena);
ctor(key);
distinct_row.push_back(row);
};
auto creator_for_null_key = [&]() { distinct_row.push_back(row); };
SCOPED_TIMER(_hash_table_emplace_timer);
for (; row < num_rows; ++row) {
agg_method.lazy_emplace(state, row, creator, creator_for_null_key);
}
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
}},
_agg_data->method_variant);
}
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs,
bool require_bucket_distribution)
: StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
}
} else {
_is_streaming_preagg = false;
}
}
Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::init(tnode, state));
// The return status is ignored for now, which is why ExecNode::init() needs to be introduced.
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_trees(tnode.agg_node.grouping_exprs, _probe_expr_ctxs));
_op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR";
return Status::OK();
}
Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
init_make_nullable(state);
return Status::OK();
}
void DistinctStreamingAggOperatorX::init_make_nullable(RuntimeState* state) {
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
if (nullable_output != nullable_input) {
DCHECK(nullable_output);
_make_nullable_keys.emplace_back(i);
}
}
}
Status DistinctStreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block,
bool eos) const {
auto& local_state = get_local_state(state);
local_state._input_num_rows += in_block->rows();
if (in_block->rows() == 0) {
return Status::OK();
}
RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key(
in_block, local_state._aggregated_block.get()));
// Apply the limit and mark when it is reached.
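// Example: with _limit = 100 and 95 rows already returned, a 20-row aggregated block is
// truncated via set_num_rows(5) and _reach_limit is set.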
if (_limit != -1 &&
(local_state._num_rows_returned + local_state._aggregated_block->rows()) > _limit) {
auto limit_rows = _limit - local_state._num_rows_returned;
local_state._aggregated_block->set_num_rows(limit_rows);
local_state._reach_limit = true;
}
return Status::OK();
}
Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block,
bool* eos) const {
auto& local_state = get_local_state(state);
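// Hand the accumulated rows to the caller by swapping, then clear _aggregated_block so it
// can be refilled from _cache_block or by the next push.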
if (!local_state._aggregated_block->empty()) {
block->swap(*local_state._aggregated_block);
local_state._aggregated_block->clear_column_data(block->columns());
// The cache block may have additional data due to exceeding the batch size.
if (!local_state._cache_block.empty()) {
local_state._swap_cache_block(local_state._aggregated_block.get());
}
}
local_state._make_nullable_output_key(block);
if (!_is_streaming_preagg) {
// Evaluate the HAVING clause; it must not be executed in the streaming pre-aggregation.
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
}
local_state.add_num_rows_returned(block->rows());
// If the limit is not reached, it is important to ensure that _aggregated_block is empty
// because it may still contain data.
// However, if the limit is reached, there is no need to output data even if some exists.
*eos = (local_state._child_eos && local_state._aggregated_block->empty()) ||
(local_state._reach_limit);
return Status::OK();
}
bool DistinctStreamingAggOperatorX::need_more_input_data(RuntimeState* state) const {
auto& local_state = get_local_state(state);
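// In pass-through mode (_stop_emplace_flag) more input is requested only once the
// aggregated block has been drained; otherwise the block keeps filling until it reaches
// batch_size. Either way, stop asking for input once the child is exhausted or the limit
// is reached.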
const bool need_batch = local_state._stop_emplace_flag
? local_state._aggregated_block->empty()
: local_state._aggregated_block->rows() < state->batch_size();
return need_batch && !(local_state._child_eos || local_state._reach_limit);
}
Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_close_timer);
/// _hash_table_size_counter may be null if prepare failed.
if (_hash_table_size_counter && !_probe_expr_ctxs.empty()) {
std::visit(vectorized::Overload {[&](std::monostate& arg) {
// Do nothing
},
[&](auto& agg_method) {
COUNTER_SET(_hash_table_size_counter,
int64_t(agg_method.hash_table->size()));
}},
_agg_data->method_variant);
}
_aggregated_block->clear();
// If the limit is reached, there may still be remaining data in the cache block.
// If the limit is not reached, the cache block must be empty.
// If the query is canceled, it might not satisfy the above conditions.
if (!state->is_cancelled()) {
if (!_reach_limit && !_cache_block.empty()) {
LOG_WARNING("If the limit is not reached, the cache block must be empty.");
}
}
_cache_block.clear();
return Base::close(state);
}
#include "common/compile_check_end.h"
} // namespace doris::pipeline