blob: 2594221e15305d109b25b750e56178e1b6e05d93 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "partitioned_aggregation_sink_operator.h"
#include <gen_cpp/Types_types.h>
#include <cstdint>
#include <limits>
#include <memory>
#include "aggregation_sink_operator.h"
#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "vec/spill/spill_stream.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {}
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_open_timer);
_init_counters();
auto& parent = Base::_parent->template cast<Parent>();
Base::_shared_state->init_spill_params(parent._spill_partition_count);
RETURN_IF_ERROR(setup_in_memory_agg_op(state));
for (const auto& probe_expr_ctx : Base::_shared_state->in_mem_shared_state->probe_expr_ctxs) {
key_columns_.emplace_back(probe_expr_ctx->root()->data_type()->create_column());
}
for (const auto& aggregate_evaluator :
Base::_shared_state->in_mem_shared_state->aggregate_evaluators) {
value_data_types_.emplace_back(aggregate_evaluator->function()->get_serialized_type());
value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
}
_rows_in_partitions.assign(Base::_shared_state->partition_count, 0);
return Status::OK();
}
Status PartitionedAggSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_open_timer);
_shared_state->setup_shared_profile(custom_profile());
return Base::open(state);
}
Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_close_timer);
if (Base::_closed) {
return Status::OK();
}
return Base::close(state, exec_status);
}
void PartitionedAggSinkLocalState::_init_counters() {
_internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile");
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
_spill_serialize_hash_table_timer =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(), "SpillSerializeHashTableTime", 1);
}
#define UPDATE_PROFILE(name) \
update_profile_from_inner_profile<spilled>(name, custom_profile(), child_profile)
template <bool spilled>
void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) {
UPDATE_PROFILE("MemoryUsageHashTable");
UPDATE_PROFILE("MemoryUsageSerializeKeyArena");
UPDATE_PROFILE("BuildTime");
UPDATE_PROFILE("MergeTime");
UPDATE_PROFILE("DeserializeAndMergeTime");
UPDATE_PROFILE("HashTableComputeTime");
UPDATE_PROFILE("HashTableEmplaceTime");
UPDATE_PROFILE("HashTableInputCount");
UPDATE_PROFILE("HashTableSize");
UPDATE_PROFILE("MemoryUsageContainer");
UPDATE_PROFILE("MemoryUsageArena");
update_max_min_rows_counter();
}
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id, dest_id) {
_agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, dest_id, tnode,
descs, require_bucket_distribution);
_spillable = true;
}
Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode, state));
_name = "PARTITIONED_AGGREGATION_SINK_OPERATOR";
_spill_partition_count = state->spill_aggregation_partition_count();
return _agg_sink_operator->init(tnode, state);
}
Status PartitionedAggSinkOperatorX::prepare(RuntimeState* state) {
return _agg_sink_operator->prepare(state);
}
Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
local_state._eos = eos;
auto* runtime_state = local_state._runtime_state.get();
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::sink", {
return Status::Error<INTERNAL_ERROR>("fault_inject partitioned_agg_sink sink failed");
});
RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false));
size_t revocable_size = 0;
int64_t query_mem_limit = 0;
if (eos) {
revocable_size = revocable_mem_size(state);
query_mem_limit = state->get_query_ctx()->resource_ctx()->memory_context()->mem_limit();
LOG(INFO) << fmt::format(
"Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem limit:{}, "
"revocable memory:{}",
print_id(state->query_id()), node_id(), state->task_id(),
local_state._shared_state->is_spilled, PrettyPrinter::print_bytes(query_mem_limit),
PrettyPrinter::print_bytes(revocable_size));
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
RETURN_IF_ERROR(revoke_memory(state, nullptr));
} else {
for (auto& partition : local_state._shared_state->spill_partitions) {
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
}
local_state._dependency->set_ready_to_read();
}
} else {
local_state._dependency->set_ready_to_read();
}
} else if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
return revoke_memory(state, nullptr);
}
}
if (!local_state._shared_state->is_spilled) {
auto* sink_local_state = local_state._runtime_state->get_sink_local_state();
local_state.update_profile<false>(sink_local_state->custom_profile());
}
return Status::OK();
}
Status PartitionedAggSinkOperatorX::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
auto& local_state = get_local_state(state);
return local_state.revoke_memory(state, spill_context);
}
size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
auto* runtime_state = local_state._runtime_state.get();
auto size = _agg_sink_operator->get_revocable_mem_size(runtime_state);
return size;
}
Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state) {
_runtime_state = RuntimeState::create_unique(
state->fragment_instance_id(), state->query_id(), state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
_runtime_state->set_task_execution_context(state->get_task_execution_context().lock());
_runtime_state->set_be_number(state->be_number());
_runtime_state->set_desc_tbl(&state->desc_tbl());
_runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr());
_runtime_state->set_task_id(state->task_id());
auto& parent = Base::_parent->template cast<Parent>();
Base::_shared_state->in_mem_shared_state_sptr =
parent._agg_sink_operator->create_shared_state();
Base::_shared_state->in_mem_shared_state =
static_cast<AggSharedState*>(Base::_shared_state->in_mem_shared_state_sptr.get());
Base::_shared_state->in_mem_shared_state->enable_spill = true;
LocalSinkStateInfo info {.task_idx = 0,
.parent_profile = _internal_runtime_profile.get(),
.sender_id = -1,
.shared_state = Base::_shared_state->in_mem_shared_state_sptr.get(),
.shared_state_map = {},
.tsink = {}};
RETURN_IF_ERROR(parent._agg_sink_operator->setup_local_state(_runtime_state.get(), info));
auto* sink_local_state = _runtime_state->get_sink_local_state();
DCHECK(sink_local_state != nullptr);
return sink_local_state->open(state);
}
size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
auto& local_state = get_local_state(state);
auto* runtime_state = local_state._runtime_state.get();
auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state, eos);
COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
return size;
}
template <typename HashTableCtxType, typename KeyType>
Status PartitionedAggSinkLocalState::to_block(HashTableCtxType& context, std::vector<KeyType>& keys,
std::vector<vectorized::AggregateDataPtr>& values,
const vectorized::AggregateDataPtr null_key_data) {
SCOPED_TIMER(_spill_serialize_hash_table_timer);
context.insert_keys_into_columns(keys, key_columns_, (uint32_t)keys.size());
if (null_key_data) {
// only one key of group by support wrap null key
// here need additional processing logic on the null key / value
CHECK(key_columns_.size() == 1);
CHECK(key_columns_[0]->is_nullable());
key_columns_[0]->insert_data(nullptr, 0);
values.emplace_back(null_key_data);
}
for (size_t i = 0; i < Base::_shared_state->in_mem_shared_state->aggregate_evaluators.size();
++i) {
Base::_shared_state->in_mem_shared_state->aggregate_evaluators[i]
->function()
->serialize_to_column(
values,
Base::_shared_state->in_mem_shared_state->offsets_of_aggregate_states[i],
value_columns_[i], values.size());
}
vectorized::ColumnsWithTypeAndName key_columns_with_schema;
for (int i = 0; i < key_columns_.size(); ++i) {
key_columns_with_schema.emplace_back(
std::move(key_columns_[i]),
Base::_shared_state->in_mem_shared_state->probe_expr_ctxs[i]->root()->data_type(),
Base::_shared_state->in_mem_shared_state->probe_expr_ctxs[i]->root()->expr_name());
}
key_block_ = key_columns_with_schema;
vectorized::ColumnsWithTypeAndName value_columns_with_schema;
for (int i = 0; i < value_columns_.size(); ++i) {
value_columns_with_schema.emplace_back(
std::move(value_columns_[i]), value_data_types_[i],
Base::_shared_state->in_mem_shared_state->aggregate_evaluators[i]
->function()
->get_name());
}
value_block_ = value_columns_with_schema;
for (const auto& column : key_block_.get_columns_with_type_and_name()) {
block_.insert(column);
}
for (const auto& column : value_block_.get_columns_with_type_and_name()) {
block_.insert(column);
}
return Status::OK();
}
template <typename HashTableCtxType, typename KeyType>
Status PartitionedAggSinkLocalState::_spill_partition(
RuntimeState* state, HashTableCtxType& context, AggSpillPartitionSPtr& spill_partition,
std::vector<KeyType>& keys, std::vector<vectorized::AggregateDataPtr>& values,
const vectorized::AggregateDataPtr null_key_data, bool is_last) {
vectorized::SpillStreamSPtr spill_stream;
auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(),
Base::operator_profile(), spill_stream);
RETURN_IF_ERROR(status);
status = to_block(context, keys, values, null_key_data);
RETURN_IF_ERROR(status);
if (is_last) {
std::vector<KeyType> tmp_keys;
std::vector<vectorized::AggregateDataPtr> tmp_values;
keys.swap(tmp_keys);
values.swap(tmp_values);
} else {
keys.clear();
values.clear();
}
status = spill_stream->spill_block(state, block_, false);
RETURN_IF_ERROR(status);
status = spill_partition->flush_if_full();
_reset_tmp_data();
return status;
}
template <typename HashTableCtxType, typename HashTableType>
Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
HashTableCtxType& context,
HashTableType& hash_table,
const size_t size_to_revoke, bool eos) {
Status status;
Defer defer {[&]() {
if (!status.ok()) {
Base::_shared_state->close();
}
}};
context.init_iterator();
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
const auto total_rows =
Base::_shared_state->in_mem_shared_state->aggregate_data_container->total_count();
const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);
// `spill_batch_rows` will be between 4k and 1M
// and each block to spill will not be larger than 32MB(`MAX_SPILL_WRITE_BATCH_MEM`)
const auto spill_batch_rows = std::min<size_t>(
1024 * 1024, std::max<size_t>(4096, vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
total_rows / size_to_revoke_));
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows;
size_t row_count = 0;
std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos(
Base::_shared_state->partition_count);
auto& iter = Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator;
while (iter != Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() &&
!state->is_cancelled()) {
const auto& key = iter.template get_key<typename HashTableType::key_type>();
auto partition_index = Base::_shared_state->get_partition_index(hash_table.hash(key));
spill_infos[partition_index].keys_.emplace_back(key);
spill_infos[partition_index].values_.emplace_back(iter.get_aggregate_data());
if (++row_count == spill_batch_rows) {
row_count = 0;
for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled();
++i) {
if (spill_infos[i].keys_.size() >= spill_batch_rows) {
_rows_in_partitions[i] += spill_infos[i].keys_.size();
status = _spill_partition(
state, context, Base::_shared_state->spill_partitions[i],
spill_infos[i].keys_, spill_infos[i].values_, nullptr, false);
RETURN_IF_ERROR(status);
}
}
}
++iter;
}
auto hash_null_key_data = hash_table.has_null_key_data();
for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled(); ++i) {
auto spill_null_key_data =
(hash_null_key_data && i == Base::_shared_state->partition_count - 1);
if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
_rows_in_partitions[i] += spill_infos[i].keys_.size();
status = _spill_partition(
state, context, Base::_shared_state->spill_partitions[i], spill_infos[i].keys_,
spill_infos[i].values_,
spill_null_key_data
? hash_table.template get_null_key_data<vectorized::AggregateDataPtr>()
: nullptr,
true);
RETURN_IF_ERROR(status);
}
}
for (auto& partition : Base::_shared_state->spill_partitions) {
status = partition->finish_current_spilling(eos);
RETURN_IF_ERROR(status);
}
if (eos) {
_clear_tmp_data();
}
return Status::OK();
}
Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
LOG(INFO) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need spill:{}, revocable "
"memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos,
_shared_state->is_spilled,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
auto* sink_local_state = _runtime_state->get_sink_local_state();
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
custom_profile()->add_info_string("Spilled", "true");
update_profile<false>(sink_local_state->custom_profile());
} else {
update_profile<true>(sink_local_state->custom_profile());
}
auto& parent = Base::_parent->template cast<Parent>();
auto query_id = state->query_id();
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func", {
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_agg_sink revoke_memory submit_func failed");
});
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
SpillSinkRunnable spill_runnable(
state, spill_context, operator_profile(),
[this, &parent, state, query_id, size_to_revoke] {
Status status;
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
status = Status::InternalError(
"fault_inject partitioned_agg_sink "
"revoke_memory canceled");
state->get_query_ctx()->cancel(status);
return status;
});
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
print_id(query_id), Base::_parent->node_id(), state->task_id(),
status);
}
_shared_state->close();
} else {
LOG(INFO) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, "
"revocable memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
_eos,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}
if (_eos) {
Base::_dependency->set_ready_to_read();
}
state->get_query_ctx()
->resource_ctx()
->task_controller()
->decrease_revoking_tasks_count();
}};
auto* runtime_state = _runtime_state.get();
auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
status = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> Status {
return Status::InternalError("Unit hash table");
},
[&](auto& agg_method) -> Status {
auto& hash_table = *agg_method.hash_table;
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
state, agg_method, hash_table, size_to_revoke, _eos));
}},
agg_data->method_variant);
RETURN_IF_ERROR(status);
status = parent._agg_sink_operator->reset_hash_table(runtime_state);
return status;
});
return spill_runnable.run();
}
void PartitionedAggSinkLocalState::_reset_tmp_data() {
block_.clear();
key_columns_.clear();
value_columns_.clear();
key_block_.clear_column_data();
value_block_.clear_column_data();
key_columns_ = key_block_.mutate_columns();
value_columns_ = value_block_.mutate_columns();
}
void PartitionedAggSinkLocalState::_clear_tmp_data() {
{
vectorized::Block empty_block;
block_.swap(empty_block);
}
{
vectorized::Block empty_block;
key_block_.swap(empty_block);
}
{
vectorized::Block empty_block;
value_block_.swap(empty_block);
}
{
vectorized::MutableColumns cols;
key_columns_.swap(cols);
}
{
vectorized::MutableColumns cols;
value_columns_.swap(cols);
}
vectorized::DataTypes tmp_value_data_types;
value_data_types_.swap(tmp_value_data_types);
}
bool PartitionedAggSinkLocalState::is_blockable() const {
return _shared_state->is_spilled;
}
#include "common/compile_check_end.h"
} // namespace doris::pipeline