blob: f4621e871b695014b3dc7db6fd00c9706bb70f57 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/operator/exchange_source_operator.h"
#include <fmt/core.h>
#include <cstdint>
#include <memory>
#include "exec/exchange/vdata_stream_mgr.h"
#include "exec/exchange/vdata_stream_recvr.h"
#include "exec/operator/operator.h"
#include "exec/sort/vsort_exec_exprs.h"
#include "exprs/vexpr_context.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/defer_op.h"
namespace doris {
#include "common/compile_check_begin.h"
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent), num_rows_skipped(0), is_ready(false) {}
ExchangeLocalState::~ExchangeLocalState() {
    // The receiver is registered with VDataStreamMgr during init() and is only unregistered
    // when it is closed. On some error paths the pipeline is never opened and therefore never
    // closed, so the receiver must also be closed here. VDataStreamRecvr::close() guards
    // against being called more than once, so a prior close() is harmless.
    if (auto& recvr = stream_recvr; recvr) {
        recvr->close();
    }
}
// Returns the base local-state description followed by the receiver's debug string.
// `stream_recvr` can still be null when init() failed before create_stream_recvr() ran
// (the destructor guards against the same case), so print "null" rather than dereferencing.
std::string ExchangeLocalState::debug_string(int indentation_level) const {
    fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer, "{}, recvr: ({})", Base::debug_string(indentation_level),
                   stream_recvr ? stream_recvr->debug_string() : std::string("null"));
    return fmt::to_string(debug_string_buffer);
}
std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
_num_senders, _is_merging);
return fmt::to_string(debug_string_buffer);
}
// Creates this instance's data-stream receiver through the exec env's VDataStreamMgr.
// The buffer size passed to the receiver is config::exchg_node_buffer_size_bytes, divided by
// the number of senders when merging, and never less than 20480.
void ExchangeLocalState::create_stream_recvr(RuntimeState* state) {
    auto& exchange_op = _parent->cast<ExchangeSourceOperatorX>();
    const auto divisor = exchange_op.is_merging() ? exchange_op.num_senders() : 1;
    const auto recvr_buffer_size =
            std::max(20480, config::exchg_node_buffer_size_bytes / divisor);
    stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
            state, _memory_used_counter, state->fragment_instance_id(), exchange_op.node_id(),
            exchange_op.num_senders(), custom_profile(), exchange_op.is_merging(),
            recvr_buffer_size);
}
// Initialises the local state:
//  - creates the stream receiver;
//  - attaches one dependency per sender queue;
//  - registers the profile counters: an overall WaitForDependencyTime timer and a
//    WaitForData<i> counter per queue nested under it, then recv/filter/merger timers.
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);
    create_stream_recvr(state);

    const auto& sender_queues = stream_recvr->sender_queues();
    const size_t queue_count = sender_queues.size();
    deps.resize(queue_count);
    metrics.resize(queue_count);

    static const std::string timer_name = "WaitForDependencyTime";
    _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), timer_name, 1);
    for (size_t queue_idx = 0; queue_idx < queue_count; ++queue_idx) {
        auto& dep = deps[queue_idx];
        dep = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                        fmt::format("SHUFFLE_DATA_DEPENDENCY_{}", queue_idx));
        sender_queues[queue_idx]->set_dependency(dep);
        // Per-queue wait time; the value is filled in from the dependency in close().
        metrics[queue_idx] = custom_profile()->add_nonzero_counter(
                fmt::format("WaitForData{}", queue_idx), TUnit::TIME_NS, timer_name, 1);
    }

    get_data_from_recvr_timer = ADD_TIMER(custom_profile(), "GetDataFromRecvrTime");
    filter_timer = ADD_TIMER(custom_profile(), "FilterTime");
    create_merger_timer = ADD_TIMER(custom_profile(), "CreateMergerTime");
    custom_profile()->add_info_string("InstanceID", print_id(state->fragment_instance_id()));
    return Status::OK();
}
// Opens the local state. For a merging exchange, the operator's sort expressions are cloned
// into this local state's `vsort_exec_exprs`; get_block() uses them to build the merger.
Status ExchangeLocalState::open(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_open_timer);
    RETURN_IF_ERROR(Base::open(state));
    if (auto& exchange_op = _parent->cast<ExchangeSourceOperatorX>(); exchange_op.is_merging()) {
        RETURN_IF_ERROR(exchange_op._vsort_exec_exprs.clone(state, vsort_exec_exprs));
    }
    return Status::OK();
}
// Builds the exchange source operator from its plan node.
//  - num_senders: number of upstream senders feeding this exchange (presumably one per
//    upstream fragment instance — confirm with the planner).
//  - The exchange is a merging (sorted) exchange exactly when the plan node carries sort_info.
//  - partition_type falls back to UNPARTITIONED and offset to 0 when the plan leaves them unset.
ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
                                                 int operator_id, const DescriptorTbl& descs,
                                                 int num_senders)
        : OperatorX<ExchangeLocalState>(pool, tnode, operator_id, descs),
          _num_senders(num_senders),
          _is_merging(tnode.exchange_node.__isset.sort_info),
          _partition_type(tnode.exchange_node.__isset.partition_type
                                  ? tnode.exchange_node.partition_type
                                  : TPartitionType::UNPARTITIONED),
          // OFFSET applied by this exchange (or by the merger when merging).
          _offset(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0) {}
