blob: 9adff22d52fa85dd5413462193758317d3da0f07 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "partitioned_aggregation_source_operator.h"
#include <glog/logging.h>
#include <string>
#include "aggregation_source_operator.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
#include "util/runtime_profile.h"
#include "vec/spill/spill_stream.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
// This class adds no construction-time work of its own: everything is forwarded
// to the spill-aware base local state, and real setup happens in init()/open().
PartitionedAggLocalState::PartitionedAggLocalState(RuntimeState* state,
                                                   OperatorXBase* parent)
        : Base(state, parent) {
}
// Initializes the base local state, then creates the private profile that
// setup_in_memory_agg_op() later passes as the parent profile of the wrapped
// in-memory agg source local state.
Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);

    auto internal_profile = std::make_unique<RuntimeProfile>("internal_profile");
    _internal_runtime_profile = std::move(internal_profile);
    return Status::OK();
}
// Opens the base local state on every call, but builds the wrapped in-memory
// aggregation state only the first time (guarded by _opened).
Status PartitionedAggLocalState::open(RuntimeState* state) {
    RETURN_IF_ERROR(Base::open(state));
    SCOPED_TIMER(_open_timer);
    if (!_opened) {
        _opened = true;
        return setup_in_memory_agg_op(state);
    }
    return Status::OK();
}
/// Mirrors a fixed set of counters from the wrapped agg source's profile onto this
/// operator's custom profile, via update_profile_from_inner_profile<spilled>.
/// Counters are applied in the order listed below.
///
/// @param child_profile Custom profile of the wrapped AggSourceOperatorX local state
///                      (that is what get_block() passes in).
template <bool spilled>
void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) {
    static constexpr const char* kInnerCounterNames[] = {
            "GetResultsTime",
            "HashTableIterateTime",
            "InsertKeysToColumnTime",
            "InsertValuesToColumnTime",
            "MergeTime",
            "DeserializeAndMergeTime",
            "HashTableComputeTime",
            "HashTableEmplaceTime",
            "HashTableInputCount",
            "MemoryUsageHashTable",
            "HashTableSize",
            "MemoryUsageContainer",
            "MemoryUsageArena",
    };
    for (const char* counter_name : kInnerCounterNames) {
        update_profile_from_inner_profile<spilled>(counter_name, custom_profile(),
                                                   child_profile);
    }
}
// Closes the base local state unless this local state is already closed;
// a repeated close is a no-op that reports success.
Status PartitionedAggLocalState::close(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_close_timer);
    return _closed ? Status::OK() : Base::close(state);
}
// Builds the wrapped in-memory AggSourceOperatorX from the same plan node,
// operator id and descriptor table; this operator layers spill recovery
// (see get_block / recover_blocks_from_disk) on top of it.
PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool,
                                                             const TPlanNode& tnode,
                                                             int operator_id,
                                                             const DescriptorTbl& descs)
        : Base(pool, tnode, operator_id, descs) {
    auto inner_agg_source = std::make_unique<AggSourceOperatorX>(pool, tnode, operator_id, descs);
    _agg_source_operator = std::move(inner_agg_source);
}
// Initializes the operator base, overrides the operator name (after the base
// init), then initializes the wrapped agg source from the same plan node.
Status PartitionedAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
    _op_name = "PARTITIONED_AGGREGATION_OPERATOR";
    auto& inner_agg_source = *_agg_source_operator;
    return inner_agg_source.init(tnode, state);
}
// Prepares this operator first; the wrapped agg source is prepared only if
// that succeeds.
Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(OperatorXBase::prepare(state));
    auto& inner_agg_source = *_agg_source_operator;
    return inner_agg_source.prepare(state);
}
// Closes this operator first; the wrapped agg source is closed only if that
// succeeds (an error from the base close is returned as-is).
Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
    RETURN_IF_ERROR(OperatorXBase::close(state));
    auto& inner_agg_source = *_agg_source_operator;
    return inner_agg_source.close(state);
}
// Serial-ness is decided entirely by the wrapped in-memory agg source.
bool PartitionedAggSourceOperatorX::is_serial_operator() const {
    const auto& inner_agg_source = *_agg_source_operator;
    return inner_agg_source.is_serial_operator();
}
/// Produces the next output block of the (possibly spilled) aggregation.
///
/// When the sink did not spill, this simply pulls results from the wrapped
/// in-memory AggSourceOperatorX. When it did spill, each call advances a small
/// state machine over the spilled partitions:
///   1. no buffered blocks and current partition not exhausted -> recover one
///      batch from disk into local_state._blocks and return without output;
///   2. buffered blocks present -> merge them into the in-memory hash table and
///      return without output unless the current partition is exhausted;
///   3. partition fully merged -> emit results from the in-memory hash table;
///      when those are exhausted and spilled partitions remain, reset the hash
///      table and clear *eos so the next partition is processed.
///
/// @param state Runtime state of the calling task.
/// @param block Output block filled by the wrapped agg source.
/// @param eos   Out: true once all results (in-memory and spilled) were emitted.
/// @return OK, or the first error from recovery, merging, result retrieval or
///         hash-table reset. On error or *eos the shared state is closed.
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
                                                bool* eos) {
    auto& local_state = get_local_state(state);
    local_state.copy_shared_spill_profile();
    Status status;
    // Release the shared (spill) state once we fail or reach the end of output.
    Defer defer {[&]() {
        if (!status.ok() || *eos) {
            local_state._shared_state->close();
        }
    }};

    SCOPED_TIMER(local_state.exec_time_counter());

    if (local_state._shared_state->is_spilled &&
        local_state._need_to_merge_data_for_current_partition) {
        if (local_state._blocks.empty() && !local_state._current_partition_eos) {
            // Step 1: nothing buffered yet — read the next batch from disk and
            // yield; merging happens on a later call.
            bool has_recovering_data = false;
            status = local_state.recover_blocks_from_disk(state, has_recovering_data);
            RETURN_IF_ERROR(status);
            // No spilled partition left at all means the whole output is done.
            *eos = !has_recovering_data;
            return Status::OK();
        } else if (!local_state._blocks.empty()) {
            // Step 2: merge every buffered (serialized-key) block into the
            // wrapped agg's hash table.
            size_t merged_rows = 0;
            while (!local_state._blocks.empty()) {
                auto block_ = std::move(local_state._blocks.front());
                merged_rows += block_.rows();
                // NOTE(review): if _blocks is a std::vector, erasing the front on
                // every iteration is quadratic in the number of buffered blocks —
                // confirm the container type in the header.
                local_state._blocks.erase(local_state._blocks.begin());
                status = _agg_source_operator->merge_with_serialized_key_helper(
                        local_state._runtime_state.get(), &block_);
                RETURN_IF_ERROR(status);
            }
            local_state._estimate_memory_usage +=
                    _agg_source_operator->get_estimated_memory_size_for_merging(
                            local_state._runtime_state.get(), merged_rows);

            // More data of this partition is still on disk: keep merging before
            // emitting anything.
            if (!local_state._current_partition_eos) {
                return Status::OK();
            }
        }

        // Current partition fully merged; switch to emitting its results.
        local_state._need_to_merge_data_for_current_partition = false;
    }

    // Step 3 (or the non-spilled case): emit results from the in-memory
    // aggregation held by the wrapped agg source.
    auto* runtime_state = local_state._runtime_state.get();
    local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once();
    status = _agg_source_operator->get_block(runtime_state, block, eos);
    if (!local_state._shared_state->is_spilled) {
        auto* source_local_state =
                local_state._runtime_state->get_local_state(_agg_source_operator->operator_id());
        local_state.update_profile<false>(source_local_state->custom_profile());
    }

    RETURN_IF_ERROR(status);
    if (*eos) {
        if (local_state._shared_state->is_spilled) {
            auto* source_local_state = local_state._runtime_state->get_local_state(
                    _agg_source_operator->operator_id());
            local_state.update_profile<true>(source_local_state->custom_profile());

            // This partition's results are exhausted; if more spilled partitions
            // remain, start over with an empty hash table and hide eos from the
            // caller.
            if (!local_state._shared_state->spill_partitions.empty()) {
                local_state._current_partition_eos = false;
                local_state._need_to_merge_data_for_current_partition = true;
                status = local_state._shared_state->in_mem_shared_state->reset_hash_table();
                RETURN_IF_ERROR(status);
                *eos = false;
            }
        }
    }
    local_state.reached_limit(block, eos);
    return Status::OK();
}
// Creates the private RuntimeState used by the wrapped in-memory agg source.
// The private state copies ids, query options, execution context, BE number,
// descriptor table and runtime-filter manager from the task's state. The agg
// source's local state is then set up on top of the in-memory shared state kept
// inside this operator's shared state, and finally opened.
Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
    _runtime_state = RuntimeState::create_unique(
            state->fragment_instance_id(), state->query_id(), state->fragment_id(),
            state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());

    auto* inner_state = _runtime_state.get();
    inner_state->set_task_execution_context(state->get_task_execution_context().lock());
    inner_state->set_be_number(state->be_number());
    inner_state->set_desc_tbl(&state->desc_tbl());
    inner_state->resize_op_id_to_local_state(state->max_operator_id());
    inner_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr());

    auto& parent = Base::_parent->template cast<Parent>();
    auto& inner_agg_source = parent._agg_source_operator;
    DCHECK(Base::_shared_state->in_mem_shared_state);

    LocalStateInfo state_info {.parent_profile = _internal_runtime_profile.get(),
                               .scan_ranges = {},
                               .shared_state = Base::_shared_state->in_mem_shared_state,
                               .shared_state_map = {},
                               .task_idx = 0};
    RETURN_IF_ERROR(inner_agg_source->setup_local_state(inner_state, state_info));

    auto* inner_local_state = inner_state->get_local_state(inner_agg_source->operator_id());
    DCHECK(inner_local_state != nullptr);
    // NOTE(review): opened with the outer task state, not inner_state — kept as-is.
    return inner_local_state->open(state);
}
/// Recovers the next batch of spilled aggregation data from disk into `_blocks`.
///
/// Spilled data is a queue of partitions (`_shared_state->spill_partitions`),
/// each holding a queue of spill streams (`spill_streams_`). Reading starts at the
/// front stream of the front partition. A stream is read until it ends, or until
/// the buffered bytes reach `vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM`.
/// Reading stops once some data has been buffered, when all partitions are
/// exhausted, or when the task is cancelled. Fully-read streams are deleted and
/// popped; partitions left with no streams are popped.
///
/// `_current_partition_eos` becomes true only once every stream of the front
/// partition has been drained. get_block() therefore keeps merging into the same
/// hash table until the whole partition has been read.
///
/// @param state    Runtime state of the calling task; its cancellation flag is
///                 polled between reads.
/// @param has_data Out: false when no spilled partition was left (the shared state
///                 is closed in that case), true otherwise.
/// @return OK, or the error raised while reading a spill stream or by a
///         fault-injection point. On error or cancellation the shared state is
///         closed.
Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, bool& has_data) {
    const auto query_id = state->query_id();

    if (_shared_state->spill_partitions.empty()) {
        _shared_state->close();
        has_data = false;
        return Status::OK();
    }

    has_data = true;
    auto spill_func = [this, state, query_id] {
        Status status;
        // Release the shared spill state if recovery fails or the task is cancelled.
        Defer defer {[&]() {
            if (!status.ok() || state->is_cancelled()) {
                if (!status.ok()) {
                    LOG(WARNING) << fmt::format(
                            "Query:{}, agg probe:{}, task:{}, recover agg data error:{}",
                            print_id(query_id), _parent->node_id(), state->task_id(), status);
                }
                _shared_state->close();
            }
        }};
        bool has_agg_data = false;
        size_t accumulated_blocks_size = 0;
        while (!state->is_cancelled() && !has_agg_data &&
               !_shared_state->spill_partitions.empty()) {
            while (!_shared_state->spill_partitions[0]->spill_streams_.empty() &&
                   !state->is_cancelled() && !has_agg_data) {
                auto& stream = _shared_state->spill_partitions[0]->spill_streams_[0];
                stream->set_read_counters(operator_profile());
                vectorized::Block block;
                bool eos = false;
                while (!eos && !state->is_cancelled()) {
                    {
                        DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
                                        {
                                            status = Status::Error<INTERNAL_ERROR>(
                                                    "fault_inject partitioned_agg_source "
                                                    "recover_spill_data failed");
                                        });
                        if (status.ok()) {
                            status = stream->read_next_block_sync(&block, &eos);
                        }
                    }
                    RETURN_IF_ERROR(status);

                    if (!block.empty()) {
                        has_agg_data = true;
                        accumulated_blocks_size += block.allocated_bytes();
                        _blocks.emplace_back(std::move(block));

                        // Cap the amount of data buffered per call.
                        if (accumulated_blocks_size >=
                            vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
                            break;
                        }
                    }
                }

                if (eos) {
                    ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
                    // `stream` refers to the popped element; do not use it below.
                    _shared_state->spill_partitions[0]->spill_streams_.pop_front();
                }

                // A partition may be spread over several spill streams, and presumably
                // the same group key can appear in more than one of them (one per sink
                // spill). The partition is therefore only exhausted once all of its
                // streams are drained. Reporting eos after the first stream would make
                // get_block() emit and reset the hash table mid-partition, which emits
                // duplicate groups.
                _current_partition_eos =
                        _shared_state->spill_partitions[0]->spill_streams_.empty();
            }

            if (_shared_state->spill_partitions[0]->spill_streams_.empty()) {
                _shared_state->spill_partitions.pop_front();
            }
        }

        VLOG_DEBUG << fmt::format(
                "Query:{}, agg probe:{}, task:{}, recover partitioned finished, partitions "
                "left:{}, bytes read:{}",
                print_id(query_id), _parent->node_id(), state->task_id(),
                _shared_state->spill_partitions.size(), accumulated_blocks_size);
        return status;
    };

    // Wraps spill_func so that exceptions are converted into a Status.
    auto exception_catch_func = [this, state, spill_func, query_id]() {
        DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", {
            auto st = Status::InternalError(
                    "fault_inject partitioned_agg_source "
                    "merge spill data canceled");
            state->get_query_ctx()->cancel(st);
            return st;
        });

        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
        LOG_IF(INFO, !status.ok()) << fmt::format(
                "Query:{}, agg probe:{}, task:{}, recover exception:{}", print_id(query_id),
                _parent->node_id(), state->task_id(), status.to_string());
        return status;
    };

    DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
        return Status::Error<INTERNAL_ERROR>(
                "fault_inject partitioned_agg_source submit_func failed");
    });

    VLOG_DEBUG << fmt::format(
            "Query:{}, agg probe:{}, task:{}, begin to recover, partitions left:{}, ",
            print_id(query_id), _parent->node_id(), state->task_id(),
            _shared_state->spill_partitions.size());
    return SpillRecoverRunnable(state, operator_profile(), exception_catch_func).run();
}
// Reports blockable only when the aggregation was spilled. Presumably this is
// because get_block() then performs synchronous disk recovery
// (recover_blocks_from_disk) — confirm against the scheduler's use of
// is_blockable().
bool PartitionedAggLocalState::is_blockable() const {
    const bool spilled = _shared_state->is_spilled;
    return spilled;
}
#include "common/compile_check_end.h"
} // namespace doris::pipeline