// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "aggregation_source_operator.h"
#include <memory>
#include <string>
#include "common/exception.h"
#include "pipeline/exec/operator.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_get_results_timer = ADD_TIMER(custom_profile(), "GetResultsTime");
_hash_table_iterate_timer = ADD_TIMER(custom_profile(), "HashTableIterateTime");
_insert_keys_to_column_timer = ADD_TIMER(custom_profile(), "InsertKeysToColumnTime");
_insert_values_to_column_timer = ADD_TIMER(custom_profile(), "InsertValuesToColumnTime");
_merge_timer = ADD_TIMER(Base::custom_profile(), "MergeTime");
_deserialize_data_timer = ADD_TIMER(Base::custom_profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime");
_hash_table_input_counter =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT, 1);
_hash_table_memory_usage =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "MemoryUsageHashTable", TUnit::BYTES, 1);
_hash_table_size_counter =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "HashTableSize", TUnit::UNIT, 1);
_memory_usage_container = ADD_COUNTER(custom_profile(), "MemoryUsageContainer", TUnit::BYTES);
_memory_usage_arena = ADD_COUNTER(custom_profile(), "MemoryUsageArena", TUnit::BYTES);
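
    // Pick the result-producing path once at init time: with or without group-by
    // keys, and finalized (final values) vs. non-finalized (serialized intermediate
    // states handed to a downstream merge).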
auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._without_key) {
if (p._needs_finalize) {
_executor.get_result = [this](RuntimeState* state, vectorized::Block* block,
bool* eos) {
return _get_without_key_result(state, block, eos);
};
} else {
_executor.get_result = [this](RuntimeState* state, vectorized::Block* block,
bool* eos) {
return _get_results_without_key(state, block, eos);
};
}
} else {
if (p._needs_finalize) {
_executor.get_result = [this](RuntimeState* state, vectorized::Block* block,
bool* eos) {
return _get_with_serialized_key_result(state, block, eos);
};
} else {
_executor.get_result = [this](RuntimeState* state, vectorized::Block* block,
bool* eos) {
return _get_results_with_serialized_key(state, block, eos);
};
}
}
return Status::OK();
}
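
// Creates one aggregate state per evaluator at its offset within `data`. If any
// creation throws, the states created so far are destroyed before rethrowing, so no
// partially initialized state is leaked.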
Status AggLocalState::_create_agg_status(vectorized::AggregateDataPtr data) {
auto& shared_state = *Base::_shared_state;
for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
try {
shared_state.aggregate_evaluators[i]->create(
data + shared_state.offsets_of_aggregate_states[i]);
} catch (...) {
for (int j = 0; j < i; ++j) {
shared_state.aggregate_evaluators[j]->destroy(
data + shared_state.offsets_of_aggregate_states[j]);
}
throw;
}
}
return Status::OK();
}
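
// Non-finalized, grouped path: drain up to batch_size entries from the aggregate
// data container, insert their keys into the key columns, and serialize the
// intermediate aggregate states into serialization columns for a downstream merge.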
Status AggLocalState::_get_results_with_serialized_key(RuntimeState* state,
vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_get_results_timer);
auto& shared_state = *_shared_state;
size_t key_size = _shared_state->probe_expr_ctxs.size();
size_t agg_size = _shared_state->aggregate_evaluators.size();
vectorized::MutableColumns value_columns(agg_size);
vectorized::DataTypes value_data_types(agg_size);
    // Non-nullable key columns (ids in `_make_nullable_keys`) will be converted to
    // nullable later, so the block's memory can only be reused when no such
    // conversion is needed.
bool mem_reuse = shared_state.make_nullable_keys.empty() && block->mem_reuse();
vectorized::MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
if (mem_reuse) {
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
} else {
key_columns.emplace_back(
shared_state.probe_expr_ctxs[i]->root()->data_type()->create_column());
}
}
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
agg_method.init_iterator();
auto& data = *agg_method.hash_table;
const auto size = std::min(data.size(), size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(agg_method)>::Key;
std::vector<KeyType> keys(size);
if (shared_state.values.size() < size + 1) {
shared_state.values.resize(size + 1);
}
uint32_t num_rows = 0;
shared_state.aggregate_data_container->init_once();
auto& iter = shared_state.aggregate_data_container->iterator;
{
SCOPED_TIMER(_hash_table_iterate_timer);
while (iter != shared_state.aggregate_data_container->end() &&
num_rows < state->batch_size()) {
keys[num_rows] = iter.template get_key<KeyType>();
shared_state.values[num_rows] = iter.get_aggregate_data();
++iter;
++num_rows;
}
}
{
SCOPED_TIMER(_insert_keys_to_column_timer);
agg_method.insert_keys_into_columns(keys, key_columns, num_rows);
}
if (iter == shared_state.aggregate_data_container->end()) {
if (agg_method.hash_table->has_null_key_data()) {
    // Only a single group-by key supports wrapping a null key, so the
    // null key / value needs additional handling here.
    DCHECK(key_columns.size() == 1);
    DCHECK(key_columns[0]->is_nullable());
    key_columns[0]->insert_data(nullptr, 0);
    shared_state.values[num_rows] =
            agg_method.hash_table->template get_null_key_data<
                    vectorized::AggregateDataPtr>();
    ++num_rows;
    *eos = true;
} else {
*eos = true;
}
}
{
SCOPED_TIMER(_insert_values_to_column_timer);
for (size_t i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
value_data_types[i] = shared_state.aggregate_evaluators[i]
->function()
->get_serialized_type();
if (mem_reuse) {
value_columns[i] =
std::move(*block->get_by_position(i + key_size).column)
.mutate();
} else {
value_columns[i] = shared_state.aggregate_evaluators[i]
->function()
->create_serialize_column();
}
shared_state.aggregate_evaluators[i]
->function()
->serialize_to_column(
shared_state.values,
shared_state.offsets_of_aggregate_states[i],
value_columns[i], num_rows);
}
}
}},
shared_state.agg_data->method_variant);
if (!mem_reuse) {
vectorized::ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
columns_with_schema.emplace_back(std::move(key_columns[i]),
shared_state.probe_expr_ctxs[i]->root()->data_type(),
shared_state.probe_expr_ctxs[i]->root()->expr_name());
}
for (int i = 0; i < agg_size; ++i) {
columns_with_schema.emplace_back(std::move(value_columns[i]), value_data_types[i], "");
}
*block = vectorized::Block(columns_with_schema);
}
return Status::OK();
}
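
// Finalized, grouped path: materialize the final aggregate values into columns that
// match the operator's output row descriptor, one batch per call; the wrapped null
// key, if any, is emitted after all container entries have been drained.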
Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& shared_state = *_shared_state;
    // Non-nullable key columns (ids in `_make_nullable_keys`) will be converted to
    // nullable later, so the block's memory can only be reused when no such
    // conversion is needed.
bool mem_reuse = shared_state.make_nullable_keys.empty() && block->mem_reuse();
auto columns_with_schema = vectorized::VectorizedUtils::create_columns_with_type_and_name(
_parent->cast<AggSourceOperatorX>().row_descriptor());
size_t key_size = shared_state.probe_expr_ctxs.size();
vectorized::MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
if (!mem_reuse) {
key_columns.emplace_back(columns_with_schema[i].type->create_column());
} else {
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
}
}
vectorized::MutableColumns value_columns;
for (size_t i = key_size; i < columns_with_schema.size(); ++i) {
if (!mem_reuse) {
value_columns.emplace_back(columns_with_schema[i].type->create_column());
} else {
value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
}
}
SCOPED_TIMER(_get_results_timer);
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
agg_method.init_iterator();
const auto size = std::min(data.size(), size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(agg_method)>::Key;
std::vector<KeyType> keys(size);
if (shared_state.values.size() < size) {
shared_state.values.resize(size);
}
uint32_t num_rows = 0;
shared_state.aggregate_data_container->init_once();
auto& iter = shared_state.aggregate_data_container->iterator;
{
SCOPED_TIMER(_hash_table_iterate_timer);
while (iter != shared_state.aggregate_data_container->end() &&
num_rows < state->batch_size()) {
keys[num_rows] = iter.template get_key<KeyType>();
shared_state.values[num_rows] = iter.get_aggregate_data();
++iter;
++num_rows;
}
}
{
SCOPED_TIMER(_insert_keys_to_column_timer);
agg_method.insert_keys_into_columns(keys, key_columns, num_rows);
}
for (size_t i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
shared_state.aggregate_evaluators[i]->insert_result_info_vec(
shared_state.values,
shared_state.offsets_of_aggregate_states[i],
value_columns[i].get(), num_rows);
}
if (iter == shared_state.aggregate_data_container->end()) {
if (agg_method.hash_table->has_null_key_data()) {
// Only a single group-by key supports wrapping a null key, so the
// null key / value needs additional handling here.
DCHECK(key_columns.size() == 1);
DCHECK(key_columns[0]->is_nullable());
if (key_columns[0]->size() < state->batch_size()) {
key_columns[0]->insert_data(nullptr, 0);
auto mapped = agg_method.hash_table->template get_null_key_data<
vectorized::AggregateDataPtr>();
for (size_t i = 0; i < shared_state.aggregate_evaluators.size();
++i)
shared_state.aggregate_evaluators[i]->insert_result_info(
mapped +
shared_state.offsets_of_aggregate_states[i],
value_columns[i].get());
*eos = true;
}
} else {
*eos = true;
}
}
}},
shared_state.agg_data->method_variant);
if (!mem_reuse) {
*block = columns_with_schema;
vectorized::MutableColumns columns(block->columns());
for (int i = 0; i < block->columns(); ++i) {
if (i < key_size) {
columns[i] = std::move(key_columns[i]);
} else {
columns[i] = std::move(value_columns[i - key_size]);
}
}
block->set_columns(std::move(columns));
}
return Status::OK();
}
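
// Non-finalized path without group-by keys: the single aggregate state is serialized
// into one row so that a second-level aggregation can merge it later.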
Status AggLocalState::_get_results_without_key(RuntimeState* state, vectorized::Block* block,
bool* eos) {
SCOPED_TIMER(_get_results_timer);
auto& shared_state = *_shared_state;
    // `_shared_state->input_num_rows == 0` means no data was received from the child.
    // A second-level aggregation node should return a NULL result in that case, while
    // this first-level aggregation node just sets `eos = true` and returns directly.
if (UNLIKELY(_shared_state->input_num_rows == 0)) {
*eos = true;
return Status::OK();
}
block->clear();
DCHECK(shared_state.agg_data->without_key != nullptr);
size_t agg_size = shared_state.aggregate_evaluators.size();
vectorized::MutableColumns value_columns(agg_size);
std::vector<vectorized::DataTypePtr> data_types(agg_size);
    // Each aggregate state will be serialized into a serialization (string-like) column.
for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
data_types[i] = shared_state.aggregate_evaluators[i]->function()->get_serialized_type();
value_columns[i] =
shared_state.aggregate_evaluators[i]->function()->create_serialize_column();
}
for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
shared_state.aggregate_evaluators[i]->function()->serialize_without_key_to_column(
shared_state.agg_data->without_key + shared_state.offsets_of_aggregate_states[i],
*value_columns[i]);
}
{
vectorized::ColumnsWithTypeAndName data_with_schema;
for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
vectorized::ColumnWithTypeAndName column_with_schema = {nullptr, data_types[i], ""};
data_with_schema.push_back(std::move(column_with_schema));
}
*block = vectorized::Block(data_with_schema);
}
block->set_columns(std::move(value_columns));
*eos = true;
return Status::OK();
}
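
// Finalized path without group-by keys: insert the final result of the single
// aggregate state and wrap it into a nullable column where the output schema
// requires it (e.g. an empty input with no group by).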
Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& shared_state = *_shared_state;
DCHECK(_shared_state->agg_data->without_key != nullptr);
block->clear();
auto& p = _parent->cast<AggSourceOperatorX>();
*block = vectorized::VectorizedUtils::create_empty_columnswithtypename(p.row_descriptor());
size_t agg_size = shared_state.aggregate_evaluators.size();
vectorized::MutableColumns columns(agg_size);
std::vector<vectorized::DataTypePtr> data_types(agg_size);
for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
data_types[i] = shared_state.aggregate_evaluators[i]->function()->get_return_type();
columns[i] = data_types[i]->create_column();
}
for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) {
auto column = columns[i].get();
shared_state.aggregate_evaluators[i]->insert_result_info(
shared_state.agg_data->without_key + shared_state.offsets_of_aggregate_states[i],
column);
}
const auto& block_schema = block->get_columns_with_type_and_name();
DCHECK_EQ(block_schema.size(), columns.size());
for (int i = 0; i < block_schema.size(); ++i) {
const auto column_type = block_schema[i].type;
if (!column_type->equals(*data_types[i])) {
if (column_type->get_primitive_type() != TYPE_ARRAY) {
if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
!remove_nullable(column_type)->equals(*data_types[i])) {
return Status::InternalError(
"node id = {}, column_type not match data_types, column_type={}, "
"data_types={}",
_parent->node_id(), column_type->get_name(), data_types[i]->get_name());
}
}
// The operator's result type is nullable, but the aggregate function's result is not.
// This happens when:
// 1. there is no group by,
// 2. the input of the aggregate function is empty, and
// 3. none of the input columns are nullable.
if (column_type->is_nullable() && !data_types[i]->is_nullable()) {
vectorized::ColumnPtr ptr = std::move(columns[i]);
// Except for `count`, aggregate functions over an empty input should produce NULL,
// so check whether the child returned any rows.
ptr = make_nullable(ptr, shared_state.input_num_rows == 0);
columns[i] = ptr->assume_mutable();
}
}
}
block->set_columns(std::move(columns));
*eos = true;
return Status::OK();
}
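
// Whether this source finalizes its results and whether it has group-by keys is
// taken from the plan node; together these select one of the four result paths
// installed in AggLocalState::init().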
AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs),
_needs_finalize(tnode.agg_node.need_finalize),
_without_key(tnode.agg_node.grouping_exprs.empty()) {}
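
// Main entry point: produce one block via the installed result path, restore the
// nullability of the group-by keys, evaluate the HAVING conjuncts, and apply the
// limit handling.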
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
    // Apply the HAVING clause here; it must not be executed in the pre-streaming agg.
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
local_state.do_agg_limit(block, eos);
return Status::OK();
}
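
// When the limit has been reached and sort-limit pruning applies, keep only the rows
// that still need to be computed; otherwise fall back to the generic reached_limit()
// handling. When the limit has not been reached, just account for the returned rows.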
void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) {
if (_shared_state->reach_limit) {
if (_shared_state->do_sort_limit && _shared_state->do_limit_filter(block, block->rows())) {
vectorized::Block::filter_block_internal(block, _shared_state->need_computes);
if (auto rows = block->rows()) {
_num_rows_returned += rows;
}
} else {
reached_limit(block, eos);
}
} else {
if (auto rows = block->rows()) {
_num_rows_returned += rows;
}
}
}
void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
if (block->rows() != 0) {
for (auto cid : _shared_state->make_nullable_keys) {
block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column);
block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
}
}
}
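
// Merges a block of serialized intermediate results: emplace the group-by keys into
// the hash table to obtain one aggregate state per row, then deserialize each
// serialized aggregate column and merge it into those states.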
Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) {
SCOPED_TIMER(_merge_timer);
SCOPED_PEAK_MEM(&_estimate_memory_usage);
size_t key_size = Base::_shared_state->probe_expr_ctxs.size();
vectorized::ColumnRawPtrs key_columns(key_size);
for (size_t i = 0; i < key_size; ++i) {
key_columns[i] = block->get_by_position(i).column.get();
}
uint32_t rows = (uint32_t)block->rows();
if (_places.size() < rows) {
_places.resize(rows);
}
_emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
auto col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
}
size_t buffer_size =
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() * rows;
if (_deserialize_buffer.size() < buffer_size) {
_deserialize_buffer.resize(buffer_size);
}
{
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(), _shared_state->offsets_of_aggregate_states[i],
_deserialize_buffer.data(), column.get(), _agg_arena_pool, rows);
}
}
return Status::OK();
}
Status AggSourceOperatorX::merge_with_serialized_key_helper(RuntimeState* state,
vectorized::Block* block) {
auto& local_state = get_local_state(state);
return local_state.merge_with_serialized_key_helper(block);
}
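
// Estimates the extra memory needed to merge `rows` more rows: the projected growth
// of the hash table plus that of the aggregate data container.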
size_t AggSourceOperatorX::get_estimated_memory_size_for_merging(RuntimeState* state,
size_t rows) const {
auto& local_state = get_local_state(state);
size_t size = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> size_t {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
return 0;
},
[&](auto& agg_method) { return agg_method.hash_table->estimate_memory(rows); }},
local_state._shared_state->agg_data->method_variant);
size += local_state._shared_state->aggregate_data_container->estimate_memory(rows);
return size;
}
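
// For each input row, look up or lazily create its aggregate state in the hash
// table. The creator callbacks persist the key, allocate the state (in the aggregate
// data container, or in the arena for the null key) and initialize it via
// _create_agg_status(); hash table and memory counters are refreshed afterwards.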
void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
vectorized::ColumnRawPtrs& key_columns,
uint32_t num_rows) {
std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> void {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table");
},
[&](auto& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns);
agg_method.init_serialized_keys(key_columns, num_rows);
auto creator = [this](const auto& ctor, auto& key, auto& origin) {
HashMethodType::try_presis_key_and_origin(key, origin, _agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
ctor(key, mapped);
};
auto creator_for_null_key = [&](auto& mapped) {
mapped = _agg_arena_pool.aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
if (!st) {
throw Exception(st.code(), st.to_string());
}
};
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
places[i] = *agg_method.lazy_emplace(state, i, creator,
creator_for_null_key);
}
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
COUNTER_SET(_hash_table_memory_usage,
static_cast<int64_t>(
agg_method.hash_table->get_buffer_size_in_bytes()));
COUNTER_SET(_hash_table_size_counter,
static_cast<int64_t>(agg_method.hash_table->size()));
COUNTER_SET(
_memory_usage_container,
static_cast<int64_t>(
_shared_state->aggregate_data_container->memory_usage()));
COUNTER_SET(_memory_usage_arena,
static_cast<int64_t>(_agg_arena_pool.size()));
}},
_shared_state->agg_data->method_variant);
}
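
// Releases the scratch buffers by swapping them with empty containers so their
// memory is returned immediately, then delegates to Base::close().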
Status AggLocalState::close(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
if (_closed) {
return Status::OK();
}
vectorized::PODArray<vectorized::AggregateDataPtr> tmp_places;
_places.swap(tmp_places);
std::vector<char> tmp_deserialize_buffer;
_deserialize_buffer.swap(tmp_deserialize_buffer);
return Base::close(state);
}
#include "common/compile_check_end.h"
} // namespace doris::pipeline