// Initialises the operator from its plan node. A merging exchange also initialises its sort
// expressions and records per-key ascending / nulls-first ordering from sort_info.
Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::init(tnode, state));
    if (_is_merging) {
        const auto& sort_info = tnode.exchange_node.sort_info;
        RETURN_IF_ERROR(_vsort_exec_exprs.init(sort_info, _pool));
        _is_asc_order = sort_info.is_asc_order;
        _nulls_first = sort_info.nulls_first;
    }
    return Status::OK();
}
// Prepares the operator. A merging exchange also prepares and opens its sort expressions
// against the operator's own row descriptor (used for both input and output).
Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
    DCHECK_GT(_num_senders, 0);
    if (!_is_merging) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor));
    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
    return Status::OK();
}
// Produces the next output block of the exchange.
//
// Merging exchange: the first call only builds the merger from the cloned ordering exprs,
// passing limit/offset to it, and returns OK with no rows and without writing *eos.
// Subsequent calls read merged data from the receiver.
//
// All exchanges: conjuncts are applied to the received block before OFFSET/LIMIT, so both
// count post-filter rows. Non-merging exchanges apply OFFSET here, across calls; the merger
// handles it otherwise. LIMIT is applied here in both modes.
Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* eos) {
    auto& local_state = get_local_state(state);
    // Runs on every return path. Once end-of-stream has been reported, ask the receiver to
    // mark its sink-side dependencies always ready (presumably so senders waiting on this
    // exchange are no longer blocked — confirm in VDataStreamRecvr).
    Defer is_eos([&]() {
        if (*eos) {
            local_state.stream_recvr->set_sink_dep_always_ready();
        }
    });
    SCOPED_TIMER(local_state.exec_time_counter());
    if (_is_merging && !local_state.is_ready) {
        // Lazily create the merger on the first call; no data is returned on this call.
        SCOPED_TIMER(local_state.create_merger_timer);
        RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
                local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first,
                state->batch_size(), _limit, _offset));
        local_state.is_ready = true;
        return Status::OK();
    }
    {
        SCOPED_TIMER(local_state.get_data_from_recvr_timer);
        RETURN_IF_ERROR(local_state.stream_recvr->get_next(block, eos));
    }
    {
        SCOPED_TIMER(local_state.filter_timer);
        RETURN_IF_ERROR(doris::VExprContext::filter_block(local_state.conjuncts(), block,
                                                          block->columns()));
    }
    // In non-merge cases, if eos = true, the block must be empty
    // In merge cases, this cannot be guaranteed
    if (!*eos || block->rows() > 0) {
        if (!_is_merging) {
            // If it is merging, we will handle the offset inside the merger, and the exchange source does not need to handle it
            // num_rows_skipped accumulates the rows dropped for OFFSET across calls.
            if (local_state.num_rows_skipped + block->rows() < _offset) {
                // The whole block still falls inside the OFFSET window: drop all of it.
                local_state.num_rows_skipped += block->rows();
                block->set_num_rows(0);
            } else if (local_state.num_rows_skipped < _offset) {
                // The OFFSET window ends inside this block: drop only its leading part.
                int64_t offset = _offset - local_state.num_rows_skipped;
                local_state.num_rows_skipped = _offset;
                // should skip some rows
                block->skip_num_rows(offset);
            }
        }
        // Merge actually also handles the limit, but handling the limit one more time will not cause correctness issues
        if (_limit == -1 || local_state.num_rows_returned() + block->rows() < _limit) {
            // No limit, or still strictly below it: emit the whole block.
            local_state.add_num_rows_returned(block->rows());
        } else {
            // Reaching (or crossing) the limit: trim to the remaining quota and finish.
            *eos = true;
            auto limit = _limit - local_state.num_rows_returned();
            block->set_num_rows(limit);
            local_state.set_num_rows_returned(_limit);
        }
    }
    return Status::OK();
}
// Closes the local state once; later calls are no-ops.
//  - copies each sender queue's dependency wait time into its WaitForData<i> counter;
//  - closes the stream receiver if one was created;
//  - for merging exchanges, closes the cloned sort expressions.
//
// init() may have failed before the receiver was created, leaving `stream_recvr` null and
// `deps` empty. The receiver is therefore only touched behind a null check. The former unused
// `stream_recvr->sender_queues()` lookup, which dereferenced it first, has been removed.
Status ExchangeLocalState::close(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_close_timer);
    if (_closed) {
        return Status::OK();
    }
    // deps and metrics are resized together in init(), so they share indices.
    for (size_t i = 0; i < deps.size(); i++) {
        COUNTER_SET(metrics[i], deps[i]->watcher_elapse_time());
    }
    if (stream_recvr != nullptr) {
        stream_recvr->close();
    }
    if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
        vsort_exec_exprs.close(state);
    }
    return Base::close(state);
}
// Closes the operator. The operator-level sort expressions of a merging exchange are closed
// only on the first call (tracked via _is_closed); the base close() then runs.
Status ExchangeSourceOperatorX::close(RuntimeState* state) {
    const bool should_close_sort_exprs = _is_merging && !is_closed();
    if (should_close_sort_exprs) {
        _vsort_exec_exprs.close(state);
    }
    _is_closed = true;
    return OperatorX<ExchangeLocalState>::close(state);
}
// Resets the exchange by delegating to the local state associated with `state`.
Status ExchangeSourceOperatorX::reset(RuntimeState* state) {
    return get_local_state(state).reset(state);
}
} // namespace doris